The author has open-sourced a Java implementation of the Raft algorithm, the raft-core project: https://github.com/wujiuye/delay-scheduler/tree/main/raft/raft-core. The project is the distributed delay scheduling module of the delay-scheduler middleware. The author's skill is limited, so it is recommended for learning purposes only.

(Photo credit: github.com/maemual/raf…)

About the CAP Principle

The CAP principle, covering Consistency (C), Availability (A), and Partition tolerance (P), is a topic no distributed system can avoid. In any distributed system these three properties are in conflict: all three can never be satisfied at once, at most two of them can.

  • AP: if the system must be highly available (A) and tolerant of partitions (P), then strong consistency (C) has to be given up;
  • CP: if strong data consistency (C) is required, then a network partition (P) can make synchronization take an unbounded amount of time, so availability (A) cannot be guaranteed and is given up;
  • CA: if no network partition exists (a partition here refers to different machine rooms/countries/regions) (P), then strong consistency (C) and availability (A) can both be satisfied.

Introduction to the Raft consensus algorithm

In a Raft cluster, each node has a role, either Leader or Follower, and any node can become a Candidate before a Leader is elected.

The Raft algorithm allows only one Leader node in a Raft cluster, and only the Leader node can process client read and write requests. The Leader translates each write request into an operation log entry and replicates the entry to the other Follower nodes. Once the Leader has successfully replicated an entry to the majority of nodes (including itself), the entry can be applied to the state machine, which executes the write command, thereby ensuring the eventual consistency of the data.

By analogy, we can think of MySQL's binlog as the sequence of write commands, and the MyISAM storage engine as the state machine that executes those commands.

Implementing the Raft algorithm requires two RPC interfaces (a minimal sketch follows the list):

  • RequestVoteRpc: sent by a Candidate node during an election to solicit votes from the other nodes;
  • AppendEntriesRpc: sent by the Leader node to the other Follower nodes to carry log replication requests, heartbeat requests, and log commit requests.
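A minimal sketch of how these two RPCs might be declared as a Java interface. The request/response classes (RequestVote, RequestVoteResp, AppendEntries, AppendEntriesResp) are the packets defined later in this article; the interface name RaftRpcService is a hypothetical name used only for illustration.

import java.util.concurrent.CompletableFuture;

// Hypothetical RPC facade; a real implementation would sit on top of a network layer.
public interface RaftRpcService {
    // Sent by a Candidate to solicit a vote during an election.
    CompletableFuture<RequestVoteResp> requestVote(RequestVote request);
    // Sent by the Leader for log replication, heartbeats, and commit notifications.
    CompletableFuture<AppendEntriesResp> appendEntries(AppendEntries request);
}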

Heartbeat timer

The Leader node periodically sends heartbeat packets to the other Follower nodes to refresh the election timeout on the other Follower nodes.

The heartbeat timer starts when a node becomes the Leader and stops when the node reverts to Follower. The heartbeat interval must be shorter than the election timeout, that is, Heartbeat Timeout < Election Timeout.
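A minimal sketch of such a heartbeat timer using a ScheduledExecutorService. The field names and the sendHeartbeatToFollowers callback are hypothetical; the sketch only illustrates starting the timer on becoming Leader and stopping it on stepping down.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class HeartbeatTimer {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long heartbeatIntervalMs;            // must be smaller than the election timeout
    private final Runnable sendHeartbeatToFollowers;    // hypothetical callback

    private ScheduledFuture<?> task;

    public HeartbeatTimer(long heartbeatIntervalMs, Runnable sendHeartbeatToFollowers) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.sendHeartbeatToFollowers = sendHeartbeatToFollowers;
    }

    // Called when the node becomes Leader.
    public synchronized void start() {
        if (task == null) {
            task = scheduler.scheduleAtFixedRate(
                    sendHeartbeatToFollowers, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    // Called when the node steps down to Follower.
    public synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
    }
}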

Election timer

When the timer reaches the election timeout threshold, a Leader election is triggered: the current node increments its term number by 1 and tries to vote for itself (if it has not already voted for another candidate in this term). If it succeeds in voting for itself, it becomes a Candidate and sends vote requests to the other nodes.

The election timer's current count is reset and restarted whenever an AppendEntriesRPC request (including a heartbeat request) is received. The timeout threshold of each node must be different (randomized), so as to avoid the situation where no Leader can be elected even after multiple rounds of elections.
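A minimal sketch of an election timer with a randomized timeout, assuming a hypothetical startElection callback. Each reset() reschedules the timeout with a fresh random value so that nodes rarely time out at the same moment.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ElectionTimer {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long minTimeoutMs;
    private final long maxTimeoutMs;
    private final Runnable startElection;   // hypothetical callback that starts a new election

    private ScheduledFuture<?> task;

    public ElectionTimer(long minTimeoutMs, long maxTimeoutMs, Runnable startElection) {
        this.minTimeoutMs = minTimeoutMs;
        this.maxTimeoutMs = maxTimeoutMs;
        this.startElection = startElection;
    }

    // Called on start-up and every time an AppendEntries request (including a heartbeat) arrives.
    public synchronized void reset() {
        if (task != null) {
            task.cancel(false);
        }
        long timeout = ThreadLocalRandom.current().nextLong(minTimeoutMs, maxTimeoutMs + 1);
        task = scheduler.schedule(startElection, timeout, TimeUnit.MILLISECONDS);
    }
}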

Leader Election Process

Each node has only one vote per term, and each node gives priority to voting for itself. The node that receives votes from the majority of nodes becomes the Leader.

RequestVoteRpc request packet:

public class RequestVote {
    private long term;
    private int candidateId;
    private long lastLogIndex;
    private long lastLogTerm;
}
  • term: the current term number of the vote requester (the Candidate node);
  • candidateId: the ID of the Candidate requesting the vote;
  • lastLogIndex: the index of the Candidate's latest log entry;
  • lastLogTerm: the term number of the Candidate's latest log entry.

RequestVoteRpc response packet (vote packet):

public class RequestVoteResp {
    private long term;
    private boolean voteGranted;
}
  • term: the current term number of the voter, used to inform the Candidate so that it can update its own term value;
  • voteGranted: true if the voter grants its vote to the Candidate, otherwise false.

(Photo credit: github.com/maemual/raf…)

The process for initiating a request for votes when the election timer expires is as follows:

  • 1) increment the current term number (term) by 1;
  • 2) vote for itself, then switch to the Candidate state; so the first vote of every Candidate node comes from itself;
  • 3) send RequestVoteRPC requests to the other nodes in the cluster, asking them to vote for it.

When a node receives a vote request from a Candidate, it responds as follows, based on its current term number, how up to date its log is, and whether it has already voted for another node (including itself) in the current term (a sketch of this logic follows the list):

  • 1) if the Candidate's term is less than its own current term, return false to tell the Candidate that its term is out of date and that the vote will not be granted;
  • 2) if the Candidate's term is greater than its own current term, and it has not yet voted for anyone (including itself), it grants the vote to the Candidate and returns its own term together with true;
  • 3) otherwise, if the Candidate's term is equal to its own current term, it has already voted for this Candidate (a repeated request scenario), and the Candidate's log is at least as new as its own, it returns its own term together with true;
  • 4) otherwise, if it has already voted for someone else in this term, it cannot vote for the requester and clearly tells the requester that the vote will not be granted.
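A minimal sketch of this vote-handling logic, assuming getter/setter methods on the RequestVote and RequestVoteResp packets shown above and hypothetical node state fields (currentTerm, votedFor, lastLogIndex, lastLogTerm). It follows the four rules above, including the log up-to-date check used in rule 3.

public class VoteHandler {
    private long currentTerm;       // this node's current term
    private Integer votedFor;       // candidateId voted for in the current term, null if none
    private long lastLogIndex;      // index of this node's latest log entry
    private long lastLogTerm;       // term of this node's latest log entry

    public synchronized RequestVoteResp handleRequestVote(RequestVote req) {
        RequestVoteResp resp = new RequestVoteResp();
        // Rule 1: the candidate's term is out of date.
        if (req.getTerm() < currentTerm) {
            resp.setTerm(currentTerm);
            resp.setVoteGranted(false);
            return resp;
        }
        // Rule 2: newer term and no vote cast yet.
        if (req.getTerm() > currentTerm && votedFor == null) {
            currentTerm = req.getTerm();
            votedFor = req.getCandidateId();
            resp.setTerm(currentTerm);
            resp.setVoteGranted(true);
            return resp;
        }
        // Rule 3: same term, already voted for this candidate (repeated request),
        // and the candidate's log is at least as new as ours.
        boolean logUpToDate = req.getLastLogTerm() > lastLogTerm
                || (req.getLastLogTerm() == lastLogTerm && req.getLastLogIndex() >= lastLogIndex);
        if (req.getTerm() == currentTerm
                && votedFor != null && votedFor == req.getCandidateId()
                && logUpToDate) {
            resp.setTerm(currentTerm);
            resp.setVoteGranted(true);
            return resp;
        }
        // Rule 4: already voted for someone else in this term.
        resp.setTerm(currentTerm);
        resp.setVoteGranted(false);
        return resp;
    }
}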

After broadcasting the vote requests, the Candidate node responds to the final voting result as follows (a sketch follows the list):

  • 1) if connections to the majority of nodes fail, the Candidate keeps re-initiating the vote in the current term; in other words, the election is suspended while the majority of nodes are unreachable;
  • 2) if it receives votes from the majority of nodes, it becomes the Leader; the majority includes its own vote, and since each node has only one vote, voting for itself means it cannot vote for any other node;
  • 3) if it finds that another node has already won the election (the term in a vote response is greater than the current Candidate's term), it switches to Follower;
  • 4) if the election timer fires again, meaning no Leader heartbeat packet was received and no node won the previous election, it goes ahead and starts another election.
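A minimal sketch of how a Candidate might tally vote responses, assuming getter methods on the RequestVoteResp packet above; the responses come from the other nodes, while the Candidate's own vote is counted separately. It becomes Leader on a majority, steps down when it sees a higher term, and otherwise stays Candidate until the election timer fires again.

import java.util.List;

public class VoteTally {
    private long currentTerm;
    private String role = "Candidate";      // simplified role flag: Candidate / Leader / Follower

    // clusterSize counts all nodes, including this candidate.
    public synchronized void onVoteResponses(int clusterSize, List<RequestVoteResp> responses) {
        int granted = 1;                    // the candidate's first vote always comes from itself
        for (RequestVoteResp resp : responses) {
            if (resp.getTerm() > currentTerm) {
                // Another node is ahead of us; give up the election and become a Follower.
                currentTerm = resp.getTerm();
                role = "Follower";
                return;
            }
            if (resp.isVoteGranted()) {
                granted++;
            }
        }
        if (granted >= clusterSize / 2 + 1) {
            role = "Leader";                // won the majority; start sending heartbeats
        }
        // Otherwise stay Candidate; the election timer will trigger a new election.
    }
}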

If another node has become the Leader for the current term, that Leader will send heartbeat packets to inform this node, so each node must leave enough time for the Leader's heartbeats to arrive. Therefore the election timeout must be greater than the heartbeat interval, that is: Heartbeat Timeout < Election Timeout.

After the election, each Follower node must record which node is the Leader for the current term, and the Leader node must record all the other Follower nodes. The Leader needs to send heartbeat packets and log replication requests to the other Followers, while a Follower that receives a client request needs to tell the client to redirect to the Leader node.

Raft log replication process

In a Raft cluster, the Leader node is responsible for receiving read and write requests from the client. If the followers receive the requests, they need to redirect the requests to the Leader node.

If the Leader receives a read request, it can query the data directly and respond to the client. If the Leader receives a write request, it translates the write request into an operation log entry and appends the entry to its local log, while also calling AppendEntriesRPC to replicate the entry to the other nodes. After the majority of nodes have replicated it successfully, the Leader commits the entry. Once committed, the entry is applied to the state machine, and the Leader asynchronously calls AppendEntriesRPC to inform the other Follower nodes that the entry has been committed. After receiving the commit request, a Follower first marks the entry as committed and then applies it to its own state machine.

AppendEntriesRPC request packet (the Leader node sends an RPC request to the other Follower nodes to replicate log entries):

public class AppendEntries implements Cloneable {
    private long term;
    private int leaderId;
    private long prevLogIndex;
    private long prevLogTerm;
    private long leaderCommit;
    private CommandLog[] entries;
}
  • term: the term number of the Leader node when the log entry was created;
  • leaderId: the ID of the Leader node, so that Follower nodes can redirect client requests to the Leader node;
  • prevLogIndex: the index of the Leader's newest log entry immediately preceding the new entries;
  • prevLogTerm: the term number of that log entry;
  • leaderCommit: the Leader maintains a leaderCommit for each Follower, representing the index of the log entries that the Leader believes the Follower has committed;
  • entries: the log entries to be appended to the Follower; for a heartbeat packet, entries is empty.

AppendEntriesRPC response packet:

public class AppendEntriesResp {
    private long term;
    private boolean success;
}
  • term: the current term number, taken as Max(the term carried in the AppendEntries request, the Follower's locally maintained term); it lets the Leader update its own term number. Once a Leader finds a term number larger than its own, it knows it is an obsolete Leader, stops sending heartbeat packets, and actively switches to Follower (a sketch of this check follows the list);
  • success: true if the receiver (the Follower) could match prevLogIndex and prevLogTerm, meaning the request succeeded.
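A minimal sketch of how the Leader might process an AppendEntriesResp, assuming getter methods on the response packet above. It only shows the step-down rule described for the term field; the retry on failure is described later in this article.

public class AppendEntriesRespHandler {
    private long currentTerm;
    private volatile boolean isLeader = true;

    // Invoked by the Leader for each response returned by a Follower.
    public synchronized void handle(AppendEntriesResp resp) {
        // The response carries Max(request term, Follower's local term).
        if (resp.getTerm() > currentTerm) {
            // A larger term means this node is an obsolete Leader:
            // adopt the newer term, stop heartbeats, and switch to Follower.
            currentTerm = resp.getTerm();
            isLeader = false;
        }
        // When resp.isSuccess() is false, the Follower could not match
        // prevLogIndex/prevLogTerm; the decrement-and-retry strategy is described later.
    }
}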

(Photo credit: github.com/maemual/raf…)

The Leader node processes a client write request and replicates the write-request log entry to the Followers as follows (a minimal sketch of this flow follows the list):

  • 0) the client sends a write request to the Leader;
  • 1) the Leader parses the write request into an operation log entry and appends it to its local log file;
  • 2) the Leader asynchronously sends AppendEntriesRPC requests to the other Follower nodes;
  • 3) the Leader blocks until the majority of nodes have responded successfully; the majority is at least the total number of nodes divided by 2 plus 1, and since the Leader itself counts as one, only total/2 of the other nodes need to respond successfully;
  • 4) if the majority of nodes respond successfully, the Leader commits the log entry, applies it to its local state machine, asynchronously informs the other Follower nodes that the entry has been committed, and immediately returns the operation result to the client;
  • 5) otherwise, the Leader responds to the client with a failure.
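A minimal sketch of this Leader-side write path. The helpers appendToLocalLog, replicateToFollower, applyToStateMachine and notifyCommitAsync are hypothetical stubs; the sketch only illustrates the wait-for-majority step and the commit decision, not persistence or retries. For simplicity it waits for all responses, whereas a real implementation can return as soon as the majority has answered.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class LeaderWritePath {
    private final List<String> followers;    // addresses of the other Follower nodes
    private final int clusterSize;           // total number of nodes, including the Leader

    public LeaderWritePath(List<String> followers) {
        this.followers = followers;
        this.clusterSize = followers.size() + 1;
    }

    public boolean handleWriteRequest(byte[] command) {
        long index = appendToLocalLog(command);                 // step 1: append locally
        AtomicInteger successCount = new AtomicInteger(1);      // the Leader itself counts as one
        List<CompletableFuture<Void>> futures = followers.stream()   // step 2: replicate asynchronously
                .map(f -> replicateToFollower(f, index)
                        .thenAccept(ok -> { if (ok) successCount.incrementAndGet(); }))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();  // step 3: wait
        int majority = clusterSize / 2 + 1;
        if (successCount.get() >= majority) {                   // step 4: commit and apply
            applyToStateMachine(index);
            notifyCommitAsync(index);
            return true;                                        // respond success to the client
        }
        return false;                                           // step 5: respond failure
    }

    // Hypothetical helpers; a real implementation would persist the log and use the RPC layer.
    private long appendToLocalLog(byte[] command) { return 0L; }
    private CompletableFuture<Boolean> replicateToFollower(String follower, long index) {
        return CompletableFuture.completedFuture(true);
    }
    private void applyToStateMachine(long index) { }
    private void notifyCommitAsync(long index) { }
}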

The Follower node processes a log replication request as follows (a minimal sketch follows the list):

  • 0) on receiving any AppendEntriesRPC request (including heartbeat requests, commit requests, and append requests), reset the current count of the election timer;
  • 1) if its own term is greater than the request parameter term, return its own locally recorded term with success set to false (informing the requester that it is an expired Leader);
  • 2) otherwise, if the term of the Follower's own log entry at prevLogIndex does not match the request parameter prevLogTerm, return its own term with success set to false (the current Follower node's log is behind);
  • 3) otherwise, if the request is only a heartbeat packet, the node knows the Leader is alive and that it is a Follower; if necessary it switches itself from Candidate back to Follower, and returns its own term with success set to true;
  • 4) otherwise, the Follower performs a log consistency check: it deletes existing entries that conflict with the Leader's, appends any entries it does not already have, removes redundant entries, and, if the request is a replica of an already committed entry, commits the entry directly once the copy succeeds;
  • 5) if the request parameter leaderCommit is greater than its own commitIndex, update commitIndex to Max(leaderCommit, commitIndex), optimistically advancing the locally committed commitIndex to the value the Leader tracks for this Follower; this handles the scenario where the Follower has just recovered from a failure.
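A minimal sketch of this Follower-side handling, assuming getter methods on the AppendEntries packet, setter methods on AppendEntriesResp, and hypothetical local-log helpers. It covers the term check, the prevLogIndex/prevLogTerm consistency check, heartbeats, and the commitIndex update.

public class FollowerAppendEntriesHandler {
    private long currentTerm;
    private long commitIndex;
    private String role = "Follower";

    public synchronized AppendEntriesResp handle(AppendEntries req) {
        resetElectionTimer();                            // step 0: any AppendEntries resets the timer
        AppendEntriesResp resp = new AppendEntriesResp();
        // Step 1: the sender is an expired Leader.
        if (currentTerm > req.getTerm()) {
            resp.setTerm(currentTerm);
            resp.setSuccess(false);
            return resp;
        }
        // Step 2: our log has no matching entry at prevLogIndex.
        if (termAt(req.getPrevLogIndex()) != req.getPrevLogTerm()) {
            resp.setTerm(currentTerm);
            resp.setSuccess(false);
            return resp;
        }
        currentTerm = req.getTerm();
        role = "Follower";                               // step 3: a Candidate steps back to Follower
        if (req.getEntries() != null && req.getEntries().length > 0) {
            // Step 4: delete conflicting entries after prevLogIndex and append the missing ones.
            truncateFrom(req.getPrevLogIndex() + 1);
            appendEntries(req.getEntries());
        }
        // Step 5: advance commitIndex when the Leader reports a larger one.
        if (req.getLeaderCommit() > commitIndex) {
            commitIndex = Math.max(req.getLeaderCommit(), commitIndex);
            applyCommittedEntries(commitIndex);
        }
        resp.setTerm(currentTerm);
        resp.setSuccess(true);
        return resp;
    }

    // Hypothetical local-log helpers.
    private void resetElectionTimer() { }
    private long termAt(long index) { return 0L; }
    private void truncateFrom(long index) { }
    private void appendEntries(CommandLog[] entries) { }
    private void applyCommittedEntries(long upToIndex) { }
}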

If the Follower fails to append the Leader's log entries, and the Follower's term is less than or equal to the Leader's term, the Leader decrements prevLogIndex and reissues the AppendEntriesRPC request, repeating this until AppendEntriesRPC returns success, which indicates that the Leader and the Follower agree on the log entry at the prevLogIndex position. At that point, all log entries before the prevLogIndex position on the Follower node are retained, the entries after prevLogIndex (which conflict with the Leader's) are deleted by the Follower, and all of the Leader's log entries after prevLogIndex are appended from that position. Therefore, once AppendEntriesRPC returns success, the Leader's and Follower's logs are consistent.
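A minimal sketch of that decrement-and-retry loop on the Leader side, assuming a hypothetical sendAppendEntries helper that builds and sends an AppendEntries request for the given prevLogIndex. A real implementation would also persist per-Follower state such as the next index to send.

public class LogBackTracker {
    // Hypothetical RPC helper: sends the entries following prevLogIndex and returns the response.
    interface Rpc {
        AppendEntriesResp sendAppendEntries(String follower, long prevLogIndex);
    }

    private final Rpc rpc;

    public LogBackTracker(Rpc rpc) {
        this.rpc = rpc;
    }

    // Keep decrementing prevLogIndex until the Follower accepts the entries.
    public void replicateWithBacktracking(String follower, long prevLogIndex) {
        while (prevLogIndex >= 0) {
            AppendEntriesResp resp = rpc.sendAppendEntries(follower, prevLogIndex);
            if (resp.isSuccess()) {
                return;       // logs up to prevLogIndex now match; the rest has been appended
            }
            prevLogIndex--;   // the Follower's log is behind or conflicting; step back one entry
        }
    }
}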

Consistency

A Candidate must receive votes from the majority of nodes to become the Leader, a node will not vote for a Candidate whose log is not at least as new as its own, and the Leader only commits a log entry (marking it committed and applying it to the state machine) after it has been successfully replicated to the majority of nodes (including itself). Therefore, every elected Leader is a node that contains all committed log entries.

When the new Leader synchronizes new log entries to a Follower, if the Follower's log lags far behind, the Follower actively removes the entries that the Leader does not have and synchronizes the Leader's entries from the Leader. For entries that the Leader has already marked as committed, the Follower can apply them directly to its state machine upon receipt, maintaining the eventual consistency of the data.

Multi Raft

Suppose there are three machines, each deploying a single Raft node service. Since all read and write requests are handled by the Leader node, wouldn't only one machine actually be doing the work?

We can start multiple Raft services on each node service (note: not multiple processes) and build multiple Raft clusters, i.e. a Multi Raft cluster, so that the Leader nodes of the individual Raft clusters are evenly distributed across the machines. For example:

Machine     | Raft node                        | Raft node                        | Raft node
Machine 1   | Raft service A node 1 (Leader)   | Raft service B node 1 (Follower) | Raft service C node 1 (Follower)
Machine 2   | Raft service A node 2 (Follower) | Raft service B node 2 (Leader)   | Raft service C node 2 (Follower)
Machine 3   | Raft service A node 3 (Follower) | Raft service B node 3 (Follower) | Raft service C node 3 (Leader)

In the distributed database TiDB, Multi Raft is used to shard data and make each Raft cluster responsible for a separate part of the data.

References

  • Huawei Cloud Container Service team, etcd In-depth Analysis (Cloud Computing Technology Series)

Raft paper Address

  • Raft paper (Chinese translation): github.com/maemual/raf…

Photos

  • Image source: github.com/maemual/raf…