Introduction

I had planned to study linearizable (linearly consistent) reads right after leader election. Reading that code on its own, without the surrounding context, felt hard to follow. So this article starts from RheaKV's read path and ends with how SOFAJRaft implements linearizable reads. A simple example of linearizability: if a value is written at time T1, any read after T1 must return it, and no read before T1 may.

This article draws on the following SOFAJRaft documents: "SOFAJRaft Linearizable Read Implementation Analysis | SOFAJRaft Implementation Principles" and "SOFAJRaft Principles: How SOFAJRaft-RheaKV Uses Raft".

RheaKV reads data

The entry point for reading data in RheaKV is DefaultRheaKVStore#bGet.

DefaultRheaKVStore#bGet

public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}

bGet eventually delegates to the following get method of DefaultRheaKVStore: DefaultRheaKVStore#get

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    // Verify the started state
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}

readOnlySafe indicates whether a linearizable read is required. The public get(key) method passes true for both readOnlySafe and tryBatching. As a result, this method calls getBatchingOnlySafe.apply with the key and the future. getBatchingOnlySafe is created when DefaultRheaKVStore is initialized, in DefaultRheaKVStore#init:

// ...
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
        new GetBatchingHandler("get_only_safe", true));
// ...

When we initialize getBatchingOnlySafe, the handler we pass in is GetBatchingHandler.

Then let’s go back to getBatchingOnlySafe#apply and see what this method does:

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    //GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}

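apply publishes an event to a Disruptor ring buffer for asynchronous processing, storing the key and the future in the event. Events published through getBatchingOnlySafe are consumed by GetBatchingHandler. If the ring buffer is full, tryPublishEvent returns false. In that case get falls back to calling internalGet directly, without batching.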

Batch data acquisition

GetBatchingHandler#onEvent

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    // Keep buffering unless this is the last event of the batch or a size/byte limit is reached
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }

    if (size == 1) {
        reset();
        try {
            // If it is just a GET request, there is no need for batch processing
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        // Initialize a collection of just the right size
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        // All keys and futures have been copied out of events, so reset the buffer
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                // Asynchronous callback processes data
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}

onEvent first adds the event to its buffer. It then returns early unless one of three conditions holds:

- the current event is the last one in the Disruptor batch (endOfBatch);
- the buffered event count has reached the batch size;
- the buffered key bytes have reached maxReadBytes.

Once it flushes, it branches on the number of buffered events. As an optimization, a single event is served by a plain get without batching. Otherwise, all keys are collected into keys and multiGet is called once. Its asynchronous callback then completes each caller's future with the value for that caller's key.
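The flush rule is easier to see without Disruptor in the way. Below is a minimal, hypothetical sketch of the decision onEvent makes. KeyBatcher and its two callbacks are illustrative names, not RheaKV API. It keeps buffering until endOfBatch, the batch size, or the byte limit is hit. It then routes a lone key to a single get and several keys to one multiGet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the flush rule in GetBatchingHandler#onEvent (not RheaKV code).
final class KeyBatcher {
    private final int batchSize;
    private final int maxReadBytes;
    private final Consumer<byte[]> singleGet;      // stands in for get(key, readOnlySafe, future, false)
    private final Consumer<List<byte[]>> multiGet; // stands in for multiGet(keys, readOnlySafe)
    private final List<byte[]> keys = new ArrayList<>();
    private int cachedBytes;

    KeyBatcher(int batchSize, int maxReadBytes,
               Consumer<byte[]> singleGet, Consumer<List<byte[]>> multiGet) {
        this.batchSize = batchSize;
        this.maxReadBytes = maxReadBytes;
        this.singleGet = singleGet;
        this.multiGet = multiGet;
    }

    // Called once per event; endOfBatch is true for the last event currently available.
    void onEvent(byte[] key, boolean endOfBatch) {
        keys.add(key);
        cachedBytes += key.length;
        if (!endOfBatch && keys.size() < batchSize && cachedBytes < maxReadBytes) {
            return; // no limit reached and more events are coming: keep buffering
        }
        List<byte[]> batch = new ArrayList<>(keys); // copy out, then reset the buffer
        keys.clear();
        cachedBytes = 0;
        if (batch.size() == 1) {
            singleGet.accept(batch.get(0)); // a lone key is not worth a batch request
        } else {
            multiGet.accept(batch);
        }
    }
}
```

With a batch size of 3, three events without endOfBatch produce one multiGet of three keys. A following single event marked endOfBatch goes through the single-get path.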

multiGet calls internalMultiGet, which returns a future that delivers the result asynchronously.

DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    // Keys may live in different regions, so group them into a map of region -> keys
    final Map<Region, List<byte[]>> regionMap = this.pdClient
            .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    // The return value: one future per region
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
            Lists.newArrayListWithCapacity(regionMap.size());
    // lastCause is null on the first attempt
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);

    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        // Retry function: re-run internalMultiGet for this region's keys with one fewer retry left
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
                readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        // Send a MultiGetRequest request to retrieve data
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

internalMultiGet first asks the PD client to map each key to the region that owns it. Data is partitioned across regions, so different keys may live in different regions. The keys are therefore grouped per region into regionMap. The method then iterates over regionMap and sends each region's keys as one batch to internalRegionMultiGet. Each batch is wrapped in a MapFailoverFuture that can retry it, and the per-region futures are combined into a FutureGroup.
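To make the grouping step concrete, here is a simplified sketch. It assumes each region owns the half-open key range [startKey, next region's startKey). It also uses String keys and String ordering in place of RheaKV's byte[] keys. RegionRouter is a hypothetical helper, not the PD client's API. It uses TreeMap.floorEntry to find the region with the greatest start key that is not above the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: group keys by the region that owns them (not the PD client's API).
// Assumes each region owns [startKey, next region's startKey) and keys are Strings.
final class RegionRouter {
    private final TreeMap<String, Long> regionByStartKey = new TreeMap<>();

    void addRegion(String startKey, long regionId) {
        regionByStartKey.put(startKey, regionId);
    }

    Map<Long, List<String>> groupByRegion(List<String> keys) {
        Map<Long, List<String>> regionMap = new LinkedHashMap<>();
        for (String key : keys) {
            // The owning region has the greatest start key that is <= key.
            Map.Entry<String, Long> owner = regionByStartKey.floorEntry(key);
            if (owner == null) {
                throw new IllegalArgumentException("No region covers key " + key);
            }
            regionMap.computeIfAbsent(owner.getValue(), id -> new ArrayList<>()).add(key);
        }
        return regionMap;
    }
}
```

The real lookup also has to cope with region splits and stale epochs. That is why findRegionsByKeys is passed ApiExceptionHelper.isInvalidEpoch(lastCause) on retries. The sketch ignores that concern.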

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    // On a pure client no RegionEngine runs locally, so this is null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    // Set the retry function
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
            retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
            false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // If it is not null, the rawKVStore is obtained and the data is obtained from it
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                // If kvDispatcher is not null, then it will be executed asynchronously in kvDispatcher
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        // Invoke the RPC request
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}

Here internalRegionMultiGet runs on the client, where no regionEngine is available, so getRegionEngine returns null. The method therefore builds a MultiGetRequest and sends it over RPC directly to the leader of the region on the server side.

These methods work much like the PUT path. That path was covered in the earlier article "5. SOFAJRaft Source Analysis: How RheaKV Stores Data", so I won't repeat it here.

The server handles the MultiGetRequest request

On the server, MultiGetRequest is handled by KVCommandProcessor. It picks the handling method based on the value returned by the request's magic() method. For a MultiGetRequest, it calls DefaultRegionKVService#handleMultiGetRequest:

public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        // Call the multiGet method of MetricsRawKVStore
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {

            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

The handleMultiGetRequest method calls the multiGet method of MetricsRawKVStore to get data in batches.

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    // Wrap the closure in a MetricsKVClosureAdapter that records metrics for this call
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    // Call the multiGet method of RaftRawKVStore
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}

MetricsRawKVStore#multiGet wraps the closure in a MetricsKVClosureAdapter. The adapter records metrics and then invokes the original closure, so the response is still delivered through the asynchronous callback. The method then delegates to RaftRawKVStore#multiGet.

RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // KV storage implements linear consistent read
    // Call the readIndex method and wait for the callback to execute
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // ReadIndex succeeded: reading from the local store is now linearizable
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // ReadIndex failed: on the leader, fall back to applying the read to the state machine KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

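When readOnlySafe is true, multiGet calls node.readIndex to perform a linearizable read and registers a callback. If ReadIndex succeeds, the callback reads the data from kvStore (RocksRawKVStore). If it fails and this node is the leader, the read is submitted as an operation to the leader's state machine, KVStoreStateMachine. On a follower, the error is returned so that the client can retry against the leader.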
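The outcomes above can be summarized as a small decision function. This is a hypothetical sketch with illustrative names (ReadPathChooser, ReadPath), not RheaKV code. It only captures which path RaftRawKVStore#multiGet takes for a given combination of flags.

```java
// Hypothetical sketch of the branching in RaftRawKVStore#multiGet (illustrative names only).
final class ReadPathChooser {
    enum ReadPath {
        LOCAL_READ,                  // readOnlySafe == false: read the local store, no ReadIndex
        LOCAL_READ_AFTER_READ_INDEX, // ReadIndex succeeded: the local read is now linearizable
        APPLY_THROUGH_RAFT_LOG,      // ReadIndex failed on the leader: submit the read as a log entry
        FAIL_AND_LET_CLIENT_RETRY    // ReadIndex failed on a follower: return the error to the client
    }

    static ReadPath choose(boolean readOnlySafe, boolean readIndexOk, boolean isLeader) {
        if (!readOnlySafe) {
            return ReadPath.LOCAL_READ;
        }
        if (readIndexOk) {
            return ReadPath.LOCAL_READ_AFTER_READ_INDEX;
        }
        // In the original, isLeader() is checked inside the failure callback.
        return isLeader ? ReadPath.APPLY_THROUGH_RAFT_LOG : ReadPath.FAIL_AND_LET_CLIENT_RETRY;
    }
}
```

Falling back to the Raft log is slower, because the read is replicated like a write. It stays linearizable, though, because the read is ordered after every earlier write in the log. A follower cannot take that path itself, so it hands the error back and lets the client retry against the leader.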

Linearizable reads with ReadIndex

Put simply, a linearizable read means that if a value is written at time T1, any read after T1 must return it, and no read before T1 may. You can think of it as Java's volatile semantics carried over to a distributed system. Once a client's write request to the cluster receives a success response, the result of that write is visible to all subsequent read requests. The difference is that volatile provides visibility between threads, whereas SOFAJRaft provides visibility between servers.

SOFAJRaft implements linearizable reads with the ReadIndex mechanism of the Raft protocol. Node#readIndex(byte[] requestContext, ReadIndexClosure done) issues a linearizable read request. Once the read is known to be safe, the closure is invoked. It normally reads the data from the state machine and returns it to the client.

Node#readIndex

public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        // Execute the callback asynchronously
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    // requestContext is BytesUtil.EMPTY_BYTES when called from RaftRawKVStore
    this.readOnlyService.addRequest(requestContext, done);
}

readIndex passes requestContext and the callback done to ReadOnlyServiceImpl#addRequest. Here requestContext is BytesUtil.EMPTY_BYTES.

ReadOnlyServiceImpl#addRequest

public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            //EMPTY_BYTES
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            // Published events are consumed by ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

addRequest wraps reqCtx and the closure into a ReadIndexEvent and publishes it to the readIndexQueue ring buffer. Once published, the event is processed by ReadIndexEventHandler. If publishing fails because the queue is full, addRequest spins and retries up to three times. After that, it fails the closure with EBUSY ("Node is busy").
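The bounded-retry policy can be sketched with a java.util.concurrent.ArrayBlockingQueue standing in for the Disruptor ring buffer. The assumption here is that the non-blocking offer plays the role of tryPublishEvent. BoundedPublisher is a hypothetical name, and Thread.onSpinWait (Java 9+) replaces JRaft's ThreadHelper.onSpinWait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of addRequest's bounded retry; a BlockingQueue stands in for the ring buffer.
final class BoundedPublisher<T> {
    private final BlockingQueue<T> queue;
    private final int maxRetries;

    BoundedPublisher(int capacity, int maxRetries) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.maxRetries = maxRetries;
    }

    // Returns true if the item was enqueued, false if the caller should fail the request with EBUSY.
    boolean publish(T item) {
        int retryTimes = 0;
        while (true) {
            if (queue.offer(item)) {   // non-blocking attempt, like tryPublishEvent
                return true;
            }
            retryTimes++;
            if (retryTimes > maxRetries) {
                return false;          // 1 initial attempt + maxRetries retries all failed
            }
            Thread.onSpinWait();       // busy-wait hint before the next attempt
        }
    }

    int size() {
        return queue.size();
    }
}
```

Spinning instead of blocking keeps the caller's latency low when the queue drains quickly. The retry cap turns a persistently full queue into a fast EBUSY error instead of an unbounded wait.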

ReadIndexEventHandler

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
                                                  ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                     throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        // Batch execution
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl that keeps an events list for batching. It flushes in two cases: the list reaches applyBatch events (32), or the current event is the last one in the Disruptor batch. In either case, it calls ReadOnlyServiceImpl#executeReadIndexEvents to process the whole batch.

ReadOnlyServiceImpl#executeReadIndexEvents

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Initialize ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

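executeReadIndexEvents builds a single ReadIndexRequest containing every event's request context. It also collects one ReadIndexState per event. It wraps the request and the states in a ReadIndexResponseClosure for the later callback and passes both to NodeImpl#handleReadIndexRequest.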

NodeImpl#handleReadIndexRequest

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

A linearizable read can be initiated on any node in the cluster. It does not have to run on the Leader; Followers can serve it too, which greatly reduces the read load on the Leader. For a linearizable read on a Follower, the Follower calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure). This sends a ReadIndex request to the Leader, so the core logic runs on the Leader. This article therefore focuses on the Leader's side.

On the Leader, handleReadIndexRequest calls readLeader.

NodeImpl#readLeader

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // 1. Compute the quorum (majority) size of the cluster
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // If there is only one node in the cluster, call the callback directly and return success
        respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // 2. The term of lastCommittedIndex must equal the current term.
    // LogManager looks up the term of the BallotBox's lastCommittedIndex and compares it with the current term.
    // If they differ, this Leader has not committed any log entry in its own term and must reject the read-only request.
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
                .run(new Status(
                        RaftError.EAGAIN,
                        "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                         "logIndex=%d, currTerm=%d.",
                        lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        // 3. For a request forwarded by a Follower, check that the Follower is in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer,
                     this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    //4. If ReadOnlyLeaseBased is used, check whether the leader is in the lease validity period
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        // 5. ReadOnlySafe: confirm leadership with a round of heartbeats
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // Set the heartbeat response callback function
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            // Send a heartbeat to each Follower; once a majority (counting the Leader itself)
            // responds successfully, the Leader knows it is still the Leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // 6. ReadOnlyLeaseBased: no election can complete during the lease, so the Leader cannot have changed;
        // return the result through the callback directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
  1. If the quorum is 1 (a single-node cluster), mark the response successful with the current lastCommittedIndex and invoke the callback immediately.
  2. Check that the term of the BallotBox's lastCommittedIndex equals the current term. If it does not, this Leader has not committed anything in its own term yet, and the request is rejected with EAGAIN. Otherwise, the response builder's index is set to lastCommittedIndex.
  3. For a request forwarded by a Follower, check that the Follower is in the current configuration. If it is not, fail the callback with EPERM.
  4. Read the ReadOnlyOption configured for ReadIndex requests, which defaults to ReadOnlySafe. If it is ReadOnlyLeaseBased, call isLeaderLeaseValid to check that the leader's lease is still valid. If the lease has expired, fall back to ReadOnlySafe.
  5. With ReadOnlySafe, call Replicator#sendHeartbeat(rid, closure) to send a heartbeat to each Follower, with ReadIndexHeartbeatResponseClosure as the heartbeat response callback. That callback checks whether a majority of nodes, counting the Leader itself, have responded successfully. If so, leadership is confirmed and the ReadIndex request succeeds. The read is then served once applyIndex reaches ReadIndex: at that point the log up to ReadIndex has been applied, so a Linearizable Read can be provided.
  6. With ReadOnlyLeaseBased and a valid lease, the current Leader is considered the only valid Leader in the Raft group for the lifetime of its lease. The heartbeat round that confirms its identity is therefore skipped, and the request (from a Follower or the local node) succeeds immediately. The read itself still waits for the state machine until applyIndex reaches ReadIndex before a Linearizable Read is served.
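The six checks above can be condensed into one decision function. This sketch is hypothetical, with these assumptions:

- ReadIndexDecision and its enums are illustrative names, not JRaft API.
- The quorum formula peerCount / 2 + 1 assumes a single configuration, with no joint consensus.
- The real readLeader acts on closures instead of returning a value.

```java
// Hypothetical sketch of the checks in NodeImpl#readLeader; names are illustrative, not JRaft API.
final class ReadIndexDecision {
    enum ReadOnlyOption { READ_ONLY_SAFE, READ_ONLY_LEASE_BASED }

    enum Outcome { REPLY_NOW, CONFIRM_WITH_HEARTBEATS, REJECT_EAGAIN, REJECT_NOT_IN_CONF }

    // Majority of the voting members (assumption: a single configuration, no joint consensus).
    static int quorum(int peerCount) {
        return peerCount / 2 + 1;
    }

    static Outcome decide(int peerCount, long termOfLastCommitted, long currentTerm,
                          boolean requesterInConf, ReadOnlyOption option, boolean leaseValid) {
        if (quorum(peerCount) <= 1) {
            return Outcome.REPLY_NOW;               // step 1: single node, fast path
        }
        if (termOfLastCommitted != currentTerm) {
            return Outcome.REJECT_EAGAIN;           // step 2: nothing committed in this term yet
        }
        if (!requesterInConf) {
            return Outcome.REJECT_NOT_IN_CONF;      // step 3: forwarding follower is not a member
        }
        if (option == ReadOnlyOption.READ_ONLY_LEASE_BASED && !leaseValid) {
            option = ReadOnlyOption.READ_ONLY_SAFE; // step 4: an expired lease downgrades to safe
        }
        return option == ReadOnlyOption.READ_ONLY_SAFE
                ? Outcome.CONFIRM_WITH_HEARTBEATS   // step 5: heartbeat round needed
                : Outcome.REPLY_NOW;                // step 6: valid lease, answer immediately
    }
}
```

The term check in step 2 comes before the lease check on purpose. A Leader that has not yet committed an entry in its own term does not know the true commit index, so even a valid lease cannot make the read safe.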

In both the ReadOnlySafe and ReadOnlyLeaseBased cases, the outcome is delivered by invoking ReadIndexResponseClosure#run.
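In the ReadOnlySafe case, ReadIndexHeartbeatResponseClosure decides when to invoke that callback by counting heartbeat responses. The sketch below is a hypothetical model of that counting, not JRaft's class. The Leader counts itself as one vote and succeeds once a majority is reached. It fails early once so many Followers have failed that a majority is no longer possible.

```java
// Hypothetical sketch of majority counting for a ReadOnlySafe ReadIndex (not JRaft's class).
final class HeartbeatQuorum {
    enum Result { PENDING, CONFIRMED, FAILED }

    private final int quorum;         // majority of all peers, leader included
    private final int followerCount;  // heartbeats go to every peer except the leader
    private int acks;
    private int failures;
    private Result result = Result.PENDING;

    HeartbeatQuorum(int peerCount) {
        this.quorum = peerCount / 2 + 1;
        this.followerCount = peerCount - 1;
    }

    synchronized Result onHeartbeatResponse(boolean ok) {
        if (result != Result.PENDING) {
            return result; // already decided; late responses change nothing
        }
        if (ok) {
            acks++;
        } else {
            failures++;
        }
        if (acks + 1 >= quorum) {                           // +1: the leader votes for itself
            result = Result.CONFIRMED;
        } else if (followerCount - failures < quorum - 1) { // too few followers left for a majority
            result = Result.FAILED;
        }
        return result;
    }
}
```

In a three-node group, one successful Follower heartbeat is enough to confirm leadership. Two failed ones make confirmation impossible.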

ReadIndexResponseClosure#run

public void run(final Status status) {
    // Failure: the incoming status is not OK
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    // Failure: the response reports success == false
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // Success
    // The consistency read succeeded
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        // Set the index of the current submission
        state.setIndex(readIndexResponse.getIndex());
    }

    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        // Check whether applyIndex exceeds ReadIndex
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            // The log up to ReadIndex has been applied, so a Linearizable Read can be served
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}

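run first checks whether the request failed, either through the incoming status or through the response's success flag. On success, it sets the returned index on every wrapped ReadIndexState. It then checks whether the state machine's applyIndex has reached ReadIndex. If it has, every log entry replicated to a majority up to ReadIndex has been applied; these entries can be regarded as the write operations. Their data is therefore visible to the client, and notifySuccess is called immediately. Otherwise, the ReadIndexStatus is parked in pendingNotifyStatus, keyed by its index, until the state machine catches up.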
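The parking step can be modeled with a TreeMap keyed by read index. The sketch is hypothetical: PendingReads and its methods are illustrative names. It assumes a read is safe once the applied index is at least its read index. The excerpt above only shows the parking side. The sketch also models the release side: whenever the applied index advances, every parked read whose index has been reached is run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of parking ReadIndex results until the state machine catches up.
final class PendingReads {
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastAppliedIndex;

    // Called when a ReadIndex round has confirmed readIndex for this read.
    void onReadIndexConfirmed(long readIndex, Runnable read) {
        boolean runNow;
        synchronized (this) {
            runNow = readIndex <= lastAppliedIndex; // applied >= readIndex means the read is safe
            if (!runNow) {
                pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(read);
            }
        }
        if (runNow) {
            read.run(); // run outside the lock, as the original unlocks before notifySuccess
        }
    }

    // Called whenever the state machine reports a new applied index.
    void onApplied(long appliedIndex) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastAppliedIndex = Math.max(lastAppliedIndex, appliedIndex);
            NavigableMap<Long, List<Runnable>> done = pending.headMap(lastAppliedIndex, true);
            done.values().forEach(ready::addAll);
            done.clear(); // clearing the view removes these entries from the backing map
        }
        ready.forEach(Runnable::run);
    }
}
```

Keying by index lets one headMap call release every read that has become safe, in index order. Callbacks run outside the lock so that slow reads do not block the apply path.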

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}

On success, notifySuccess iterates over the ReadIndexState list held by the ReadIndexStatus. For each entry, it sets the result index and request context on the closure and calls its run method.

That run call lands in the ReadIndexClosure registered earlier in RaftRawKVStore#multiGet:

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // KV storage implements linear consistent read
    // Call the readIndex method and wait for the callback to execute
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // ReadIndex succeeded: reading from the local store is now linearizable
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // ReadIndex failed: on the leader, fall back to applying the read to the state machine KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

On success, the callback calls kvStore.multiGet (RocksRawKVStore), which reads the data directly from RocksDB.

Conclusion

This article started from RheaKVStore's client-side get method and followed the request to the server, which uses JRaft's ReadIndex to implement linearizable reads. Tracing this example end to end should give you a solid understanding of how linearizable reads work.