Introduction
In the Raft algorithm, the Leader and Followers communicate via RPC. In etcd's raft implementation, the message is defined as follows:
type Message struct {
	Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
	To   uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
	From uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
	Term uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
	// logTerm is generally used for appending Raft logs to followers. For example,
	// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
	// index=101, and the term of entry at index 100 is 5.
	// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
	// entries from its leader as it already has an entry with term 5 at index 100.
	LogTerm    uint64   `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
	Index      uint64   `protobuf:"varint,6,opt,name=index" json:"index"`
	Entries    []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
	Commit     uint64   `protobuf:"varint,8,opt,name=commit" json:"commit"`
	Snapshot   Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
	Reject     bool     `protobuf:"varint,10,opt,name=reject" json:"reject"`
	RejectHint uint64   `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
	Context    []byte   `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
}
As the comment explains, when the Leader sends a MsgApp to a Follower, it carries LogTerm and Index, which identify the term and index of the entry immediately preceding the Entries being sent. Normally, the Follower has already accepted this preceding entry. The Leader tracks each Follower's progress and replicates logs to it starting from that progress.
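A minimal sketch of how a leader could fill these fields for one follower, given that follower's next index. It assumes a toy log that stores only terms in a slice indexed by log index (position 0 is a sentinel for index 0 with term 0); the names `entry`, `appendMsg`, `leaderLog` and `buildAppend` are hypothetical simplifications, not etcd's API.

```go
package main

// entry is a simplified log entry; only Index and Term matter here.
type entry struct {
	Index, Term uint64
}

// appendMsg is a hypothetical, stripped-down pb.Message holding only the
// fields a MsgApp needs for log matching.
type appendMsg struct {
	Index   uint64  // index of the entry immediately before Entries
	LogTerm uint64  // term of the entry at Index
	Entries []entry // entries starting at Index+1
	Commit  uint64  // leader's commit index
}

// leaderLog stores terms by index; terms[0] is a sentinel for index 0 (term 0).
type leaderLog struct {
	terms []uint64
}

func (l *leaderLog) lastIndex() uint64 { return uint64(len(l.terms) - 1) }

// buildAppend fills a MsgApp for a follower whose next expected index is
// next (next >= 1): (Index, LogTerm) describe the entry at next-1, and
// Entries carries everything from next to the end of the leader's log.
func (l *leaderLog) buildAppend(next, commit uint64) appendMsg {
	prev := next - 1
	m := appendMsg{Index: prev, LogTerm: l.terms[prev], Commit: commit}
	for i := next; i <= l.lastIndex(); i++ {
		m.Entries = append(m.Entries, entry{Index: i, Term: l.terms[i]})
	}
	return m
}
```

With the leader log from the scenario later in this article (terms 1 3 3 3 5 5 5 5 5) and next = 7, the message carries Index = 6, LogTerm = 5 and the entries at indexes 7 to 9.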
When a Follower receives a MsgApp, it processes it as follows:
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		// Return a hint to the leader about the maximum index and term that the
		// two logs could be divergent at. Do this by searching through the
		// follower's log for the maximum (index, term) pair with a term <= the
		// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
		// skip all indexes in the follower's uncommitted tail with terms
		// greater than the MsgApp's LogTerm.
		//
		// See the other caller for findConflictByTerm (in stepLeader) for a much
		// more detailed explanation of this mechanism.
		hintIndex := min(m.Index, r.raftLog.lastIndex())
		hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
		hintTerm, err := r.raftLog.term(hintIndex)
		if err != nil {
			panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
		}
		r.send(pb.Message{
			To:         m.From,
			Type:       pb.MsgAppResp,
			Index:      m.Index,
			Reject:     true,
			RejectHint: hintIndex,
			LogTerm:    hintTerm,
		})
	}
}
The key branch decides whether to accept or reject the entries. The Follower checks whether the term of its own entry at m.Index matches the m.LogTerm sent by the Leader. If they match, the entries are accepted; if not, they are rejected. A mismatch should not produce a bare rejection, though: the Follower also returns useful feedback so that the Leader knows from which position to resend entries that the Follower can accept.
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			// ents holds the entries index+1, index+2, index+3, ...
			// so with offset = index+1, the entry with index ci
			// sits at ents[ci-offset]; append ents[ci-offset:].
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}
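To see `findConflict` and the `ents[ci-offset:]` slicing at work, here is a toy re-implementation on an in-memory slice. The `toyLog` and `entry` types, the sentinel entry at index 0 and the simplified commit handling are assumptions of this sketch, not etcd code; etcd itself spreads the log across storage and the `unstable` struct.

```go
package main

// entry is a simplified log entry.
type entry struct {
	Index, Term uint64
}

// toyLog stores entries by index; entries[0] is a sentinel {0, 0}, so
// entries[i].Index == i. This layout is an assumption of the sketch.
type toyLog struct {
	entries   []entry
	committed uint64
}

func (l *toyLog) lastIndex() uint64 { return uint64(len(l.entries) - 1) }

// matchTerm reports whether the log has an entry at index i with term t.
func (l *toyLog) matchTerm(i, t uint64) bool {
	return i <= l.lastIndex() && l.entries[i].Term == t
}

// findConflict returns the index of the first entry in ents that is missing
// from the log or has a different term, or 0 if all of ents are present.
func (l *toyLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

// maybeAppend follows the structure of raftLog.maybeAppend on the toy log.
func (l *toyLog) maybeAppend(index, logTerm, committed uint64, ents ...entry) (uint64, bool) {
	if !l.matchTerm(index, logTerm) {
		return 0, false // reject: the log has no entry (index, logTerm)
	}
	lastnewi := index + uint64(len(ents))
	if ci := l.findConflict(ents); ci != 0 {
		if ci <= l.committed {
			panic("conflict with a committed entry")
		}
		offset := index + 1 // ents[k].Index == offset+k
		// Drop the log from ci onward, then append the new entries from ci on.
		l.entries = append(l.entries[:ci], ents[ci-offset:]...)
	}
	// Advance the commit index to min(committed, lastnewi), never backward.
	newCommit := committed
	if lastnewi < newCommit {
		newCommit = lastnewi
	}
	if newCommit > l.committed {
		l.committed = newCommit
	}
	return lastnewi, true
}
```

For example, a follower holding terms 1 1 2 2 at indexes 1 to 4 rejects (index=4, logTerm=3) but accepts (index=2, logTerm=1) with new term-3 entries at indexes 3 to 5: findConflict returns 3 and offset is 3, so indexes 3 and 4 are truncated and all of ents is appended.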
How the Follower handles a mismatch
The first step is to decide where to start searching for the conflict. The starting entry must be one the Follower actually has, so the code takes the smaller of m.Index and the Follower's last index:
hintIndex := min(m.Index, r.raftLog.lastIndex())
The second step computes a hintIndex. Is this hintIndex exactly the point where the two logs diverge, so that the Leader can simply resend entries starting at hintIndex and the Follower will accept them?
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
To answer this question, look at what the findConflictByTerm method does. Scanning backward from the given index, it finds the largest index in the Follower's log whose term is less than or equal to the LogTerm sent by the Leader:
// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
	if li := l.lastIndex(); index > li {
		// NB: such calls should not exist, but since there is a straightforward
		// way to recover, do it.
		//
		// It is tempting to also check something about the first index, but
		// there is odd behavior with peers that have no log, in which case
		// lastIndex will return zero and firstIndex will return one, which
		// leads to calls with an index of zero into this method.
		l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
			index, li)
		return index
	}
	for {
		logTerm, err := l.term(index)
		if logTerm <= term || err != nil {
			break
		}
		index--
	}
	return index
}
The third step looks up the term of the entry at hintIndex:
hintTerm, err := r.raftLog.term(hintIndex)
To sum up the three steps: the Follower finds the largest index in its log, no greater than m.Index, whose term is less than or equal to the m.LogTerm sent by the Leader, and returns that index and its term to the Leader (as RejectHint and LogTerm).
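The three steps can be condensed into one small function. This sketch assumes a toy log that stores only terms in a slice, where position 0 acts as index 0 with term 0 (standing in for etcd's fallback to the log's first index); `termLog` and `rejectHint` are hypothetical names.

```go
package main

// termLog stores terms by index; l[0] is a sentinel (index 0, term 0).
type termLog []uint64

func (l termLog) lastIndex() uint64 { return uint64(len(l) - 1) }

// findConflictByTerm returns the largest index <= index whose term is <= term.
// The sentinel at index 0 has term 0, so the loop always stops.
func (l termLog) findConflictByTerm(index, term uint64) uint64 {
	if index > l.lastIndex() {
		return index // out-of-range input is returned unchanged
	}
	for l[index] > term {
		index--
	}
	return index
}

// rejectHint computes the (RejectHint, LogTerm) pair a follower sends back
// when it rejects a MsgApp carrying (mIndex, mLogTerm).
func (l termLog) rejectHint(mIndex, mLogTerm uint64) (hintIndex, hintTerm uint64) {
	// Step 1: start from min(m.Index, lastIndex).
	hintIndex = mIndex
	if last := l.lastIndex(); last < hintIndex {
		hintIndex = last
	}
	// Step 2: largest index whose term is <= m.LogTerm.
	hintIndex = l.findConflictByTerm(hintIndex, mLogTerm)
	// Step 3: the term at that index.
	return hintIndex, l[hintIndex]
}
```

For the follower in the scenario below (terms 1 1 1 1 2 2), rejecting (idx=9, term=5) yields the hint (6, 2).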
Does this solve the problem?
Will the Leader next send entries starting directly at this hintIndex? No. Consider the scenario below: the Leader sends a message with (idx=9, term=5) to the Follower, which replies with (hintIndex=6, hintTerm=2). If the Leader then sent (idx=6, term=5), the Follower would reject it again, because the Follower's entry at index 6 has term 2. Instead, hintTerm=2 tells the Leader to look in its own log for the largest entry with a term <= 2, which is (idx=1, term=1); a message carrying that pair will not be rejected.
// For example, if the leader has:
//
//   idx        1 2 3 4 5 6 7 8 9
//              -----------------
//   term (L)   1 3 3 3 5 5 5 5 5
//   term (F)   1 1 1 1 2 2
What should the Leader do after receiving a rejection? The simple approach is to retry with index-1 until a match is found, but that can cost one round trip per entry. Instead, the Leader also calls findConflictByTerm, on its own log, to find the largest index whose term is less than or equal to hintTerm; any entry whose term is greater than hintTerm would be rejected by the Follower again. The matching position is not necessarily found in one step, though: the Leader and Follower may need several rounds of messages before they find where replication can resume.
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
	nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
	r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
	if pr.State == tracker.StateReplicate {
		pr.BecomeProbe()
	}
	r.sendAppend(m.From)
}
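The whole exchange can be simulated end to end. This is a sketch under simplifying assumptions: both logs are toy term slices sharing a sentinel at index 0, the follower is in probe state, and the leader's update follows the probe-state branch of `MaybeDecrTo`, i.e. next = max(min(rejected, nextProbeIdx+1), 1). `termLog` and `probe` are hypothetical names.

```go
package main

// termLog stores terms by index; l[0] is a sentinel (index 0, term 0) shared
// by every log, so probing always terminates.
type termLog []uint64

func (l termLog) lastIndex() uint64 { return uint64(len(l) - 1) }

// findConflictByTerm returns the largest index <= index whose term is <= term.
func (l termLog) findConflictByTerm(index, term uint64) uint64 {
	if index > l.lastIndex() {
		return index
	}
	for l[index] > term {
		index--
	}
	return index
}

// probe simulates MsgApp / rejected MsgAppResp round trips for one follower,
// starting from the leader's next index for it (next >= 1). It returns the
// index at which the logs first match and how many round trips it took.
func probe(leader, follower termLog, next uint64) (matched uint64, rounds int) {
	for {
		rounds++
		// Leader: send (Index, LogTerm) for the entry just before next.
		index := next - 1
		logTerm := leader[index]

		// Follower: accept if it holds (index, logTerm).
		if index <= follower.lastIndex() && follower[index] == logTerm {
			return index, rounds
		}

		// Follower: reject with a hint (min, findConflictByTerm, term).
		hintIndex := index
		if last := follower.lastIndex(); last < hintIndex {
			hintIndex = last
		}
		hintIndex = follower.findConflictByTerm(hintIndex, logTerm)
		hintTerm := follower[hintIndex]

		// Leader: skip every entry whose term is greater than hintTerm.
		nextProbeIdx := hintIndex
		if hintTerm > 0 {
			nextProbeIdx = leader.findConflictByTerm(hintIndex, hintTerm)
		}
		// Simplified probe-state MaybeDecrTo.
		next = nextProbeIdx + 1
		if index < next {
			next = index
		}
		if next < 1 {
			next = 1
		}
	}
}
```

On the scenario above (leader terms 1 3 3 3 5 5 5 5 5, follower terms 1 1 1 1 2 2, next = 10), this finds the match at index 1 after 2 round trips, whereas retrying with index-1 would take 9. A leader log 1 1 4 4 4 against a follower log 1 2 2 3 3 3 3 needs 3 round trips, which shows that the hint does not always land on the match immediately.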
To better illustrate the process, I drew the following two figures by hand.
[Figure: Scenario 1]
[Figure: Scenario 2]
How the Follower appends entries
The Follower appends entries using the append function:
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}
As the name truncateAndAppend suggests, this method handles conflicts, and its default branch is where they are resolved: it drops the Follower's unstable entries from index after onward and then appends the new entries.
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// remove conflicted ents
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}
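A toy version of the three cases shows what each branch keeps and drops. It assumes a simplified `unstableLog` whose `entries[i]` has index `offset+i`; the type and field names are hypothetical stand-ins for etcd's `unstable`, and logging is omitted.

```go
package main

// entry is a simplified log entry.
type entry struct {
	Index, Term uint64
}

// unstableLog is a toy stand-in for etcd's unstable: entries[i] has index offset+i.
type unstableLog struct {
	offset  uint64
	entries []entry
}

// truncateAndAppend mirrors the three branches of unstable.truncateAndAppend.
func (u *unstableLog) truncateAndAppend(ents []entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// ents start right after our last entry: plain append.
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		// ents start at or before our first unstable entry: replace everything.
		u.offset = after
		u.entries = ents
	default:
		// Keep [offset, after), drop the conflicting tail, then append.
		// Assumes offset < after < offset+len(entries); a gap would panic here.
		kept := append([]entry{}, u.entries[:after-u.offset]...)
		u.entries = append(kept, ents...)
	}
}
```

Copying the kept prefix into a fresh slice in the default branch, as the original code does, keeps the following append from writing into the old backing array.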
Conclusion
This is an interesting mechanism in Raft for bringing a Follower's log back in line with the Leader's when the two logs have diverged.