Introduction

Notes on a reference Raft implementation.

Raft is probably the best-known distributed consensus algorithm, widely used in systems such as etcd and Kubernetes.

In 6.824, Raft is the second lab and the first in a series of labs that eventually build a fault-tolerant key-value storage system, similar in spirit to Redis.

This article does not cover the Raft algorithm itself in much depth; read the extended Raft paper to learn more.

The 6.824 Raft lab is divided into four parts: 2A, 2B, 2C, and 2D. Each part implements a piece of Raft's functionality, and together they add up to a complete Raft implementation:

  • 2A: leader election, rated moderate;
  • 2B: log replication, rated hard;
  • 2C: persistence, rated hard;
  • 2D: log compaction, rated hard.

Although part 2A is rated moderate, I don't think it is. Everything is difficult before it begins, and as the first part it carries the job of exploring the codebase and laying the foundation for the whole lab.

Since each part of the lab builds on the previous one, getting 2A right is the most important step for moving forward with the later experiments.

Part 2A

Part 2A implements leader election and heartbeats (AppendEntries RPCs that carry no log entries). The nodes in the cluster elect a leader; as long as the leader does not fail and the network does not drop packets, the leader stays the same, otherwise a new leader is elected.

The most important thing for completing 2A is to read Figure 2 of the paper, again and again.

Really, read it repeatedly. Whenever you are unsure about a detail, go back to Figure 2; the answer is usually there.

2A breaks down into two concerns: leader election and heartbeats. Elections involve election timeouts and RequestVote RPCs; heartbeats involve a periodic timer and AppendEntries RPCs.

2A can therefore be broken down into the following four parts:

  • The timeout-triggered election, i.e. ticker, which starts a new election when the node has not received a heartbeat in time.
  • RequestVote: the candidate sends vote requests to the other nodes.
  • The periodic heartbeat, i.e. pingLoop, which only the leader runs; it asserts the leader's authority to the other nodes and keeps them from starting elections.
  • AppendEntries: the leader sends heartbeat messages to the other nodes; for now they carry no log entries.

Structure definition

The Raft node is defined by a structure as follows:

type Raft struct {
   mu        sync.Mutex          // lock protecting shared access to this peer's state
   peers     []*labrpc.ClientEnd // RPC endpoints of all peers
   persister *Persister          // object to hold this peer's persisted state
   me        int                 // this peer's index into peers[]
   dead      int32               // set by Kill()
   // peer state: Follower, Candidate or Leader
   state PeerState
   // 2A
   currentTerm int // latest term this peer has seen
   votedFor    int // candidateId that received this peer's vote in the current term (-1 if none)
   leaderId    int // id of the current cluster leader
   // apply channel
   applyCh chan ApplyMsg
   // timers
   electionTimer *time.Timer // election timeout timer
   pingTimer     *time.Timer // heartbeat timer (leader only)
}

The Raft struct already ships with fields such as mu and dead. To implement 2A we add the fields we need, for example a state field indicating whether the node is currently a leader, follower, or candidate. The new fields are listed below, and a sketch of the supporting type and timer helpers follows the list:

  • state: the node's current state (follower, candidate, or leader)
  • currentTerm: the node's current term
  • votedFor: who the node voted for in the current term; -1 means it has not voted
  • leaderId: the id of the cluster leader
  • applyCh: the channel for applying log entries
  • electionTimer: the election timeout timer
  • pingTimer: the heartbeat timer, used only by the leader
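
The code in this article also relies on a PeerState type, a heartbeatInterval constant, and a getRandElectTimeout helper that are never shown. Here is a minimal sketch of how they might look; the concrete timeout values are my own assumptions, chosen so that heartbeats arrive well within an election timeout and peers rarely time out at the same moment:

import (
   "math/rand"
   "time"
)

// PeerState is the role a Raft peer is currently playing.
type PeerState int

const (
   Follower PeerState = iota
   Candidate
   Leader
)

// heartbeatInterval is how often the leader sends AppendEntries heartbeats.
const heartbeatInterval = 100 * time.Millisecond

// getRandElectTimeout returns a randomized election timeout (here 250-400ms)
// so that peers rarely start elections at exactly the same moment.
func getRandElectTimeout() time.Duration {
   return time.Duration(250+rand.Intn(150)) * time.Millisecond
}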

Next, let's look at the structs used in the RPC requests:

type RequestVoteArgs struct {
   Term        int // candidate's term
   CandidateId int // candidate requesting the vote
}
type RequestVoteReply struct {
   Term        int  // currentTerm of the replying peer, for the candidate to update itself
   VoteGranted bool // true means the candidate received the vote
}

Since RequestVote in 2A only drives the election and needs no log-related fields, the arguments and reply are simple; the fields are exactly those described in Figure 2. Likewise, AppendEntries is very simple when it only acts as a heartbeat:

type AppendEntriesArgs struct {
   Term     int // leader's term
   LeaderId int // so followers can record who the leader is
}
type AppendEntriesReply struct {
   Term    int  // currentTerm of the replying peer, for the leader to update itself
   Success bool // true if the follower accepted the heartbeat
}
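The ticker and pingLoop shown later call rf.sendRequestVote and rf.sendAppendEntries. The lab skeleton already provides a sendRequestVote wrapper around labrpc's Call; sendAppendEntries can mirror it. A sketch, assuming the usual labrpc calling convention:

// sendRequestVote issues a RequestVote RPC to the given peer and returns
// whether the RPC was delivered (not whether the vote was granted).
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
   return rf.peers[server].Call("Raft.RequestVote", args, reply)
}

// sendAppendEntries issues an AppendEntries RPC to the given peer.
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
   return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}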

Node states

Since a node can be in one of three states (follower, candidate, or leader), the transitions can be wrapped in three functions to make switching between states easy:

func (rf *Raft) becomeCandidate() {
   rf.state = Candidate // Change the node state
   rf.currentTerm++
   rf.votedFor = rf.me
   DPrintf("peer: %d become candidate\n", rf.me)
}
func (rf *Raft) becomeFollower(term int) {
   rf.state = Follower
   rf.currentTerm = term
   rf.votedFor = -1
   rf.electionTimer.Reset(getRandElectTimeout())
   DPrintf("peer: %d become follower\n", rf.me)
}
func (rf *Raft) becomeLeader() {
   rf.state = Leader
   rf.leaderId = rf.me
   // TODO: log handling (later parts)
   // Reset the heartbeat timer
   rf.pingTimer.Reset(heartbeatInterval)
   go rf.pingLoop() // Start the heartbeat loop
   DPrintf("peer: %d become leader\n", rf.me)
}
  • becomeCandidate: switching to candidate means, besides setting state to Candidate, incrementing the current term and voting for yourself.
  • becomeFollower: switching to follower means, besides setting state to Follower, setting the current term to the given term, resetting votedFor to -1 (no vote yet), and resetting the election timer.
  • becomeLeader: switching to leader means setting state to Leader, resetting the heartbeat timer, and starting the heartbeat loop that sends heartbeats to the other nodes.

From this logic, it can be seen that heartbeat sending is unique to the leader.

Node initialization

A new Raft node is created with Make, which also starts the election machinery:

func Make(peers []*labrpc.ClientEnd, me int,
   persister *Persister, applyCh chan ApplyMsg) *Raft {
   rf := &Raft{}
   rf.peers = peers
   rf.persister = persister
   rf.me = me // this node's id
   rf.applyCh = applyCh
   // Your initialization code here (2A, 2B, 2C).
   rf.currentTerm = 0
   rf.votedFor = -1
   rf.leaderId = -1
   rf.state = Follower
   rf.electionTimer = time.NewTimer(getRandElectTimeout())
   rf.pingTimer = time.NewTimer(heartbeatInterval)
   // initialize from state persisted before a crash
   rf.readPersist(persister.ReadRaftState())
   // start ticker goroutine to start elections
   DPrintf("peer: %d start ticker\n", rf.me)
   go rf.ticker()
   return rf
}

Make mainly initializes the node's state: currentTerm starts at 0, votedFor and leaderId start at -1 (meaning empty), and, importantly, the initial state is Follower. It then starts a new goroutine with the go keyword to run the ticker function, which drives leader elections (more on that below), and finally returns the node instance.
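
One more piece the 2A tester relies on that is not shown in this article: it calls rf.GetState() to ask each peer for its current term and whether it believes it is the leader. A minimal sketch, assuming the skeleton's usual signature:

// GetState returns the current term and whether this peer
// believes it is the leader; the 2A tester calls this.
func (rf *Raft) GetState() (int, bool) {
   rf.mu.Lock()
   defer rf.mu.Unlock()
   return rf.currentTerm, rf.state == Leader
}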

Two loops

Part 2A has two infinite loops. One is ticker, the election loop: when no heartbeat arrives within the election timeout, it starts a new election. The other is pingLoop, which only the leader runs: once a node becomes the leader, it keeps sending heartbeat RPCs to the other nodes.
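
Both loops run until the tester shuts the peer down; the loop condition rf.killed() checks the dead flag set by Kill. The skeleton's helpers look roughly like this, shown here only for context:

import "sync/atomic"

// Kill is called by the tester when a peer should stop; killed lets
// long-running goroutines notice that and exit.
func (rf *Raft) Kill() {
   atomic.StoreInt32(&rf.dead, 1)
}

func (rf *Raft) killed() bool {
   return atomic.LoadInt32(&rf.dead) == 1
}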

Every node runs a ticker, but once the node becomes the leader, the ticker effectively does nothing, as follows:

func (rf *Raft) ticker() {
   for rf.killed() == false {
      <-rf.electionTimer.C
      // The election timed out; reset the timer
      rf.electionTimer.Reset(getRandElectTimeout())
      rf.mu.Lock()
      if rf.state == Leader {
         rf.mu.Unlock()
         continue
      }
      rf.becomeCandidate()
      var votes int32 = 1 // Start with 1 vote: the candidate votes for itself
      me := rf.me
      for peerId := range rf.peers {
         if peerId == me { // Note: this comparison still happens under the lock
            continue
         }
         // Create a goroutine to send the request
         go func(peerId int) {
            rf.mu.Lock()
            currentTerm := rf.currentTerm
            args := RequestVoteArgs{
               Term:        currentTerm,
               CandidateId: rf.me,
            }
            reply := RequestVoteReply{}
            DPrintf("peer: %d sendRequestVote to peer: %d\n", rf.me, peerId)
            rf.mu.Unlock() // Unlock: never hold the lock while sending an RPC
            ok := rf.sendRequestVote(peerId, &args, &reply)
            if !ok { // The request failed
               DPrintf("peer: %d sendRequestVote fail\n", peerId)
               return
            }
            // Note: locking only here keeps the RPC wait outside the critical section
            rf.mu.Lock()
            // If the current node is no longer a candidate, exit
            if rf.state != Candidate {
               rf.mu.Unlock() // Before exiting, remember to unlock
               return
            }
            // If the term in the reply is larger than ours, step down to follower
            if reply.Term > currentTerm {
               rf.becomeFollower(reply.Term)
            }
            if reply.VoteGranted { // The vote was granted
               // Count the vote and check whether we now have a majority
               atomic.AddInt32(&votes, 1)
               if int(votes) >= (len(rf.peers)+1)/2 {
                  DPrintf("peer: %d received %d votes, become leader\n", rf.me, votes)
                  rf.becomeLeader()
               }
            }
            rf.mu.Unlock()
         }(peerId)
      }
      rf.mu.Unlock() // Lock and unlock always come in pairs
   }
}

Right after the election timeout fires, if the node is already the leader, the iteration is skipped: a leader exists, so there is no need to start a new election. When a node becomes a candidate it votes for itself, so the vote count starts at 1.

Each vote request is sent from its own goroutine so that waiting for an RPC never happens while holding the lock.

The majority check is important: once the candidate has collected votes from more than half of the cluster it becomes the leader, and any votes that arrive afterwards are ignored because the node is no longer a candidate. For example, in a 3-node cluster two votes (its own plus one more) are enough.

pingLoop only runs once the node has become the leader, as follows:

func (rf *Raft) pingLoop() {
   for rf.killed() == false {
      // As the leader, send a heartbeat immediately instead of waiting for the timer first
      rf.mu.Lock()
      if rf.state != Leader { // Not the leader, so not allowed to send heartbeats; exit
         rf.mu.Unlock()
         return
      }
      // Send an AppendEntries RPC to every other node
      leaderId := rf.leaderId
      term := rf.currentTerm
      for peerId := range rf.peers {
         if peerId == rf.me {
            continue
         }
         go func(peerId int) {
            args := AppendEntriesArgs{
               Term:     term,
               LeaderId: leaderId,
            }
            reply := AppendEntriesReply{}
            ok := rf.sendAppendEntries(peerId, &args, &reply)
            if !ok {
               return
            }
            rf.mu.Lock()
            // The peer has a larger term: step down
            if reply.Term > term {
               rf.becomeFollower(reply.Term)
            }
            // TODO (later parts)
            if reply.Success { // Log synchronization succeeded
            } else { // Log synchronization failed
            }
            rf.mu.Unlock() // unlock
         }(peerId)
      }
      rf.mu.Unlock()
      <-rf.pingTimer.C
      // The heartbeat interval elapsed; reset the timer
      rf.pingTimer.Reset(heartbeatInterval)
   }
}

pingLoop is structured slightly differently from ticker: ticker waits for the timer and then acts, while pingLoop sends heartbeats first and only then waits for the timer, because a new leader must announce itself to the other nodes immediately. If the node is no longer the leader, it is not allowed to send heartbeats, so it unlocks and returns right away.

When a reply carries a larger term, the node steps down to follower immediately.

RPC

As mentioned earlier, 2A involves two RPCs, RequestVote and AppendEntries. Both of the handlers below process RPC requests sent by other nodes.

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
   rf.mu.Lock()
   defer rf.mu.Unlock()
   reply.Term = rf.currentTerm
   // If the requester's term is less than the current term, refuse the vote
   if args.Term < rf.currentTerm {
      reply.VoteGranted = false
      return
   }
   // Once a requester with a larger term is seen, become a follower
   if args.Term > rf.currentTerm {
      rf.becomeFollower(args.Term)
   }
   // Grant the vote if votedFor is empty or already equals the requester's id.
   // Note: the paper also ANDs in a log up-to-dateness check here, but 2A does not need it yet.
   if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
      rf.votedFor = args.CandidateId                // record the vote: at most one vote per term
      rf.electionTimer.Reset(getRandElectTimeout()) // granting a vote also resets the election timer
      reply.VoteGranted = true
   }
   DPrintf("RequestVote, args: %v, reply: %v\n", args, reply)
}

RequestVote handles vote requests from other nodes and fills in the reply. If the requester's term is smaller than the current term, the vote is refused and the handler returns.

If the requester's term is larger than the current term, the node first becomes a follower and then goes on to decide whether to vote.

If the node's votedFor is empty (-1) or already equals the requester's candidateId, the vote is granted and recorded.

Note: why grant the vote when votedFor is empty or equals candidateId? If votedFor is -1, the node has not voted in this term (it has just become, or already was, a follower), so it can vote directly. If votedFor already equals candidateId, the node has previously voted for this requester, so answering the possibly retried request with another grant is safe. This rule is only temporary; later parts of the lab also check log up-to-dateness before deciding whether to vote.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
   rf.mu.Lock()
   defer rf.mu.Unlock()
   reply.Term = rf.currentTerm
   if args.Term < rf.currentTerm {
      reply.Success = false
      return
   }
   // Once a requester with a larger term is seen, become a follower
   if args.Term > rf.currentTerm {
      rf.becomeFollower(args.Term)
   } else { // Terms are equal, so just reset the election timer
      rf.electionTimer.Reset(getRandElectTimeout())
   }
   rf.leaderId = args.LeaderId
   // TODO: log replication (2B)
   reply.Success = true
   DPrintf("AppendEntries, args: %v, reply: %v\n", args, reply)
}

In part 2A, AppendEntries only carries the leader's heartbeat, so it is relatively simple and has none of the log-replication logic yet. If the requester's term is smaller than the current term, the heartbeat is rejected and the handler returns.

If the requester's term is larger than the current term, the node becomes a follower; otherwise the terms are equal and only the election timer is reset.

Finally, leaderId is recorded and the reply marks the heartbeat as successful. Log synchronization is not involved yet; later parts of the lab will fill that in.

Summary

Part 2A is the foundation of lab 2. The main pieces are:

  • Three states and the transition functions between them.
  • Two loops, ticker and pingLoop. pingLoop is unique to the leader, and ticker is effectively skipped on the leader.
  • Two RPCs and their handlers, one for voting and one for heartbeats.

Run the tests:

go test -run 2A -race
Test (2A): initial election ...
  ... Passed --   3.1  3   116  14386    0
Test (2A): election after network failure ...
  ... Passed --   4.4  3   232  18654    0
Test (2A): multiple elections ...
  ... Passed --   5.7  7  1236 103166    0
PASS
ok   6.824/raft   14.221s

See 62fe2b6/src/raft/raft.go for the complete code.