Preface
A few days ago I had dinner with some senior engineers from Tencent. When the conversation turned to SOFAJRaft, I naturally assumed I understood it well, but they asked me how logs are replicated between the nodes of a SOFAJRaft cluster, and I couldn't explain it. So this time let's look at how log replication is done in SOFAJRaft.
The Leader sends a probe to obtain the LastLogIndex of the Follower
After the Leader node establishes a connection to a Follower through the Replicator, it first sends a request of type Probe to learn which log position the Follower has reached, so that the Leader knows from where to send the subsequent logs.
The general process is as follows:
NodeImpl#becomeLeader->replicatorGroup#addReplicator->Replicator#start->Replicator#sendEmptyEntries
At the end of this chain, the Replicator's sendEmptyEntries method sends a probe to obtain the Follower's LastLogIndex.
Replicator#sendEmptyEntries
private void sendEmptyEntries(final boolean isHeartbeat,
                              final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Set the cluster configuration to rb, such as Term, GroupId, ServerId, etc.
    if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
        // id is unlock in installSnapshot
        installSnapshot();
        if (isHeartbeat && heartBeatClosure != null) {
            Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
                "Fail to send heartbeat to peer %s", this.options.getPeerId()));
        }
        return;
    }
    try {
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final AppendEntriesRequest request = rb.build();

        if (isHeartbeat) {
            .... // Omit the heartbeat code
        } else {
            // Sending a probe request:
            // the Leader sends a probe to obtain the LastLogIndex of the Follower.
            // (The statInfo fields do not seem to be used anywhere.)
            this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
            // Set lastLogIndex to 1 less than firstLogIndex
            this.statInfo.firstLogIndex = this.nextIndex;
            this.statInfo.lastLogIndex = this.nextIndex - 1;
            this.appendEntriesCounter++;
            // Set the current Replicator to the probe-sending state
            this.state = State.Probe;
            final int stateVersion = this.version;
            // Return reqSeq and increment reqSeq by one
            final int seq = getAndIncrementReqSeq();
            final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
                request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

                    @Override
                    public void run(final Status status) {
                        onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
                            getResponse(), seq, stateVersion, monotonicSendTimeMs);
                    }
                });
            // Inflight is an abstraction of the LogEntries sent in one batch.
            // It indicates which LogEntries have been sent as a log replication request.
            // Encapsulate this request into an Inflight
            addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
        }
        LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}", this.options.getNode()
            .getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex());
    } finally {
        this.id.unlock();
    }
}
When sendEmptyEntries is called here, isHeartbeat is false and heartBeatClosure is null, because we are sending a probe to learn the Follower's log position. Let's look at fillCommonFields, which fills the common fields such as Term, GroupId, ServerId, PeerId and PrevLogIndex into the request builder:
private boolean fillCommonFields(final AppendEntriesRequest.Builder rb, long prevLogIndex,
                                 final boolean isHeartbeat) {
    final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex);
    ...
    rb.setTerm(this.options.getTerm());
    rb.setGroupId(this.options.getGroupId());
    rb.setServerId(this.options.getServerId().toString());
    rb.setPeerId(this.options.getPeerId().toString());
    rb.setPrevLogIndex(prevLogIndex);
    rb.setPrevLogTerm(prevLogTerm);
    rb.setCommittedIndex(this.options.getBallotBox().getLastCommittedIndex());
    return true;
}
Note that prevLogIndex is nextIndex - 1, i.e. one position before the index to be sent next. The method also sets the fields of the statInfo instance, although I don't see where the statInfo object is actually used. An AppendEntriesRequest is then sent to the Follower, and onRpcReturned handles its response. After the request is sent, addInflight is called to create an Inflight instance and add it to the inflights queue, as follows:
private void addInflight(final RequestType reqType, final long startIndex, final int count, final int size,
                         final int seq, final Future<Message> rpcInfly) {
    this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
    this.inflights.add(this.rpcInFly);
    this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
}
Inflight is an abstraction of a batch of LogEntries that have been sent out: it records which LogEntries have gone out as a log replication request. Here the request that was just sent is wrapped into an Inflight.
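To make that concrete, here is a minimal, self-contained sketch of the Inflight bookkeeping described above. The class and field names mirror the snippets in this article, but the code is a simplified illustration rather than the actual SOFAJRaft implementation:

import java.util.ArrayDeque;

// Simplified sketch of the in-flight bookkeeping; not the real SOFAJRaft classes.
public class InflightSketch {

    enum RequestType { AppendEntries, Snapshot }

    // One Inflight describes a batch of log entries sent in a single AppendEntries request.
    static final class Inflight {
        final RequestType reqType; // request type
        final long startIndex;     // first log index carried by the request
        final int count;           // number of entries in the batch (0 for a probe)
        final int size;            // total payload size in bytes
        final int seq;             // sequence number of the request, used to match responses

        Inflight(RequestType reqType, long startIndex, int count, int size, int seq) {
            this.reqType = reqType;
            this.startIndex = startIndex;
            this.count = count;
            this.size = size;
            this.seq = seq;
        }
    }

    // Batches are appended in sending order, so the head is always the oldest pending batch.
    private final ArrayDeque<Inflight> inflights = new ArrayDeque<>();

    void addInflight(RequestType type, long startIndex, int count, int size, int seq) {
        inflights.add(new Inflight(type, startIndex, count, size, seq));
    }

    // Called when the matching response arrives; returns the oldest pending batch.
    Inflight pollInflight() {
        return inflights.poll();
    }

    // On failure the whole queue is dropped and replication restarts from the failed index.
    void resetInflights() {
        inflights.clear();
    }

    public static void main(String[] args) {
        InflightSketch sketch = new InflightSketch();
        sketch.addInflight(RequestType.AppendEntries, 101, 0, 0, 0);    // the probe request
        sketch.addInflight(RequestType.AppendEntries, 101, 10, 2048, 1); // a batch of 10 entries
        System.out.println(sketch.pollInflight().count); // 0: the probe's response is matched first
    }
}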
The Leader sends logs to the followers in batches
Replicator#sendEntries
private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Add the current Replicator configuration information to rb
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        // unlock id in installSnapshot
        installSnapshot();
        return false;
    }

    ByteBufferCollector dataBuf = null;
    // The maximum size is 1024
    final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
    // A technique similar to object pooling is used to avoid duplicate object creation
    final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
    try {
        // Loop through the LogEntries, packing them into byteBufList and emb
        for (int i = 0; i < maxEntriesSize; i++) {
            final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
            // nextSendingIndex represents the next index to be sent, and i represents the offset
            if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                break;
            }
            rb.addEntries(emb.build());
        }
        // If EntriesCount is 0, there is no new data in LogManager
        if (rb.getEntriesCount() == 0) {
            if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                installSnapshot();
                return false;
            }
            // _id is unlock in _wait_more
            waitMoreEntries(nextSendingIndex);
            return false;
        }
        // Add data from byteBufList to rb
        if (byteBufList.getCapacity() > 0) {
            dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
            for (final ByteBuffer b : byteBufList) {
                dataBuf.put(b);
            }
            final ByteBuffer buf = dataBuf.getBuffer();
            buf.flip();
            rb.setData(ZeroByteStringHelper.wrap(buf));
        }
    } finally {
        // recycle byteBufList
        RecycleUtil.recycle(byteBufList);
    }

    final AppendEntriesRequest request = rb.build();
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Node {} send AppendEntriesRequest to {} term {} lastCommittedIndex {} prevLogIndex {} prevLogTerm {} logIndex {} count {}",
            this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(),
            request.getCommittedIndex(), request.getPrevLogIndex(), request.getPrevLogTerm(), nextSendingIndex,
            request.getEntriesCount());
    }
    // statInfo
    this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
    this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
    this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

    final Recyclable recyclable = dataBuf;
    final int v = this.version;
    final long monotonicSendTimeMs = Utils.monotonicMs();
    final int seq = getAndIncrementReqSeq();
    final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
        request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                // Recycle resources
                RecycleUtil.recycle(recyclable);
                onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(),
                    seq, v, monotonicSendTimeMs);
            }
        });
    // Add an Inflight
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
        seq, rpcFuture);
    return true;
}
- The fillCommonFields method is called to fill the current Replicator's common configuration into rb;
- prepareEntry is called, which computes the log index from nextSendingIndex and the offset i, looks up the corresponding LogEntry in LogManager, copies its properties into emb, and adds its data to the RecyclableByteBufferList;
- If LogManager has no new data, EntriesCount will be 0 and the method returns;
- The data collected in byteBufList is copied into a ByteBufferCollector and set on rb as the request body;
- An AppendEntriesRequest instance is built and sent to the Follower;
- An Inflight is added to the queue. The Leader maintains a queue and adds an Inflight for every batch of LogEntries it sends out. When it learns that a batch failed to replicate, it can use the Inflights in the queue to resend that batch and all subsequent logs to the Follower. This guarantees that log replication completes and that the replication order is preserved.
RecyclableByteBufferList obtains its instances from an object pool: newInstance takes an instance from the pool instead of creating a new one, and recycle returns it so that it can be reused, which avoids the cost of repeatedly creating and discarding objects.
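Below is a minimal sketch of this newInstance()/recycle() pattern, using a hypothetical PooledBufferList class; the real RecyclableByteBufferList is backed by jraft's object-recycling utility and is more sophisticated, but the idea is the same:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;

// Hypothetical, simplified pool to illustrate the newInstance()/recycle() pattern;
// it is not the actual RecyclableByteBufferList implementation.
final class PooledBufferList {
    private static final ArrayDeque<PooledBufferList> POOL = new ArrayDeque<>();

    private final ArrayList<ByteBuffer> buffers = new ArrayList<>();
    private int capacity;

    private PooledBufferList() {
    }

    // Take an instance from the pool instead of allocating a new one every time.
    static synchronized PooledBufferList newInstance() {
        PooledBufferList list = POOL.poll();
        return list != null ? list : new PooledBufferList();
    }

    void add(ByteBuffer buf) {
        buffers.add(buf);
        capacity += buf.remaining();
    }

    int getCapacity() {
        return capacity;
    }

    // Clear the state and return the instance to the pool so it can be reused.
    static synchronized void recycle(PooledBufferList list) {
        list.buffers.clear();
        list.capacity = 0;
        POOL.add(list);
    }

    public static void main(String[] args) {
        PooledBufferList list = PooledBufferList.newInstance();
        try {
            list.add(ByteBuffer.wrap("hello".getBytes()));
            System.out.println("capacity=" + list.getCapacity());
        } finally {
            PooledBufferList.recycle(list); // same shape as RecycleUtil.recycle(byteBufList)
        }
    }
}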
Let’s take a closer look at the specific methods in sendEntries.
prepareEntry populates the emb properties
Replicator#prepareEntry
boolean prepareEntry(final long nextSendingIndex, final int offset, final RaftOutter.EntryMeta.Builder emb,
                     final RecyclableByteBufferList dateBuffer) {
    if (dateBuffer.getCapacity() >= this.raftOptions.getMaxBodySize()) {
        return false;
    }
    // Set the current index to send
    final long logIndex = nextSendingIndex + offset;
    // If the index is not found in LogManager, return directly
    final LogEntry entry = this.options.getLogManager().getEntry(logIndex);
    if (entry == null) {
        return false;
    }
    // Set the properties of the LogEntry into emb
    emb.setTerm(entry.getId().getTerm());
    if (entry.hasChecksum()) {
        emb.setChecksum(entry.getChecksum()); // since 1.2.6
    }
    emb.setType(entry.getType());
    if (entry.getPeers() != null) {
        Requires.requireTrue(!entry.getPeers().isEmpty(), "Empty peers at logIndex=%d", logIndex);
        for (final PeerId peer : entry.getPeers()) {
            emb.addPeers(peer.toString());
        }
        if (entry.getOldPeers() != null) {
            for (final PeerId peer : entry.getOldPeers()) {
                emb.addOldPeers(peer.toString());
            }
        }
    } else {
        Requires.requireTrue(entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION,
            "Empty peers but is ENTRY_TYPE_CONFIGURATION type at logIndex=%d", logIndex);
    }
    final int remaining = entry.getData() != null ? entry.getData().remaining() : 0;
    emb.setDataLen(remaining);
    // Put the data in the LogEntry into the dateBuffer
    if (entry.getData() != null) {
        // should slice entry data
        dateBuffer.add(entry.getData().slice());
    }
    return true;
}
- First check whether the incoming dateBuffer has already reached the maximum body size configured in raftOptions (512 * 1024 by default); if so, return false;
- If the LogEntry at the computed index cannot be found in LogManager, also return false, which makes the caller break out of its loop;
- Otherwise the properties of the LogEntry are copied into the emb object and its data is appended to the dateBuffer, so the metadata and the payload are carried separately.
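To see how the two cut-off conditions interact, here is a small, self-contained sketch of the batching loop. The types and the getEntry stand-in are hypothetical; the real code works with LogManager, EntryMeta and RecyclableByteBufferList as shown above:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a batch is cut by maxEntriesSize and maxBodySize.
public class BatchSketch {

    static final int MAX_ENTRIES_SIZE = 1024;    // like raftOptions.getMaxEntriesSize()
    static final int MAX_BODY_SIZE = 512 * 1024; // like raftOptions.getMaxBodySize()

    // Stand-in for LogManager.getEntry(index): returns null when the index does not exist.
    static ByteBuffer getEntry(long index, long lastLogIndex) {
        return index <= lastLogIndex ? ByteBuffer.allocate(128) : null;
    }

    // Collect entries starting from nextSendingIndex until one of the limits is hit.
    static List<ByteBuffer> collectBatch(long nextSendingIndex, long lastLogIndex) {
        final List<ByteBuffer> batch = new ArrayList<>();
        int bodySize = 0;
        for (int i = 0; i < MAX_ENTRIES_SIZE; i++) {
            if (bodySize >= MAX_BODY_SIZE) {
                break; // same as prepareEntry returning false on the capacity check
            }
            final long logIndex = nextSendingIndex + i; // next index to send + offset
            final ByteBuffer data = getEntry(logIndex, lastLogIndex);
            if (data == null) {
                break; // no more entries in the log
            }
            bodySize += data.remaining();
            batch.add(data);
        }
        return batch;
    }

    public static void main(String[] args) {
        // e.g. nextIndex = 101 and the log currently ends at index 150 -> 50 entries in the batch
        System.out.println(collectBatch(101, 150).size());
    }
}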
Follower processes log replication requests sent by the Leader
After the Leader sends the AppendEntriesRequest, the request is handled on the Follower side by AppendEntriesRequestProcessor.
The entry point is processRequest0:
public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
                               final RpcRequestClosure done) {
    final Node node = (Node) service;
    // Use pipeline by default
    if (node.getRaftOptions().isReplicatorPipeline()) {
        final String groupId = request.getGroupId();
        final String peerId = request.getPeerId();
        // Get the request sequence number, in the groupId + peerId dimension
        final int reqSequence = getAndIncrementSequence(groupId, peerId, done.getBizContext().getConnection());
        // The Follower processes the log request sent by the leader
        final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
            reqSequence, groupId, peerId));
        // The normal path returns null; a non-null response indicates an error
        if (response != null) {
            sendSequenceResponse(groupId, peerId, reqSequence, done.getAsyncContext(), done.getBizContext(),
                response);
        }
        return null;
    } else {
        return service.handleAppendEntriesRequest(request, done);
    }
}
service.handleAppendEntriesRequest eventually calls NodeImpl's handleAppendEntriesRequest method. handleAppendEntriesRequest only returns a message for errors or for requests that do not come from the current Leader; on the normal path it returns null.
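The pipelining is driven by nothing more than a per-connection counter: every request handled for a given groupId and peerId gets an increasing sequence number, and the responses are later released in exactly that order. Here is a minimal sketch of such a counter, using a hypothetical helper class rather than the real PeerRequestContext:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-peer sequence counter illustrating getAndIncrementSequence.
public class SequenceCounterSketch {

    private final Map<String, AtomicInteger> sequences = new ConcurrentHashMap<>();

    // Each (groupId, peerId) pair gets its own monotonically increasing sequence.
    int getAndIncrementSequence(String groupId, String peerId) {
        return sequences
            .computeIfAbsent(groupId + "/" + peerId, k -> new AtomicInteger(0))
            .getAndIncrement();
    }

    public static void main(String[] args) {
        SequenceCounterSketch s = new SequenceCounterSketch();
        System.out.println(s.getAndIncrementSequence("group1", "peer1")); // 0
        System.out.println(s.getAndIncrementSequence("group1", "peer1")); // 1
        System.out.println(s.getAndIncrementSequence("group1", "peer2")); // 0, independent counter
    }
}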
Handling the log replication request and responding
NodeImpl#handleAppendEntriesRequest
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
boolean doUnlock = true;
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
// Get the number of entrylogs
final int entriesCount = request.getEntriesCount();
try {
// Check whether the current node is active
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
// Verify that the passed serverId can be properly parsed
final PeerId serverId = new PeerId();
            if (!serverId.parse(request.getServerId())) {
                LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s.",
request.getServerId());
}
// Check the term
// Check stale term
if (request.getTerm() < this.currTerm) {
LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(this.currTerm) //
.build();
}
// Check term and state to step down
// If the current node is not a Follower node, perform the StepDown operation
checkStepDown(request.getTerm(), serverId);
// This indicates that the requested node is not the leader of the current node
            if (!serverId.equals(this.leaderId)) {
LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
serverId, this.currTerm, this.leaderId);
// Increase the term by 1 and make both leaders step down to minimize the
// loss of split brain
                stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
"More than one leader in the same term."));
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(request.getTerm() + 1) //
.build();
}
updateLastLeaderTimestamp(Utils.monotonicMs());
            // Check whether a snapshot is currently being installed
            if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
                return RpcResponseFactory.newResponse(RaftError.EBUSY, "Node %s:%s is installing snapshot.",
                    this.groupId, this.serverId);
}
            // The prevLogIndex passed in is the nextIndex - 1 of the node that initiated the request
final long prevLogIndex = request.getPrevLogIndex();
final long prevLogTerm = request.getPrevLogTerm();
final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
            // Reject the request when the local term at prevLogIndex does not match prevLogTerm
            if (localPrevLogTerm != prevLogTerm) {
                final long lastLogIndex = this.logManager.getLastLogIndex();
LOG.warn(
"Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm,
lastLogIndex, entriesCount);
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogIndex) //
.build();
}
// Respond to heartbeat or send sendEmptyEntry
if (entriesCount == 0) {
// heartbeat
final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
.setSuccess(true) //
.setTerm(this.currTerm)
// Returns the latest index of the current node
.setLastLogIndex(this.logManager.getLastLogIndex());
doUnlock = false;
this.writeLock.unlock();
// see the comments at FollowerStableClosure#run()
this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
return respBuilder.build();
}
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = null;
if (request.hasData()) {
allData = request.getData().asReadOnlyByteBuffer();
}
// Get all the data
final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
for (int i = 0; i < entriesCount; i++) {
final RaftOutter.EntryMeta entry = entriesList.get(i);
index++;
                if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) {
                    // Set the logEntry properties
final LogEntry logEntry = new LogEntry();
logEntry.setId(new LogId(index, entry.getTerm()));
logEntry.setType(entry.getType());
if (entry.hasChecksum()) {
                        logEntry.setChecksum(entry.getChecksum()); // since 1.2.6
}
// Populate the data with logEntry
final long dataLen = entry.getDataLen();
if (dataLen > 0) {
final byte[] bs = new byte[(int) dataLen];
                        assert allData != null;
allData.get(bs, 0, bs.length);
logEntry.setData(ByteBuffer.wrap(bs));
}
if (entry.getPeersCount() > 0) {
// Only entries of the configuration type can have multiple peers
                        if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                            throw new IllegalStateException(
"Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: "
+ entry.getType());
}
final List<PeerId> peers = new ArrayList<>(entry.getPeersCount());
for (final String peerStr : entry.getPeersList()) {
final PeerId peer = new PeerId();
peer.parse(peerStr);
peers.add(peer);
}
logEntry.setPeers(peers);
if (entry.getOldPeersCount() > 0) {
final List<PeerId> oldPeers = new ArrayList<>(entry.getOldPeersCount());
for (final String peerStr : entry.getOldPeersList()) {
                                final PeerId peer = new PeerId();
                                peer.parse(peerStr);
                                oldPeers.add(peer);
                            }
                            logEntry.setOldPeers(oldPeers);
                        }
                    } else if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
throw new IllegalStateException(
"Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
}
// Validate checksum
if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
long realChecksum = logEntry.checksum();
LOG.error(
"Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, " +
"realChecksum={}",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
return RpcResponseFactory.newResponse(RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), realChecksum); } entries.add(logEntry); }}// Store the log and return response
final FollowerStableClosure closure = new FollowerStableClosure(request, AppendEntriesResponse.newBuilder()
.setTerm(this.currTerm), this, done, this.currTerm);
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
this.conf = this.logManager.checkAndSetConfiguration(this.conf);
return null;
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
            this.metrics.recordSize("handle-append-entries-count", entriesCount);
        }
    }
The handleAppendEntriesRequest method is very long, but most of it is validation; the actual processing logic is short:
- Verify that the current node is still active, and return an error response if it is not;
- Verify that the serverId in the request can be parsed correctly, otherwise return an error response;
- Check whether the request's term is smaller than the current term, and if so return an AppendEntriesResponse with success set to false;
- Call checkStepDown to check the current node's term and state, whether there is already a leader, and so on;
- Check that the serverId of the request equals the current node's leaderId, i.e. that the request really was initiated by this node's Leader; if not, step down and return an AppendEntriesResponse;
- Check whether a snapshot is currently being installed;
- Check whether the term of the local log at the request's prevLogIndex matches the prevLogTerm passed in; if not, return an AppendEntriesResponse carrying the local lastLogIndex;
- If entriesCount is zero, the Leader sent a heartbeat or an empty probe, so return an AppendEntriesResponse that carries the current term and the latest log index;
- Otherwise the request carries data, so iterate over all the entries;
- Instantiate a LogEntry for each one, set the data and properties on it, and add it to the entries collection;
- Call logManager.appendEntries to batch-write the logs to the underlying storage (RocksDB by default).
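The heart of these checks is the Raft log-matching rule: the Follower accepts the batch only if its own log has an entry at prevLogIndex whose term equals prevLogTerm. Here is a condensed, hypothetical sketch of just that decision; it mirrors the checks above but is not the actual NodeImpl code:

// Hypothetical, condensed view of the consistency checks in handleAppendEntriesRequest.
public class FollowerCheckSketch {

    // A minimal stand-in for the local log: term 0 means "no entry at this index".
    static long getLocalTerm(long[] termsByIndex, long index) {
        return index >= 0 && index < termsByIndex.length ? termsByIndex[(int) index] : 0;
    }

    /**
     * Returns true when the AppendEntries batch can be accepted.
     *
     * @param currTerm     the Follower's current term
     * @param reqTerm      term carried by the request
     * @param prevLogIndex index right before the first entry of the batch
     * @param prevLogTerm  term the Leader recorded for prevLogIndex
     */
    static boolean accept(long currTerm, long reqTerm, long prevLogIndex, long prevLogTerm, long[] localTerms) {
        if (reqTerm < currTerm) {
            return false; // stale leader: reject and return the current term
        }
        // Log-matching check: the local entry at prevLogIndex must have the same term.
        return getLocalTerm(localTerms, prevLogIndex) == prevLogTerm;
    }

    public static void main(String[] args) {
        long[] localTerms = {0, 1, 1, 2, 2}; // terms of the local entries at indexes 0..4
        System.out.println(accept(2, 2, 4, 2, localTerms)); // true: terms match at index 4
        System.out.println(accept(2, 2, 4, 1, localTerms)); // false: leader must adjust nextIndex
        System.out.println(accept(2, 1, 4, 2, localTerms)); // false: stale term
    }
}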
Sends the response to the leader
The response is eventually sent back to the Leader by the sendSequenceResponse method of AppendEntriesRequestProcessor.
void sendSequenceResponse(final String groupId, final String peerId, final int seq,
                          final AsyncContext asyncContext, final BizContext bizContext, final Message msg) {
    final Connection connection = bizContext.getConnection();
    // Get the context, the dimension is groupId and peerId
    final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection);
    final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
    assert (respQueue != null);

    synchronized (Utils.withLockObject(respQueue)) {
        // The data to be responded is placed in the priority queue
        respQueue.add(new SequenceMessage(asyncContext, msg, seq));
        // Check whether the number in the queue exceeds 256
        if (!ctx.hasTooManyPendingResponses()) {
            while (!respQueue.isEmpty()) {
                final SequenceMessage queuedPipelinedResponse = respQueue.peek();
                // If the sequence does not match, no response is sent
                if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) {
                    // sequence mismatch, waiting for next response.
                    break;
                }
                respQueue.remove();
                try {
                    // Send the response
                    queuedPipelinedResponse.sendResponse();
                } finally {
                    // add one to the sequence
                    getAndIncrementNextRequiredSequence(groupId, peerId, connection);
                }
            }
        } else {
            LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
                ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses);
            // Close the connection if there are too many pending responses in queue.
            connection.close();
            removePeerRequestContext(groupId, peerId);
        }
    }
}
This method puts the response into a PriorityQueue ordered by sequence number, then repeatedly peeks the element with the smallest sequence and compares it with nextRequiredSequence; if they are not equal, the responses have arrived out of order and nothing is sent until the missing one shows up.
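Here is a minimal sketch of this reorder-and-release pattern, using a plain PriorityQueue keyed by sequence number. The SequencedResponse type is hypothetical, and the overflow handling (closing the connection) is omitted:

import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch: release responses strictly in sequence order even if they
// become ready out of order, mirroring sendSequenceResponse.
public class ResponseReorderSketch {

    static final class SequencedResponse {
        final int sequence;
        final String payload;

        SequencedResponse(int sequence, String payload) {
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    private final PriorityQueue<SequencedResponse> respQueue =
        new PriorityQueue<>(Comparator.comparingInt(r -> r.sequence));
    private int nextRequiredSequence = 0;

    synchronized void sendSequenceResponse(SequencedResponse resp) {
        respQueue.add(resp);
        while (!respQueue.isEmpty()) {
            final SequencedResponse head = respQueue.peek();
            if (head.sequence != nextRequiredSequence) {
                break; // sequence mismatch: an earlier response has not arrived yet
            }
            respQueue.remove();
            System.out.println("send response seq=" + head.sequence + " payload=" + head.payload);
            nextRequiredSequence++; // like getAndIncrementNextRequiredSequence
        }
    }

    public static void main(String[] args) {
        ResponseReorderSketch sketch = new ResponseReorderSketch();
        sketch.sendSequenceResponse(new SequencedResponse(1, "b")); // buffered, seq 0 missing
        sketch.sendSequenceResponse(new SequencedResponse(0, "a")); // releases 0, then 1
    }
}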
The Leader handles the Response of the log replication
After receiving the Response from the Follower, the Leader calls the Replicator’s onRpcReturned method
static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
if (id == null) {
return;
}
final long startTimeMs = Utils.nowMs();
Replicator r;
if ((r = (Replicator) id.lock()) == null) {
return;
}
// Check the version number, since every resetInflights increases version by one, so check
        if (stateVersion != r.version) {
            LOG.debug(
                "Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
r, stateVersion, r.version, request, response, status);
id.unlock();
return;
}
// Use the priority queue to sort by seq, the smallest will be the first
final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
// A priority queue is used here because the response is asynchronous and a small SEQ may respond more slowly than a large SEQ
holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
// The default holdingQueue contains a maximum of 256 entries
if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
// Resend the probe
// Clear the data
r.resetInflights();
r.state = State.Probe;
r.sendEmptyEntries(false);
return;
}
boolean continueSendEntries = false;
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
}
try {
int processed = 0;
        while (!holdingQueue.isEmpty()) {
            // Get the response with the smallest seq in the holdingQueue
final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
            // If the smallest seq is not the next required seq, the responses arrived out of
            // order and we must wait for the missing one.
            // sequence mismatch, waiting for next response.
            if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                // If some responses have already been processed, just break the loop here
if (processed > 0) {
if (isLogDebugEnabled) {
sb.append("has processed ").append(processed).append(" responses,");
}
break;
} else {
//Do not processed any responses, UNLOCK id and return.
continueSendEntries = false;
id.unlock();
                    return;
                }
            }
            // Remove the response with the smallest seq from the priority queue
holdingQueue.remove();
processed++;
// Gets the first element in the inflights queue
final Inflight inflight = r.pollInflight();
// When a request is made, inflight is queued
// If it is empty, ignore it
if (inflight == null) {
// The previous in-flight requests were cleared.
if (isLogDebugEnabled) {
sb.append("ignore response because request not found:").append(queuedPipelinedResponse)
.append(",\n");
}
continue;
}
// If seq is not aligned, the sequence is out of order
            if (inflight.seq != queuedPipelinedResponse.seq) {
                // reset state
LOG.warn(
"Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.state = State.Probe;
continueSendEntries = false;
// Lock the node and wait for some time according to the error category
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
}
try {
switch (queuedPipelinedResponse.requestType) {
case AppendEntries:
// Process response for log replication
continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
(AppendEntriesRequest) queuedPipelinedResponse.request,
(AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
break;
case Snapshot:
// Process the snapshot's response
continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
(InstallSnapshotRequest) queuedPipelinedResponse.request,
(InstallSnapshotResponse) queuedPipelinedResponse.response);
                            break;
                    }
                } finally {
if (continueSendEntries) {
// Success, increase the response sequence.
r.getAndIncrementRequiredNextSeq();
} else {
// The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
                        break;
                    }
                }
            }
        } finally {
if (isLogDebugEnabled) {
sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
LOG.debug(sb.toString());
}
if (continueSendEntries) {
                // unlock in sendEntries.
                r.sendEntries();
            }
        }
    }
- Check the version number, as each resetInflights increments version by one, so check that it is the same batch of data
- The pendingResponses queue of the Replicator is obtained, and the data of the current response is added to the queue as an RpcResponse instance
- Check whether the number of elements in the queue is greater than 256. If the number is greater than 256, the data is cleared and resynchronized
- Check whether the smallest seq in the holdingQueue equals the current requiredNextSeq; if it does not, either break out of the loop (if some responses were already processed) or unlock and return;
- Get the first element of the inflights queue; if its seq does not match the response's seq, the pipeline is out of order and the state is reset;
- Call the onAppendEntriesReturned method to handle the response of log replication
- If this succeeds, sendEntries will be called to continue sending the replication log to the followers
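The matching done on the Leader side boils down to two checks per response: the response must be the next one expected (requiredNextSeq), and it must correspond to the oldest pending Inflight. Below is a condensed, hypothetical sketch without the locking and version checks of the real code:

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch of the pairing done in onRpcReturned between pending
// responses (ordered by seq) and the in-flight request queue.
public class LeaderResponseMatchSketch {

    static final class RpcResponse {
        final int seq;

        RpcResponse(int seq) {
            this.seq = seq;
        }
    }

    static final class Inflight {
        final int seq;
        final long startIndex;

        Inflight(int seq, long startIndex) {
            this.seq = seq;
            this.startIndex = startIndex;
        }
    }

    private final PriorityQueue<RpcResponse> holdingQueue =
        new PriorityQueue<>(Comparator.comparingInt(r -> r.seq));
    private final ArrayDeque<Inflight> inflights = new ArrayDeque<>();
    private int requiredNextSeq = 0;

    void onRpcReturned(RpcResponse response) {
        holdingQueue.add(response);
        while (!holdingQueue.isEmpty()) {
            final RpcResponse head = holdingQueue.peek();
            if (head.seq != requiredNextSeq) {
                break; // an earlier response is still missing, wait for it
            }
            holdingQueue.remove();
            final Inflight inflight = inflights.poll();
            if (inflight == null || inflight.seq != head.seq) {
                // out of order with the request queue: reset everything and probe again
                holdingQueue.clear();
                inflights.clear();
                System.out.println("reset inflights, probe again");
                return;
            }
            requiredNextSeq++;
            System.out.println("processed response seq=" + head.seq + " startIndex=" + inflight.startIndex);
        }
    }

    public static void main(String[] args) {
        LeaderResponseMatchSketch sketch = new LeaderResponseMatchSketch();
        // Two requests were sent: seq 0 starting at index 1, seq 1 starting at index 3.
        sketch.inflights.add(new Inflight(0, 1));
        sketch.inflights.add(new Inflight(1, 3));
        sketch.onRpcReturned(new RpcResponse(1)); // buffered: seq 0 has not arrived yet
        sketch.onRpcReturned(new RpcResponse(0)); // processes seq 0, then seq 1
    }
}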
Replicator#onAppendEntriesReturned
private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
final AppendEntriesRequest request,
final AppendEntriesResponse response, final long rpcSendTime,
final long startTimeMs, final Replicator r) {
// Check whether the data sequence is correct
        if (inflight.startIndex != request.getPrevLogIndex() + 1) {
LOG.warn(
"Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
r, inflight.startIndex, request.getPrevLogIndex());
r.resetInflights();
r.state = State.Probe;
// unlock id in sendEmptyEntries
r.sendEmptyEntries(false);
return false;
}
        // metric
// record metrics
if (request.getEntriesCount() > 0) {
r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() ! =null ? request.getData().size()
: 0);
}
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Node "). //
append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
append(" received AppendEntriesResponse from "). //
append(r.options.getPeerId()). //
append(" prevLogIndex=").append(request.getPrevLogIndex()). //
append(" prevLogTerm=").append(request.getPrevLogTerm()). //
append(" count=").append(request.getEntriesCount());
}
// If the follower does not receive a successful response due to a crash, RPC call failure, etc
// then block for a while before calling
        if (!status.isOk()) {
            // If the follower crashes, any RPC to the follower fails immediately,
// so we need to block the follower for a while instead of looping until
// it comes back or be removed
// dummy_id is unlock in block
if (isLogDebugEnabled) {
sb.append(" fail, sleep.");
LOG.debug(sb.toString());
}
// If a Replicator status listener is registered, notify all listeners
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
            if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
r.resetInflights();
r.state = State.Probe;
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
}
r.consecutiveErrorTimes = 0;
// The response failed
        if (!response.getSuccess()) {
            // A term greater than ours means a Leader switch has happened (possibly caused by a
            // network partition); this node must step down and follow the new Leader
if (response.getTerm() > r.options.getTerm()) {
if (isLogDebugEnabled) {
sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
.append(r.options.getTerm());
LOG.debug(sb.toString());
}
// Get the representation of the current node -- NodeImpl
final NodeImpl node = r.options.getNode();
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
            // Increase the current node's term
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
return false;
}
if (isLogDebugEnabled) {
sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.getLastLogIndex())
.append(" local nextIndex ").append(r.nextIndex);
LOG.debug(sb.toString());
}
if (rpcSendTime > r.lastRpcSendTimestamp) {
r.lastRpcSendTimestamp = rpcSendTime;
}
// Fail, reset the state to try again from nextIndex.
r.resetInflights();
// If the latest index of the followers is less than the index to be sent next time, set it to the index of the Follower response
// prev_log_index and prev_log_term doesn't match
if (response.getLastLogIndex() + 1 < r.nextIndex) {
LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
// The peer contains less logs than leader
r.nextIndex = response.getLastLogIndex() + 1;
} else {
// The peer contains logs from old term which should be truncated,
// decrease _last_log_at_peer by one to test the right index to keep
if (r.nextIndex > 1) {
LOG.debug("logIndex={} dismatch", r.nextIndex);
r.nextIndex--;
} else {
LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen", r.options.getPeerId()); }}// If the response fails, the Follower logs need to be obtained again for re-synchronization
// dummy_id is unlock in _send_heartbeat
r.sendEmptyEntries(false);
return false;
}
if (isLogDebugEnabled) {
sb.append(", success");
LOG.debug(sb.toString());
}
// success
        // The response succeeded; check that the term still matches
        if (response.getTerm() != r.options.getTerm()) {
            r.resetInflights();
            r.state = State.Probe;
            LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
id.unlock();
return false;
}
if (rpcSendTime > r.lastRpcSendTimestamp) {
r.lastRpcSendTimestamp = rpcSendTime;
}
// The number of logs submitted this time
final int entriesSize = request.getEntriesCount();
if (entriesSize > 0) {
// The node confirms the commit
r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
if (LOG.isDebugEnabled()) {
LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId()); }}else {
// The request is probe request, change the state into Replicate.
r.state = State.Replicate;
}
r.nextIndex += entriesSize;
r.hasSucceeded = true;
r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
// dummy_id is unlock in _send_entries
if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
r.sendTimeoutNow(false.false);
}
return true;
}
The onAppendEntriesReturned method is also very long, but be patient and read on:
- Check whether the in-flight request and the response line up (inflight.startIndex must equal prevLogIndex + 1);
- Record metrics and build the debug log string;
- If the RPC status is not OK, notify the Replicator status listeners, reset the inflights, and block for a while before retrying;
- If the success flag in the response is false, first check the term: a greater term means a Leader switch has happened, probably after a network partition, so the current node raises its term and steps down to follow the new Leader. If the term is fine, reset the inflights, adjust nextIndex based on the lastLogIndex returned by the Follower, and probe again;
- If every check passes, confirm the replication in the BallotBox (commitAt) and advance nextIndex past the entries that were just replicated.
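The nextIndex adjustment on a rejected response is worth spelling out: if the Follower's lastLogIndex is behind, the Leader jumps straight to lastLogIndex + 1; otherwise the Follower holds conflicting entries from an old term, so nextIndex is stepped back one at a time before probing again. A small sketch of that rule (the method is hypothetical, mirroring the branch in onAppendEntriesReturned):

// Hypothetical sketch of the nextIndex adjustment performed when a Follower
// rejects an AppendEntries request (success == false, same term).
public class NextIndexAdjustSketch {

    static long adjustNextIndex(long nextIndex, long followerLastLogIndex) {
        if (followerLastLogIndex + 1 < nextIndex) {
            // The Follower has fewer logs than the Leader assumed:
            // jump directly to the position right after its last log.
            return followerLastLogIndex + 1;
        }
        // The Follower has entries from an old term that must be truncated:
        // step back one index and probe again to find the matching point.
        return nextIndex > 1 ? nextIndex - 1 : nextIndex;
    }

    public static void main(String[] args) {
        // Follower only has logs up to index 80 while the Leader was about to send index 101.
        System.out.println(adjustNextIndex(101, 80));  // 81
        // Follower claims index 105, so the mismatch is a term conflict: back off by one.
        System.out.println(adjustNextIndex(101, 105)); // 100
    }
}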