The opening
I had originally planned to read the linear consistent read code right after the election code, but on its own it was hard to follow without seeing where a read begins and ends. So this article starts from RheaKV and works down to how SOFAJRaft implements it. A simple example of a linear consistent (linearizable) read: if a value is written at time T1, every read after T1 must see that value, and no read before T1 can see it.
Parts of this article draw on the SOFAJRaft documentation: "SOFAJRaft Linear Consistent Read Implementation Analysis | SOFAJRaft Implementation Principles" and "SOFAJRaft Implementation Principles - How SOFAJRaft-RheaKV Uses Raft".
RheaKV reads data
The data access point for RheaKV is the bGet of DefaultRheaKVStore.
DefaultRheaKVStore#bGet
public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}
bGet calls down through several overloads until it reaches this get method of DefaultRheaKVStore: DefaultRheaKVStore#get
private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    // Verify the started state
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}
readOnlySafe indicates whether linear consistent reads are enabled. Because we came in through get(key), both readOnlySafe and tryBatching are true, so the apply method of getBatchingOnlySafe is called with the key and the future. getBatchingOnlySafe is created when DefaultRheaKVStore is initialized: DefaultRheaKVStore#init
...
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe", new GetBatchingHandler("get_only_safe", true));
...
The handler passed in when getBatchingOnlySafe is created is a GetBatchingHandler.
Now let's go back to getBatchingOnlySafe#apply and see what this method does:
public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    // The event is consumed by GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}
The apply method publishes an event to the Disruptor ring buffer for asynchronous processing, storing our key and future in the event. The event is then consumed by the handler of getBatchingOnlySafe, which is GetBatchingHandler.
Batch data acquisition
GetBatchingHandler#onEvent
public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    // Keep accumulating if this is not the last event and neither the batch size nor maxReadBytes is reached
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }
    if (size == 1) {
        reset();
        try {
            // If it is just a GET request, there is no need for batch processing
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        // Initialize a collection of just the right size
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        // After copying the events into keys and futures, reset
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                // Asynchronous callback processes data
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}
The onEvent method keeps buffering events until one of three things happens: the Disruptor signals the end of the current batch, the number of buffered events reaches the batch size, or the buffered key bytes reach maxReadBytes. It then takes one of two paths depending on how many events were buffered. As an optimization, a single event is served by a plain get with batching turned off. Otherwise all keys are collected into the keys list and multiGet is called to fetch them in one batch.
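The flush rule can be looked at in isolation. The following is a minimal sketch, not RheaKV's code: KeyBatcher, its constructor parameters, and offer are hypothetical names, and the two limits only stand in for batchingOpts.getBatchSize() and batchingOpts.getMaxReadBytes().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the flush rule used by a batching event handler. */
final class KeyBatcher {
    private final int batchSize;     // assumed stand-in for batchingOpts.getBatchSize()
    private final int maxReadBytes;  // assumed stand-in for batchingOpts.getMaxReadBytes()
    private final List<byte[]> pending = new ArrayList<>();
    private int cachedBytes;

    KeyBatcher(final int batchSize, final int maxReadBytes) {
        this.batchSize = batchSize;
        this.maxReadBytes = maxReadBytes;
    }

    /**
     * Buffers one key. Returns the keys to read now, or an empty list
     * when the handler should keep accumulating.
     */
    List<byte[]> offer(final byte[] key, final boolean endOfBatch) {
        this.pending.add(key);
        this.cachedBytes += key.length;
        if (!endOfBatch && this.pending.size() < this.batchSize && this.cachedBytes < this.maxReadBytes) {
            return new ArrayList<>();
        }
        // Flush: hand out a copy and reset the buffer for the next batch
        final List<byte[]> flushed = new ArrayList<>(this.pending);
        this.pending.clear();
        this.cachedBytes = 0;
        return flushed;
    }
}
```

In this sketch a flushed list of size one corresponds to the single-get fast path, and a larger list corresponds to the multiGet path.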
The multiGet method calls internalMultiGet to return a Future, which returns the result asynchronously. DefaultRheaKVStore#internalMultiGet
private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    // Keys may live in different regions, so group them into a map from region to the keys it holds
    final Map<Region, List<byte[]>> regionMap = this.pdClient
        .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    // The return value
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
        Lists.newArrayListWithCapacity(regionMap.size());
    // lastCause is passed as null
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        // Retry function: call internalMultiGet again with one fewer retry left
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
            readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        // Send a MultiGetRequest request to retrieve data
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}
internalMultiGet groups the keys by region. Data is stored per region and different keys may belong to different regions, so the data has to be fetched from each region separately. The method then iterates over regionMap and, for each region, passes that region's keys as one batch to internalRegionMultiGet.
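The grouping step can be pictured with a small sketch. This is not how pdClient.findRegionsByKeys is implemented; RegionRouter, addRegion, and groupByRegion are hypothetical names, keys are plain strings instead of byte arrays, and the sketch assumes each region covers the key range from its start key up to the next region's start key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: route keys to the region whose key range contains them. */
final class RegionRouter {
    // Maps each region's start key to its id; a region covers [startKey, nextStartKey)
    private final TreeMap<String, Long> regionsByStartKey = new TreeMap<>();

    void addRegion(final String startKey, final long regionId) {
        this.regionsByStartKey.put(startKey, regionId);
    }

    /** Groups keys by the id of the region that owns them, keeping key order within each group. */
    Map<Long, List<String>> groupByRegion(final List<String> keys) {
        final Map<Long, List<String>> grouped = new LinkedHashMap<>();
        for (final String key : keys) {
            // The owning region is the one with the greatest start key <= key
            final Map.Entry<String, Long> owner = this.regionsByStartKey.floorEntry(key);
            if (owner == null) {
                throw new IllegalArgumentException("No region covers key: " + key);
            }
            grouped.computeIfAbsent(owner.getValue(), id -> new ArrayList<>()).add(key);
        }
        return grouped;
    }
}
```

Each entry of the resulting map corresponds to one per-region batch, which is the unit that is handed to internalRegionMultiGet in the real code.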
DefaultRheaKVStore#internalRegionMultiGet
private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    // Since we are currently on the client, this will be null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    // Set the retry function
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
        retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
        false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // If it is not null, get the rawKVStore and read the data from it
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                // If kvDispatcher is not null, run the read asynchronously on kvDispatcher
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        // Invoke the RPC request
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}
Here internalRegionMultiGet is invoked on the client, so no regionEngine is set and a MultiGetRequest is sent directly to the leader node of the region on the server.
These methods are essentially the same as the ones used for put, which were already covered in "5. SOFAJRaft source Analysis - How to store data in RheaKV", so I won't repeat the details here.
The server handles the MultiGetRequest request
The MultiGetRequest is processed by KVCommandProcessor, which picks the handler based on the value returned by the request's magic method. For this request it calls the handleMultiGetRequest method of DefaultRegionKVService.
public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        // Call the multiGet method of MetricsRawKVStore
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {
            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}
The handleMultiGetRequest method calls the multiGet method of MetricsRawKVStore to get data in batches.
MetricsRawKVStore#multiGet
public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    // Instantiate the MetricsKVClosureAdapter object
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    // Call the multiGet method of RaftRawKVStore
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}
This multiGet wraps the closure in a MetricsKVClosureAdapter, through which the asynchronous callback that fills the response is invoked, and then calls the multiGet method of RaftRawKVStore.
RaftRawKVStore#multiGet
public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // KV storage implements linear consistent read
    // Call the readIndex method and wait for the callback to execute
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // If the status is OK, read from the local store
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // ReadIndex failed: on the Leader, try applying the read to the state machine KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                        status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}
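multiGet calls the node's readIndex method to perform a consistent read and registers a callback. If ReadIndex succeeds, RocksRawKVStore is called to read the data. If it fails and the current node is the Leader, the read is applied as an operation to the Leader's state machine, KVStoreStateMachine. If it fails on a non-Leader node, the failure status is returned so that the client retries against the Leader.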
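The branches of this method can be summarized as a small decision function. This is only a sketch of the control flow described above: ReadPath, Route, and choose are hypothetical names, and the three booleans stand in for readOnlySafe, status.isOk() of the ReadIndex callback, and isLeader().

```java
/** Hypothetical sketch of the read-path decision described for RaftRawKVStore#multiGet. */
final class ReadPath {
    enum Route {
        LOCAL_READ,                  // readOnlySafe is off: read the local store directly
        LOCAL_READ_AFTER_READ_INDEX, // ReadIndex succeeded: the local read is linearizable
        APPLY_TO_STATE_MACHINE,      // ReadIndex failed on the leader: go through the Raft log
        FAIL_AND_LET_CLIENT_RETRY    // ReadIndex failed elsewhere: report the failure status
    }

    static Route choose(final boolean readOnlySafe, final boolean readIndexOk, final boolean isLeader) {
        if (!readOnlySafe) {
            return Route.LOCAL_READ;
        }
        if (readIndexOk) {
            return Route.LOCAL_READ_AFTER_READ_INDEX;
        }
        return isLeader ? Route.APPLY_TO_STATE_MACHINE : Route.FAIL_AND_LET_CLIENT_RETRY;
    }
}
```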
Linear consistency read readIndex
A simple example of a linear consistent read: if we write a value at time T1, every read after T1 must see that value, and no read before T1 can see it. A useful comparison is the volatile keyword in Java: linear consistent reads give a distributed system the kind of visibility guarantee that volatile gives inside a single JVM. Once a client's write to the cluster has been acknowledged as successful, the result of that write is visible to all subsequent read requests. The difference is that volatile provides visibility between threads, whereas SOFAJRaft provides visibility between servers.
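The T1 example can be written down as a toy model. This is a deliberately simplified sketch, not part of SOFAJRaft: LinearizableRegister is a hypothetical name, timestamps are logical numbers, and operations are assumed never to overlap.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of the T1 example: non-overlapping operations, logical timestamps. */
final class LinearizableRegister {
    // Time at which each write was acknowledged -> value written
    private final TreeMap<Long, String> acknowledgedWrites = new TreeMap<>();

    void write(final long acknowledgedAt, final String value) {
        this.acknowledgedWrites.put(acknowledgedAt, value);
    }

    /** A read at readAt must observe the latest write acknowledged at or before readAt. */
    String read(final long readAt) {
        final Map.Entry<Long, String> latest = this.acknowledgedWrites.floorEntry(readAt);
        return latest == null ? null : latest.getValue();
    }
}
```

In this model a read before the first acknowledged write returns null, and a read at or after T1 returns the value written at T1 until a later write is acknowledged.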
SOFAJRaft provides linear consistent reads based on the ReadIndex mechanism of the Raft protocol. Node#readIndex(byte[] requestContext, ReadIndexClosure done) initiates a linear consistent read request. Once the read is safe to perform, the closure passed in is called back; normally it then reads the data from the state machine and returns it to the client.
Node#readIndex
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        // Execute the callback asynchronously
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    // requestContext is EMPTY_BYTES here
    this.readOnlyService.addRequest(requestContext, done);
}
readIndex calls ReadOnlyServiceImpl#addRequest with requestContext and the callback done. In this call path requestContext is BytesUtil.EMPTY_BYTES.
ReadOnlyServiceImpl#addRequest
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            // EMPTY_BYTES
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            // The event is consumed by ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}
The addRequest method wraps reqCtx and the closure into a single event and publishes it to the readIndexQueue. Once the event has been published, it is processed by ReadIndexEventHandler. If publishing fails, it is retried up to three times; after that the closure is completed with an EBUSY status.
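The bounded-retry publish can be tried out without the Disruptor. The sketch below swaps in a java.util.concurrent.ArrayBlockingQueue for the ring buffer and Thread.yield() for ThreadHelper.onSpinWait(); BoundedRetryPublisher and its methods are hypothetical names, and MAX_RETRIES is set to 3 only to match the retry count described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a non-blocking publish with a bounded number of retries. */
final class BoundedRetryPublisher<T> {
    private static final int MAX_RETRIES = 3; // assumed to match the retry limit described in the text
    private final BlockingQueue<T> queue;     // stand-in for the Disruptor ring buffer

    BoundedRetryPublisher(final int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the event was enqueued, false if the queue stayed full and the caller should report "busy". */
    boolean publish(final T event) {
        int retryTimes = 0;
        while (!this.queue.offer(event)) { // offer never blocks; it returns false when the queue is full
            retryTimes++;
            if (retryTimes > MAX_RETRIES) {
                return false;
            }
            Thread.yield(); // brief pause before the next attempt
        }
        return true;
    }

    /** Consumer side: removes and returns the next event, or null if the queue is empty. */
    T poll() {
        return this.queue.poll();
    }
}
```

Failing fast instead of blocking keeps the caller's thread free; the trade-off is that the caller has to handle the "busy" result, which in the original code is the EBUSY status passed to the closure.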
ReadIndexEventHandler
private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
        ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
            throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }
        this.events.add(newEvent);
        // Batch execution
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}
ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl that keeps an events list for batching. When the list reaches the applyBatch size (32) or the event is the last one in the current Disruptor batch, it calls the executeReadIndexEvents method of ReadOnlyServiceImpl to process the buffered events together.
ReadOnlyServiceImpl#executeReadIndexEvents
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Initialize ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());
    final List<ReadIndexState> states = new ArrayList<>(events.size());
    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();
    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}
executeReadIndexEvents builds a ReadIndexRequest and a list of ReadIndexState objects, wraps both in a ReadIndexResponseClosure, and passes the request and the closure to the node's handleReadIndexRequest for the next step.
NodeImpl#handleReadIndexRequest
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}
A linear consistent read can be initiated on any node in the cluster. It does not have to be served by the Leader and may be executed on a Follower, which greatly reduces the read pressure on the Leader. When a Follower performs a consistent read, it calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) to send a ReadIndex request to the Leader. For that reason, this article focuses on the Leader's side of the consistent read.
Next, NodeImpl's readLeader method is called. NodeImpl#readLeader
private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // 1. Get the quorum (majority size) of the cluster
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // If there is only one node in the cluster, call the callback directly and return success
        respBuilder.setSuccess(true) //
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }
    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // 2. The terms must be equal
    // Ask the LogManager for the term of the BallotBox's lastCommittedIndex and compare it with the current term.
    // If they differ, the Leader has not committed any log entry in its own term and must reject the read-only request.
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
            .run(new Status(
                RaftError.EAGAIN,
                "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                    "logIndex=%d, currTerm=%d.",
                lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);
    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        // 3. A request from a follower: check that the follower is in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: {}.", peer,
                    this.conf));
            return;
        }
    }
    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // 4. If ReadOnlyLeaseBased is used, check whether the leader lease is still valid
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }
    switch (readOnlyOpt) {
        // 5.
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // Set the heartbeat response callback function
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers.
            // If more than half of the nodes return a successful heartbeat response,
            // the Leader can confirm that it is still the Leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // 6. No election can occur within the lease, so the Leader cannot have changed;
        // run the callback with a success result directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
- If the cluster has only one node, set success on the response and call the callback method directly.
- Otherwise, take lastCommittedIndex from the BallotBox and check that the term of that log entry equals the current term. If it does not, the Leader has not yet committed any entry in its own term and the request is rejected. If it does, the response builder's index is set to lastCommittedIndex.
- For a request that comes from a Follower, check that the Follower is in the current configuration. If it is not, call the callback method with an error status.
- Read the ReadOnlyOption configured for ReadIndex requests. It defaults to ReadOnlySafe. If it is set to ReadOnlyLeaseBased, isLeaderLeaseValid is called to check whether the Leader lease is still valid; if the lease has expired, the option falls back to ReadOnlySafe.
- With ReadOnlySafe, Replicator#sendHeartbeat(rid, closure) is called to send a Heartbeat request to each Follower. When a heartbeat response arrives, the ReadIndexHeartbeatResponseClosure callback runs and checks whether more than half of the nodes, counting the Leader itself, have acknowledged. Once a majority has responded successfully, a success response is returned. The read is then served once applyIndex has reached ReadIndex, which means the Log up to ReadIndex has been applied and a Linearizable Read is safe.
- With ReadOnlyLeaseBased, the current Leader is considered to be the only valid Leader in the Raft Group for the duration of its lease, so the ReadIndex step of sending Heartbeats to confirm leadership is skipped and a success response is returned directly to the Follower node or the local node. The Leader node still waits for the state machine until applyIndex has reached ReadIndex before it can safely provide a Linearizable Read.
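The majority check in the ReadOnlySafe step can be sketched as a small counter. This is a simplified illustration, not the source of ReadIndexHeartbeatResponseClosure: HeartbeatQuorum, Outcome, and onHeartbeatResponse are hypothetical names, and the failure threshold is derived here from the observation that a majority becomes unreachable once more than peersCount minus quorum followers have failed.

```java
/** Hypothetical sketch of quorum counting over heartbeat responses. */
final class HeartbeatQuorum {
    enum Outcome { PENDING, CONFIRMED, REJECTED }

    private final int quorum;
    private final int failThreshold;
    private int ackSuccess;
    private int ackFailures;
    private Outcome outcome = Outcome.PENDING;

    HeartbeatQuorum(final int peersCount) {
        this.quorum = peersCount / 2 + 1;
        // Once this many followers have failed, a majority can no longer be reached
        this.failThreshold = peersCount - this.quorum + 1;
    }

    /** Records one follower's heartbeat result and returns the decision so far. */
    synchronized Outcome onHeartbeatResponse(final boolean success) {
        if (this.outcome != Outcome.PENDING) {
            return this.outcome; // already decided; late responses are ignored
        }
        if (success) {
            this.ackSuccess++;
        } else {
            this.ackFailures++;
        }
        if (this.ackSuccess + 1 >= this.quorum) { // +1 counts the leader's own vote
            this.outcome = Outcome.CONFIRMED;
        } else if (this.ackFailures >= this.failThreshold) {
            this.outcome = Outcome.REJECTED;
        }
        return this.outcome;
    }
}
```

In a three-node group a single successful follower response is enough in this sketch, because the leader counts itself. A single-node group never reaches this code, since readLeader takes the fast path when the quorum is 1.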
With either ReadOnlySafe or ReadOnlyLeaseBased, a successful response ends up invoking the run method of ReadIndexResponseClosure.
ReadIndexResponseClosure#run
public void run(final Status status) {
    // The incoming status is not OK: the request failed
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    // The response itself reports failure
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // Success
    // The consistency read succeeded
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        state.setIndex(readIndexResponse.getIndex());
    }
    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        // Check whether applyIndex has reached ReadIndex
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            // The Log up to ReadIndex has been applied, so a Linearizable Read can be served
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}
The run method first checks whether the request failed. If it succeeded, every encapsulated ReadIndexState is updated with the returned index, and the method then checks whether applyIndex has reached ReadIndex. If it has, every Log entry up to ReadIndex (each of which can be regarded as a write operation) has been replicated to a majority and applied, so the data those entries represent is safely visible to the Client and the read is notified of success. Otherwise the ReadIndexStatus is placed in the pendingNotifyStatus cache.
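The "notify now or park until applied" decision can be sketched with a sorted map. PendingReads, submit, and onApplied are hypothetical names. The release step in onApplied is an assumption added for illustration, since the code that drains pendingNotifyStatus is not shown in this article. Unlike the original, the sketch also runs callbacks while holding its lock, which keeps it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: hold read callbacks until the state machine has applied their read index. */
final class PendingReads {
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastAppliedIndex;

    /** Runs the callback immediately if readIndex is already applied, otherwise parks it. */
    synchronized void submit(final long readIndex, final Runnable callback) {
        if (this.lastAppliedIndex >= readIndex) {
            callback.run();
        } else {
            this.pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(callback);
        }
    }

    /** Assumed hook: called when the state machine advances; releases every read whose index is now applied. */
    synchronized void onApplied(final long appliedIndex) {
        this.lastAppliedIndex = Math.max(this.lastAppliedIndex, appliedIndex);
        final Map<Long, List<Runnable>> ready = this.pending.headMap(this.lastAppliedIndex, true);
        for (final List<Runnable> callbacks : ready.values()) {
            callbacks.forEach(Runnable::run);
        }
        ready.clear(); // headMap is a view, so this removes the released entries from pending
    }
}
```

Keying the parked reads by index in a sorted map makes the release step a single range query over everything at or below the applied index.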
ReadOnlyServiceImpl#notifySuccess
private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}
On a successful response, the notifySuccess method is called. It iterates over the ReadIndexState list held by the status and calls the run method of each one's closure.
The run method invoked here is the ReadIndexClosure callback that RaftRawKVStore#multiGet registered when it called readIndex.
With an OK status, that callback calls the multiGet of the kvStore held by RaftRawKVStore, which reads the data directly from RocksDB.
Conclusion
This article started from the client-side get method of RheaKVStore and followed the request to the server, where RheaKV uses JRaft's ReadIndex to implement linear consistent reads. Walking through this example should give you a good understanding of how linear consistent reads work.