RocketMQ DLedger multi-copy series: Source code analysis of the RocketMQ DLedger (multi-copy) log replication process

Following the source code analysis of the RocketMQ DLedger (multi-copy) log appending process, this article continues with the most important part of how the Leader handles client append requests: log replication.

DLedger multi-copy log forwarding is implemented by DLedgerEntryPusher, which will be described in detail next.

Keep the following points in mind while reading:

1. One of the most important concepts in the Raft protocol is the committed log index and how it is determined.
2. A client sends a log to the DLedger cluster; only when the log has been accepted by a majority of the nodes in the cluster can it be considered written successfully.
3. How the append and commit actions of the Raft protocol are implemented.
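To make point 2 concrete, here is a minimal sketch of the majority test that decides whether an index can be committed (illustrative only, not DLedger code; DLedger's MemberState exposes an equivalent isQuorum check):

public final class QuorumMath {
    // An entry is committed once more than half of the cluster,
    // leader included, has stored it.
    public static boolean isQuorum(int ackedNodes, int clusterSize) {
        return ackedNodes >= clusterSize / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(isQuorum(2, 3)); // true: leader plus one follower suffices in a 3-node cluster
        System.out.println(isQuorum(1, 3)); // false: the leader alone is not a majority
    }
}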

Contents of this article:

    • 1, DLedgerEntryPusher
      • 1.1 Core class diagram
      • 1.2 Construction method
      • 1.3 startup
    • 2, EntryDispatcher details
      • 2.1 Core class diagram
      • 2.2 Push Request Type
      • 2.3 doWork method details
        • 2.3.1 checkAndFreshState
        • 2.3.2 Append Request Details
          • 2.3.2.1 doCommit Sends a commit request
          • 2.3.2.2 doCheckAppendResponse checks append responses
          • 2.3.2.3 doAppendInner sends the append request
        • 2.3.3 Compare Request details
          • 2.3.3.1 doTruncate
    • 3, EntryHandler details
      • 3.1 Core class diagram
      • 3.2 handlePush
      • 3.3 doWork method details
        • 3.3.1 handleDoCommit
        • 3.3.2 handleDoCompare
        • 3.3.3 handleDoTruncate
        • 3.3.4 handleDoAppend
        • 3.3.5 checkAbnormalFuture
    • 4, QuorumAckChecker
      • 4.1 class diagram
      • 4.2 doWork

Log replication (log forwarding) is implemented by DLedgerEntryPusher. The specific class diagram is as follows:



It is mainly composed of the following four classes:

  • DLedgerEntryPusher: the core class for DLedger log forwarding and processing. It starts the following three types of objects, each corresponding to a thread.
  • EntryHandler: the thread that receives and processes pushed logs, activated when the node is a follower.
  • QuorumAckChecker: the ACK arbitration thread for log appends, activated when the current node is the leader.
  • EntryDispatcher: the log forwarding thread, activated when the current node is the leader; one instance per follower.

We’ll take a closer look at these four classes to reveal the core implementation of log replication.

1, DLedgerEntryPusher

1.1 Core class diagram



The core implementation class for DLedger multi-copy log push; it creates three types of core threads: EntryDispatcher, QuorumAckChecker and EntryHandler. Its core attributes are as follows:

  • DLedgerConfig dLedgerConfig: the multi-copy configuration.
  • DLedgerStore dLedgerStore: the storage implementation class.
  • MemberState memberState: the node state machine.
  • DLedgerRpcService dLedgerRpcService: the RPC service implementation class, used to communicate with the other nodes in the cluster.
  • Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm: the current water mark of each node, organized by term. The inner map's key is the peer ID and its value is the log index that peer has successfully appended.
  • Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm: holds the results (futures) of pending append requests, organized by term.
  • EntryHandler entryHandler: the thread started on a follower to receive push requests (APPEND, COMMIT, COMPARE, TRUNCATE) from the leader.
  • QuorumAckChecker quorumAckChecker: the append-request arbitrator on the leader.
  • Map<String, EntryDispatcher> dispatcherMap: the log request forwarders on the leader, which replicate entries to the followers.
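Since the generics are easy to lose track of, here is how the three bookkeeping structures look as field declarations (the types come from the DLedger code base; the comments are mine):

// term -> (peerId -> highest log index that peer has acknowledged)
private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();

// term -> (log index -> pending client future, completed once a quorum acks)
private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();

// peerId -> the dispatcher that pushes logs to that follower
private Map<String, EntryDispatcher> dispatcherMap = new HashMap<>();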

The implementation of its core methods is described next.

1.2 Construction method

public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) {
    this.dLedgerConfig = dLedgerConfig;
    this.memberState = memberState;
    this.dLedgerStore = dLedgerStore;
    this.dLedgerRpcService = dLedgerRpcService;
    for (String peer : memberState.getPeerMap().keySet()) {
        if (!peer.equals(memberState.getSelfId())) {
            dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
        }
    }
}

The focus of the constructor is that an EntryDispatcher object is built for each node in the cluster other than the current node.

1.3 startup

DLedgerEntryPusher#startup

public void startup() {
    entryHandler.start();
    quorumAckChecker.start();
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
        dispatcher.start();
    }
}

Start the EntryHandler, QuorumAckChecker, and EntryDispatcher threads in sequence.
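For context, here is a rough sketch of how the pusher is wired up and started; in the real code base DLedgerServer performs this wiring, and the exact call sites may differ:

// Paraphrased lifecycle sketch, not a verbatim excerpt.
DLedgerEntryPusher entryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);
entryPusher.startup();   // starts EntryHandler, QuorumAckChecker and one EntryDispatcher per peer
// ... server runs ...
entryPusher.shutdown();  // stops the three kinds of threads on server shutdown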

Note: the other core methods of DLedgerEntryPusher will be covered as we walk through the log replication mechanism.

The implementation of RocketMQ DLedger (multi-copy) log replication will be explained from three aspects: EntryDispatcher, QuorumAckChecker and EntryHandler.

2, EntryDispatcher details

2.1 Core class diagram



Its core attributes are as follows.

  • AtomicReference<PushEntryRequest.Type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE): the type of command to send to the follower. Optional values: COMPARE, TRUNCATE, APPEND, COMMIT; see below for details.
  • long lastPushCommitTimeMs = -1: timestamp of the last COMMIT request sent.
  • String peerId: ID of the target node.
  • long compareIndex = -1: index of the log entry currently being compared.
  • long writeIndex = -1: the log index to push next.
  • int maxPendingSize = 1000: maximum number of pending append requests allowed.
  • long term = -1: the Leader node's current term.
  • String leaderId = null: Leader node ID.
  • long lastCheckLeakTimeMs = System.currentTimeMillis(): the time of the last leak check; a leak check verifies whether the number of pending log requests exceeds maxPendingSize.
  • ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>(): records the pending time of each log. Key: log index (entryIndex); value: timestamp when the push was sent.
  • Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()): the quota used to throttle log pushing.

2.2 Push Request Type

DLedger defines four request types for replicating logs from the leader to the followers. The enumeration type is PushEntryRequest.Type, with values COMPARE, TRUNCATE, APPEND and COMMIT.

  • COMPARE: when the leadership changes, the new Leader needs to compare its log entries with those of its followers in order to truncate the followers' excess data.
  • TRUNCATE: when the Leader has finished comparing logs by index, it sends a TRUNCATE request to the follower.
  • APPEND: appends log entries to the followers.
  • COMMIT: normally the Leader attaches the committed index to APPEND requests, but if APPEND requests are few and scattered, the Leader sends a standalone COMMIT request to notify the followers of the committed index.
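The four values map onto the nested enum PushEntryRequest.Type; restated as a sketch with my own one-line comments (the constant names follow the source, but the declaration order may differ):

public enum Type {
    COMPARE,  // locate the highest index at which the leader's and follower's logs agree
    TRUNCATE, // tell the follower to drop everything after the agreed index
    APPEND,   // replicate a log entry to the follower
    COMMIT    // standalone notification of the leader's committed index
}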

With a basic understanding of the request types between the leader and the followers, let's start with EntryDispatcher's business-processing entry point: the doWork method.

2.3 doWork method details

public void doWork() {
    try {
        if (!checkAndFreshState()) {                         // @1
            waitForRunning(1);
            return;
        }

        if (type.get() == PushEntryRequest.Type.APPEND) {    // @2
            doAppend();
        } else {
            doCompare();                                     // @3
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
        DLedgerUtils.sleep(500);
    }
}

Code @1: checks the state to decide whether APPEND or COMPARE requests can continue to be sent.

Code @2: if the push type is APPEND, the leader propagates message append requests to the follower.

Code @3: The master node sends a comparison data difference request to the slave node (this is often the first step when a new node is elected master).

2.3.1 checkAndFreshState

EntryDispatcher#checkAndFreshState

private boolean checkAndFreshState() {
    if (!memberState.isLeader()) {    // @1
        return false;
    }
    if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {    // @2
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                return false;
            }
            PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
            term = memberState.currTerm();
            leaderId = memberState.getSelfId();
            changeState(-1, PushEntryRequest.Type.COMPARE);
        }
    }
    return true;
}

Code @1: if the node is not the leader, return false, ending this round of the doWork method, because only the leader needs to forward logs to the followers.

Code @2: if the current node is the leader but its local term does not match the state machine's term, or leaderId is not set, or leaderId does not equal the state machine's leaderId, the cluster has usually triggered a new election; synchronize term and leaderId with the state machine and switch to sending COMPARE requests.

Next, take a look at changeState.

private synchronized void changeState(long index, PushEntryRequest.Type target) {
    logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
    switch (target) {
        case APPEND:     // @1
            compareIndex = -1;
            updatePeerWaterMark(term, peerId, index);
            quorumAckChecker.wakeup();
            writeIndex = index + 1;
            break;
        case COMPARE:    // @2
            if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                compareIndex = -1;
                pendingMap.clear();
            }
            break;
        case TRUNCATE:   // @3
            compareIndex = -1;
            break;
        default:
            break;
    }
    type.set(target);
}

Code @1: if the target type is APPEND, reset compareIndex, update the peer's water mark to index, wake up the arbitration thread, and set writeIndex to index + 1.

Code @2: if the target type is COMPARE, reset compareIndex to -1; COMPARE requests will then be sent to the follower, and the map of pending requests is cleared.

Code @3: if the target type is TRUNCATE, reset compareIndex to -1.

Next, let's look at the APPEND, COMPARE and TRUNCATE requests in detail.

2.3.2 Append Request Details

EntryDispatcher#doAppend

private void doAppend() throws Exception {
    while (true) {
        if (!checkAndFreshState()) {                          // @1
            break;
        }
        if (type.get() != PushEntryRequest.Type.APPEND) {     // @2
            break;
        }
        if (writeIndex > dLedgerStore.getLedgerEndIndex()) {  // @3
            doCommit();
            doCheckAppendResponse();
            break;
        }
        if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {   // @4
            long peerWaterMark = getPeerWaterMark(term, peerId);
            for (Long index : pendingMap.keySet()) {
                if (index < peerWaterMark) {
                    pendingMap.remove(index);
                }
            }
            lastCheckLeakTimeMs = System.currentTimeMillis();
        }
        if (pendingMap.size() >= maxPendingSize) {            // @5
            doCheckAppendResponse();
            break;
        }
        doAppendInner(writeIndex);                            // @6
        writeIndex++;
    }
}

Code @1: Check the status, as detailed above.

Code @2: if the request type is not APPEND, exit and end this round of the doWork method.

Code @3: writeIndex is the index of the next entry to append to the follower. Normally the leader attaches its committed index to each APPEND request; when writeIndex exceeds ledgerEndIndex, every entry has already been pushed, so the leader instead sends a standalone COMMIT request (doCommit) and checks the pending append responses for timeouts (doCheckAppendResponse).

Code @4: checks whether pendingMap (the collection of pending requests) has leaked, i.e. whether its size has reached the maximum allowed pending threshold, or whether a second has passed since the last check. The check fetches the current water mark of this peer for the term (the log index of the last successful append); any pending request whose index is below the water mark has already been acknowledged and is removed.

Code @5: if the number of pending requests (entries waiting for the follower's acknowledgement) still exceeds maxPendingSize, check whether append responses have timed out, then break.

Code @6: A concrete append request.

2.3.2.1 doCommit Sends a commit request

EntryDispatcher#doCommit

private void doCommit() throws Exception {
    if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > 1000) {                             // @1
        PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT); // @2
        //Ignore the results
        dLedgerRpcService.push(request);                                                 // @3
        lastPushCommitTimeMs = System.currentTimeMillis();
    }
}

Code @1: If the interval between the last single commit request and the current time is less than 1s, the commit request is abandoned.

Code @2: Build the submit request.

Code @3: Send a COMMIT request to a slave node over the network.

Let’s look at how to build the COMMIT request package.

EntryDispatcher#buildPushRequest

private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
    PushEntryRequest request = new PushEntryRequest();
    request.setGroup(memberState.getGroup());
    request.setRemoteId(peerId);
    request.setLeaderId(leaderId);
    request.setTerm(term);
    request.setEntry(entry);
    request.setType(target);
    request.setCommitIndex(dLedgerStore.getCommittedIndex());
    return request;
}

The request packet contains the following fields: the group of the DLedger node, the ID of the target follower, the ID of the leader, the current term, the log entry, the request type, and committedIndex (the leader's committed log index).

2.3.2.2 doCheckAppendResponse checks append responses

EntryDispatcher#doCheckAppendResponse

private void doCheckAppendResponse() throws Exception {
    long peerWaterMark = getPeerWaterMark(term, peerId);   // @1
    Long sendTimeMs = pendingMap.get(peerWaterMark + 1);
    if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) { // @2
        logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
        doAppendInner(peerWaterMark + 1);
    }
}

This method checks whether the append request times out. Its key implementation is as follows:

  • Gets the log index of the last successful append for this peer (the water mark).
  • Looks up the send timestamp of the next entry (water mark + 1) in the pending request queue. If it exists and the push timeout has been exceeded, the append request is resent. The timeout defaults to 1s and can be changed via maxPushTimeOutMs.

2.3.2.3 doAppendInner sends the append request

Send the append request to the slave node.

EntryDispatcher#doAppendInner

private void doAppendInner(long index) throws Exception {
    DLedgerEntry entry = dLedgerStore.get(index);                                          // @1
    PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
    checkQuotaAndWait(entry);                                                              // @2
    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);      // @3
    CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request); // @4
    pendingMap.put(index, System.currentTimeMillis());                                     // @5
    responseFuture.whenComplete((x, ex) -> {
        try {
            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
            switch (responseCode) {
                case SUCCESS:                                                              // @6
                    pendingMap.remove(x.getIndex());
                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                    quorumAckChecker.wakeup();
                    break;
                case INCONSISTENT_STATE:                                                   // @7
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    lastPushCommitTimeMs = System.currentTimeMillis();
}

Code @1: First query logs according to the serial number.

Code @2: checks the quota; if it is exceeded, flow control is applied. The key points are as follows (a simplified sketch follows this list):

  • Throttling is only considered when all of the following hold: the number of pending append requests exceeds the maximum allowed, the store is file based, and the follower lags the leader by more than 300 MB (configurable via peerPushThrottlePoint).
  • If more than 20 MB of log data (configurable via peerPushQuota) has already been appended within the current second, the dispatcher sleeps for the remainder of that second before appending more.
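As a reference, here is a simplified paraphrase of that flow-control logic; it is not the verbatim checkQuotaAndWait method, and the helper names on the Quota class follow my reading of the source, so treat them as illustrative:

// Paraphrased sketch of EntryDispatcher's quota check (assumptions noted above).
private void checkQuotaAndWait(DLedgerEntry entry) {
    // Skip throttling unless the follower is far behind in entry count ...
    if (dLedgerStore.getLedgerEndIndex() - entry.getIndex() <= maxPendingSize) {
        return;
    }
    // ... and the store is file based ...
    if (!(dLedgerStore instanceof DLedgerMmapFileStore)) {
        return;
    }
    // ... and the byte gap exceeds peerPushThrottlePoint (300 MB by default).
    // (gap check elided)
    quota.sample(entry.getSize());           // account the bytes pushed this second
    if (quota.validateNow()) {               // over peerPushQuota (20 MB/s by default)?
        DLedgerUtils.sleep(quota.leftNow()); // sleep out the rest of the second
    }
}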

Code @3: Build PUSH request logs.

Code @4: Send a network request to a slave node via Netty, and the slave node processes the request when it is received (this article does not discuss network-related implementation details).

Code @5: Use pendingMap to record the sending time of the logs to be added, which is used as a basis for the sender to judge whether the logs have timed out.

Code @6: Request success processing logic, its key implementation points are as follows:

  • Remove this log entry from pendingMap, the pending-request bookkeeping.
  • Update the log sequence number that has been successfully appended (organized by voting rounds, and one key/value pair per slave server).
  • Wakes up the quorumAckChecker thread (primarily used to arbitrate append results), more on that later.

Code @7: if the push request's state is inconsistent with the follower, send a COMPARE request to determine where the leader's and follower's data agree.

That covers the APPEND request type of log forwarding; let's continue with the other request type, COMPARE.

2.3.3 Compare Request details

COMPARE requests are sent by the doCompare method, whose body runs inside while (true), so pay attention to the exit conditions when reading the code below. EntryDispatcher#doCompare

if (!checkAndFreshState()) {
    break;
}
if (type.get() != PushEntryRequest.Type.COMPARE && type.get() != PushEntryRequest.Type.TRUNCATE) {
    break;
}
if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
    break;
}

Step1: Verify whether the execution is carried out, the key points are as follows:

  • Check whether the node is still the leader; if not, break out.
  • If the request type is neither COMPARE nor TRUNCATE, break out.
  • If both compareIndex and ledgerEndIndex are -1, the cluster is brand new and holds no data, so break out.

EntryDispatcher#doCompare

if (compareIndex == -1) {
    compareIndex = dLedgerStore.getLedgerEndIndex();
    logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
    logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
    compareIndex = dLedgerStore.getLedgerEndIndex();
}

Step2: if compareIndex is -1 or compareIndex is not in the valid range, reset the serial number to be compared to the current maximum log number: ledgerEndIndex.

DLedgerEntry entry = dLedgerStore.get(compareIndex);
PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);

Step3: Query the log according to the serial number, and initiate the COMPARE request to the secondary node, whose timeout time is 3s.

EntryDispatcher#doCompare

long truncateIndex = -1;
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {           // @1
    if (compareIndex == response.getEndIndex()) {
        changeState(compareIndex, PushEntryRequest.Type.APPEND);
        break;
    } else {
        truncateIndex = compareIndex;
    }
} else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
        || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {    // @2
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex < response.getBeginIndex()) {                        // @3
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex > response.getEndIndex()) {                          // @4
    compareIndex = response.getEndIndex();
} else {                                                                     // @5
    compareIndex--;
}

if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {                     // @6
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
}

Step4: compute the log index to truncate to, based on the response. The key points are as follows:

  • Code @1: if the COMPARE succeeded and compareIndex equals the follower's endIndex, the logs agree and no truncation is needed; the state switches to APPEND and the loop exits. Otherwise set truncateIndex to compareIndex.
  • Code @2: if the follower's maximum log index is smaller than the leader's minimum, or the follower's minimum log index is greater than the leader's maximum, the two logs do not intersect. This usually happens when the follower has been down for a long time and the leader has since removed expired entries. Set truncateIndex to the leader's ledgerBeginIndex, i.e. the leader's current minimum offset.
  • Code @3: if compareIndex is smaller than the follower's begin index, the follower has most likely lost data on disk; synchronization must restart from the leader's minimum log index, so set truncateIndex to ledgerBeginIndex.
  • Code @4: if compareIndex is greater than the follower's maximum log index, set compareIndex to the follower's endIndex and compare again.
  • Code @5: if compareIndex lies between the follower's begin and end indexes but the entries do not match, decrement compareIndex by one and compare again.
  • Code @6: if compareIndex falls below the leader's minimum log index, set truncateIndex to the leader's minimum log index.

if (truncateIndex != -1) {
    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    doTruncate(truncateIndex);
    break;
}

Step5: if truncateIndex is not -1, send a TRUNCATE request to the follower.
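To see how the comparison converges, here is a small self-contained simulation of the rules above (illustrative only; this is not DLedger code, and it collapses the COMPARE round-trips into a predicate):

import java.util.function.LongPredicate;

public final class CompareLoopDemo {
    // Returns the agreed index; DLedger then truncates and resumes APPEND from there.
    static long findAgreedIndex(long leaderBegin, long leaderEnd,
                                long followerBegin, long followerEnd,
                                LongPredicate followerHasSameEntry) {
        if (followerEnd < leaderBegin || followerBegin > leaderEnd) {
            return leaderBegin; // disjoint logs: resync from the leader's first entry
        }
        long compareIndex = Math.min(leaderEnd, followerEnd);
        while (compareIndex >= Math.max(leaderBegin, followerBegin)) {
            if (followerHasSameEntry.test(compareIndex)) {
                return compareIndex; // logs agree here
            }
            compareIndex--;          // walk backwards one entry per round
        }
        return leaderBegin;          // no agreement found in the overlap
    }

    public static void main(String[] args) {
        // The follower wrote 0..105 under an old leader; the new leader has 0..100
        // and the entries agree only up to index 98.
        long agreed = findAgreedIndex(0, 100, 0, 105, i -> i <= 98);
        System.out.println(agreed); // 98: the follower's entries after 98 get replaced
    }
}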

2.3.3.1 doTruncate

private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
    PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
    lastPushCommitTimeMs = System.currentTimeMillis();
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}

This method builds a TRUNCATE request, sends it to the follower, waits up to 3s for the response, and then switches back to the APPEND state.

The leader is responsible for pushing requests to the followers, and the followers process them accordingly. Next, let's analyze how a follower responds to the requests sent by the leader.

3, EntryHandler details

EntryHandler is also a thread; it is activated when the node is in the follower state.

3.1 Core class diagram



Its core attributes are as follows:

  • long lastCheckFastForwardTimeMs: timestamp of the last check for entries the leader pushed ahead of this node (fast forward).
  • ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap: the APPEND request processing queue.
  • BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests: the queue for COMMIT, COMPARE and TRUNCATE requests.

3.2 handlePush

As we know from the above, the leader actively propagates logs to the followers; a follower receives the request data over the network and processes it. The call chain is shown in the figure:



Eventually the handlePush method of EntryHandler is called.

EntryHandler#handlePush

public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
    //The timeout should smaller than the remoting layer's request timeout
    CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);      // @1
    switch (request.getType()) {
        case APPEND:                                                              // @2
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            long index = request.getEntry().getIndex();
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
            if (old != null) {
                logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
            }
            break;
        case COMMIT:                                                              // @3
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        case COMPARE:
        case TRUNCATE:                                                            // @4
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            writeRequestMap.clear();
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        default:
            logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            break;
    }
    return future;
}

The processing of a push request from the leader branches on the request type. The key points are as follows.

Code @1: First build a response result Future with a default timeout of 1s.

Code @2: if it is an APPEND request, put it into the writeRequestMap collection keyed by index. If an entry already exists for that index, the leader has pushed this entry repeatedly, and the response code is set to REPEATED_PUSH. Requests placed in writeRequestMap are processed asynchronously by the doWork method.

Code @3: if it is a COMMIT request, put it into the compareOrTruncateRequests queue, to be processed asynchronously by the doWork method.

Code @4: if it is a COMPARE or TRUNCATE request, clear the pending write queue writeRequestMap and put the request into the compareOrTruncateRequests queue, to be processed asynchronously by the doWork method.

Next, we focus on analyzing the implementation of doWork method.

3.3 doWork method details

EntryHandler#doWork

public void doWork() {
    try {
        if (!memberState.isFollower()) {    // @1
            waitForRunning(1);
            return;
        }
        if (compareOrTruncateRequests.peek() != null) {    // @2
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
            PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
            switch (pair.getKey().getType()) {
                case TRUNCATE:
                    handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMPARE:
                    handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMMIT:
                    handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                    break;
                default:
                    break;
            }
        } else {    // @3
            long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
            if (pair == null) {
                checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                waitForRunning(1);
                return;
            }
            PushEntryRequest request = pair.getKey();
            handleDoAppend(nextIndex, request, pair.getValue());
        }
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
        DLedgerUtils.sleep(100);
    }
}

Code @1: if the current node is not a follower, wait briefly and return.

Code @2: if the compareOrTruncateRequests queue is not empty, there are COMMIT, COMPARE or TRUNCATE requests, and these are handled with priority. Note that the non-blocking methods peek and poll are used, and the handler corresponding to the request type is invoked. More on these handlers later.

Code @3: if there are only APPEND requests, use the next expected index (the node's ledgerEndIndex + 1) as the key to fetch the corresponding replication request from writeRequestMap. If one is found, handleDoAppend is executed; if not, checkAbnormalFuture is called to handle the anomaly.

Let’s focus on the processing details.

3.3.1 handleDoCommit

handleDoCommit is straightforward: it mainly calls DLedgerStore's updateCommittedIndex to update the committed offset. Let's therefore look at DLedgerStore's updateCommittedIndex method in detail.

DLedgerMmapFileStore#updateCommittedIndex

public void updateCommittedIndex(long term, long newCommittedIndex) {   // @1
    if (newCommittedIndex == -1
            || ledgerEndIndex == -1
            || term < memberState.currTerm()
            || newCommittedIndex == this.committedIndex) {              // @2
        return;
    }
    if (newCommittedIndex < this.committedIndex
            || newCommittedIndex < this.ledgerBeginIndex) {             // @3
        logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
        return;
    }
    long endIndex = ledgerEndIndex;
    if (newCommittedIndex > endIndex) {                                 // @4
        //If the node fall behind too much, the committedIndex will be larger than enIndex.
        newCommittedIndex = endIndex;
    }
    DLedgerEntry dLedgerEntry = get(newCommittedIndex);                 // @5
    PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
    this.committedIndex = newCommittedIndex;
    this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize(); // @6
}

@1: Let’s start with the method arguments:

  • long term: the leader's current term.
  • long newCommittedIndex: the committed log index carried by the leader's log replication request.

Code @2: ignore the commit action if the new committed index is -1, or the local ledgerEndIndex is -1, or the request's term is smaller than the follower's current term, or the new committed index equals the follower's current committed index.

Code @3: if the leader's committed index is smaller than the follower's committed index, or smaller than the follower's minimum valid log index, print the [MONITOR] warning log and ignore the commit action.

Code @4: if the follower has fallen far behind the leader, the committed index may exceed the follower's maximum valid log index; cap it at the follower's endIndex.

Code @5: look up the entry at the new committed index on the follower; if it does not exist, report DISK_ERROR.

Code @6: update committedIndex and committedPos. DLedgerStore periodically flushes the committed pointer into the checkpoint file to persist committedIndex.

3.3.2 handleDoCompare

This handles the COMPARE request sent by the leader; its implementation is relatively simple and ends by calling the buildResponse method to construct the response.

EntryHandler#buildResponse

private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
    PushEntryResponse response = new PushEntryResponse();
    response.setGroup(request.getGroup());
    response.setCode(code);
    response.setTerm(request.getTerm());
    if (request.getType() != PushEntryRequest.Type.COMMIT) {
        response.setIndex(request.getEntry().getIndex());
    }
    response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
    response.setEndIndex(dLedgerStore.getLedgerEndIndex());
    return response;
}

It mainly returns the current node's ledgerBeginIndex, ledgerEndIndex and term, so the leader can judge and compare.

3.3.3 handleDoTruncate

The implementation of handleDoTruncate is relatively simple: it deletes all logs after the truncateIndex log index on the follower by calling the truncate method of dLedgerStore. Since the storage is similar to RocketMQ's storage design, this article does not go into the implementation details: locate the log file by log index; if a concrete file is hit, adjust its read/write and flush pointers, and delete all physical files after it. If you are interested, see chapter 4 of my book RocketMQ Technology Insider: RocketMQ Storage.

3.3.4 handleDoAppend

private void handleDoAppend(long writeIndex, PushEntryRequest request,
    CompletableFuture<PushEntryResponse> future) {
    try {
        PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
        PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}

handleDoAppend calls DLedgerStore's appendAsFollower method to append the log; the storage handling is the same as appendAsLeader on the leader, except that the follower does not need to forward the log.

3.3.5 checkAbnormalFuture

This method is the focus of this section. It is invoked by doWork when the follower cannot find the next expected entry (ledgerEndIndex + 1) among the pending write requests, which is quite common, for example when the leader has not yet pushed the latest data to this follower. Let's look at the implementation in detail. EntryHandler#checkAbnormalFuture

if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {
    return;
}
lastCheckFastForwardTimeMs  = System.currentTimeMillis();
if (writeRequestMap.isEmpty()) {
    return;
}

Step1: if less than 1s has passed since the last check, return. Likewise return if there is no backlog of append requests, since in that case the leader clearly has not pushed new logs yet.

EntryHandler#checkAbnormalFuture

for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
    long index = pair.getKey().getEntry().getIndex();             // @1
    //Fall behind
    if (index <= endIndex) {                                      // @2
        try {
            DLedgerEntry local = dLedgerStore.get(index);
            PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
            logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);
        } catch (Throwable t) {
            logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }
        writeRequestMap.remove(index);
        continue;
    }
    //Just OK
    if (index == endIndex + 1) {    // @3
        //The next entry is coming, just return
        return;
    }
    //Fast forward
    TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue();    // @4
    if (!future.isTimeOut()) {
        continue;
    }
    if (index < minFastForwardIndex) {    // @5
        minFastForwardIndex = index;
    }
}

Step2: iterate over the pending write requests (log replication requests pushed by the leader) and find the index that needs a fast-forward. The key points are as follows:

  • Code @1: first get the index of the log entry to be written.
  • Code @2: if the index is no greater than the follower's endIndex, and the entry is indeed stored on the follower, return success, print the PushFallBehind log, remove the request, and continue with the next pending write.
  • Code @3: if the index equals endIndex + 1, end the loop, because the next entry to write is already in the pending queue.
  • Code @4: if the index is greater than endIndex + 1 and the request has not timed out, move on to the next pending write.
  • Code @5: if the index is greater than endIndex + 1 and the request has timed out, record the smallest such index in minFastForwardIndex.

EntryHandler#checkAbnormalFuture

if (minFastForwardIndex == Long.MAX_VALUE) {
    return;
}
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);
if (pair == null) {
    return;
}

Step3: if no index requiring a fast-forward was found, or its request is no longer in writeRequestMap, end the check.

EntryHandler#checkAbnormalFuture

logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));

Step4: otherwise report INCONSISTENT_STATE back to the leader: the follower has not written the log at index minFastForwardIndex. When the leader receives such a response, it stops forwarding logs to this follower and sends COMPARE requests to bring the data back into agreement.

So far we have covered how the leader sends requests to the followers and how the followers respond. Next let's look at how the leader processes the responses. Recall that the leader propagates logs to all its followers, and a log write is considered successful only after more than half of the nodes in the cluster have acknowledged it within the specified time. Let's look at that implementation.

4, QuorumAckChecker

The log replication arbitrator: a log write request is committed only after it has been acknowledged by a majority of the nodes in the cluster.

4.1 class diagram



Its core attributes are as follows:

  • long lastPrintWatermarkTimeMs: timestamp of the last water-mark printout, in milliseconds.
  • long lastCheckLeakTimeMs: timestamp of the last leak check, in milliseconds.
  • long lastQuorumIndex: the log index settled by the last quorum arbitration.

4.2 doWork

QuorumAckChecker#doWork

if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {    
    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
            memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
    lastPrintWatermarkTimeMs = System.currentTimeMillis();
}

Step1: if more than 3s have passed since the water marks were last printed, log the current term, ledgerBegin, ledgerEnd, committed index and peerWaterMarksByTerm.

QuorumAckChecker#doWork

if (!memberState.isLeader()) {    // @2
    waitForRunning(1);
    return;
}

Step2: if the current node is not the primary node, return directly and do nothing.

QuorumAckChecker#doWork

if (pendingAppendResponsesByTerm.size() > 1) {   // @1
    for (Long term : pendingAppendResponsesByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setIndex(futureEntry.getKey());
            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
            response.setLeaderId(memberState.getLeaderId());
            logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
            futureEntry.getValue().complete(response);
        }
        pendingAppendResponsesByTerm.remove(term);
    }
}
if (peerWaterMarksByTerm.size() > 1) {
    for (Long term : peerWaterMarksByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
        peerWaterMarksByTerm.remove(term);
    }
}

Step3: clear the data of terms other than the current one from pendingAppendResponsesByTerm and peerWaterMarksByTerm to avoid unnecessary memory usage; pending requests from old terms are completed with TERM_CHANGED.

Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {          // @1
    int num = 0;
    for (Long another : peerWaterMarks.values()) {    // @2
        if (another >= index) {
            num++;
        }
    }
    if (memberState.isQuorum(num) && index > quorumIndex) {   // @3
        quorumIndex = index;
    }
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);     // @4

Step4: arbitrate the committed log index based on the replication progress reported by each node. To understand this code, note that peerWaterMarks stores, per node, the log index that has been successfully appended. For example, in a three-node DLedger cluster, peerWaterMarks might store data like this:

{
    "dledger_group_01_0": 100,
    "dledger_group_01_1": 101
}

dledger_group_01_0 is the ID of follower 1, whose currently replicated index is 100; dledger_group_01_1 is the ID of follower 2, whose currently replicated index is 101. Counting the leader as well, how is the committable index determined?

  • Code @1: first iterate over the value set of peerWaterMarks, i.e. {100, 101} in the example above. The temporary variable index is the candidate log index for arbitration: if more than half of the nodes in the cluster have replicated at least this index, the log at this index can be committed.
  • Code @2: iterate over all the acknowledged indexes in peerWaterMarks and compare each with the candidate index; if a node's acknowledged index is greater than or equal to the candidate, increment num, i.e. that node votes for it.
  • Code @3: arbitrate on index: if num reaches a quorum and index is greater than quorumIndex, update quorumIndex to index. quorumIndex ends up as the largest log index acknowledged by a majority.
  • Code @4: update the committedIndex; DLedgerStore will periodically write committedIndex into the checkpoint file.
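As a concrete check of that arbitration, the following self-contained snippet replays the computation for the example above, with the leader's own water mark added at 102 (illustrative only; memberState.isQuorum is replaced by an explicit majority test):

import java.util.Map;

public final class QuorumIndexDemo {
    public static void main(String[] args) {
        // Highest acknowledged index per node, leader included.
        Map<String, Long> peerWaterMarks = Map.of(
            "dledger_group_01_0", 100L,
            "dledger_group_01_1", 101L,
            "dledger_group_01_2", 102L); // the leader's own end index
        int clusterSize = peerWaterMarks.size();

        long quorumIndex = -1;
        for (long index : peerWaterMarks.values()) {
            int num = 0;
            for (long another : peerWaterMarks.values()) {
                if (another >= index) {
                    num++;
                }
            }
            // Majority test standing in for memberState.isQuorum(num).
            if (num >= clusterSize / 2 + 1 && index > quorumIndex) {
                quorumIndex = index;
            }
        }
        System.out.println(quorumIndex); // 101: two of the three nodes have reached 101
    }
}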
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex >= 0) {
    for (Long i = quorumIndex; i >= 0; i--) {   // @1
        try {
            CompletableFuture<AppendEntryResponse> future = responses.remove(i);   // @2
            if (future == null) {                                                  // @3
                needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) {                                         // @4
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) future).getPos());
                future.complete(response);
            }
            ackNum++;                                                              // @5
        } catch (Throwable t) {
            logger.error("Error in ack to index={} term={}", i, currTerm, t);
        }
    }
}

Step5: process the pending requests up to quorumIndex and send responses to the clients. The steps are as follows:

  • Code @1: iterate downward from quorumIndex, decrementing the index by one per iteration, until the index falls below 0 or the loop exits early (see the exit logic below).

  • Code @2: remove this log entry's pending request from responses.

  • Code @3: if no pending request was found, all earlier pending requests have been processed, so prepare to exit. Before exiting, set needCheck based on the following criteria (all three must hold):

    • the index of the last arbitration is not -1;
    • the last arbitrated index does not equal the newly arbitrated one;
    • the current index does not equal the last arbitrated index. Conditions one and two usually hold, but this one will most likely make needCheck false.
  • Code @4: complete the future, returning the result to the client.

  • Code @5: increment ackNum, the number of acknowledgements in this round.

if (ackNum == 0) {
    for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
        TimeoutFuture<AppendEntryResponse> future = responses.get(i);
        if (future == null) {
            break;
        } else if (future.isTimeOut()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
            response.setTerm(currTerm);
            response.setIndex(i);
            response.setLeaderId(memberState.getSelfId());
            future.complete(response);
        } else {
            break;
        }
    }
    waitForRunning(1);
}

Step6: if the number of acknowledgements is 0, check whether the requests beyond the arbitrated index have timed out; if so, complete them with a WAIT_QUORUM_ACK_TIMEOUT response.

if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
        if (futureEntry.getKey() < quorumIndex) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setTerm(currTerm);
            response.setIndex(futureEntry.getKey());
            response.setLeaderId(memberState.getSelfId());
            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
            futureEntry.getValue().complete(response);
            responses.remove(futureEntry.getKey());
        }
    }
    lastCheckLeakTimeMs = System.currentTimeMillis();
}

Step7: check for leaks. A pending request is considered leaked if its log index is less than the committed index; such requests are completed and removed.

Step8: once the arbitration is done, update lastQuorumIndex to the index committed in this round.

So much for DLedger's log replication. This article is a long one; to every reader who made it here, please give it a thumbs up. Thank you.


I am Wei Ge, keen on systematically analyzing mainstream Java middleware. Follow the public account "middleware interest circle": reply "column" to get the systematic column navigation, or "information" to get the author's learning mind map.