Following up on the previous article, this article looks at the processing chain on the Zookeeper server.
The chain covered here is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
PrepRequestProcessor.java
This processor creates the transaction request header and transaction body, and performs the session checks, ACL checks, version checks, and so on.
PrepRequestProcessor#run()
@Override
public void run(a) {...try {
while (true) { ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size()); Request request = submittedRequests.take(); . request.prepStartTime = Time.currentElapsedTime(); pRequest(request); }}catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
Copy the code
Fetch the Request from the queue and give it to pRequest(Request);
PrepRequestProcessor#pRequest(Request request)
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
if(! request.isThrottled()) {// Generate the transaction header and body
pRequestHelper(request);
}
/ / transaction ID
request.zxid = zks.getZxid();
long timeFinishedPrepare = Time.currentElapsedTime();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
// Pass to the next handler
nextProcessor.processRequest(request);
ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
Copy the code
This step mainly generates the transaction header and body and passes it on to the next processor.
private void pRequestHelper(Request request) throws RequestProcessorException {
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break; .// There is no need to create a transaction request
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break; }}catch (KeeperException e) {
...
} catch(Exception e) { ... }}Copy the code
This step splits requests into two categories. One is write requests, also known as transaction requests, which are handled by pRequest2Txn(). The other is read requests, which are non-transactional.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
if (request.getHdr() == null) {
// Transaction request header
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
}
switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
/ / the next step
pRequest2TxnCreate(type, request, record, deserialize);
break; }}Copy the code
This step creates the transaction request header.
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, record);
}
int flags;
String path;
List<ACL> acl;
byte[] data;
long ttl;
if (type == OpCode.createTTL) {
CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
flags = createTtlRequest.getFlags();
path = createTtlRequest.getPath();
acl = createTtlRequest.getAcl();
data = createTtlRequest.getData();
ttl = createTtlRequest.getTtl();
} else {
CreateRequest createRequest = (CreateRequest) record;
flags = createRequest.getFlags();
path = createRequest.getPath();
acl = createRequest.getAcl();
data = createRequest.getData();
ttl = -1;
}
// Node creation mode
CreateMode createMode = CreateMode.fromFlag(flags);
validateCreateRequest(path, createMode, request, ttl);
String parentPath = validatePathForCreate(path, request.sessionId);
List<ACL> listACL = fixupACL(path, request.authInfo, acl);
ChangeRecord parentRecord = getRecordForPath(parentPath);
// check ACL
zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
// Verify the path
validatePath(path, request.sessionId);
try {
if(getRecordForPath(path) ! =null) {
throw newKeeperException.NodeExistsException(path); }}catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
/ / version
int newCversion = parentRecord.stat.getCversion() + 1;
zks.checkQuota(path, null, data, OpCode.create);
if (type == OpCode.createContainer) {
request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
} else if (type == OpCode.createTTL) {
request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
} else {
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
}
TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
if (createMode.isContainer()) {
ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
} else if (createMode.isTTL()) {
ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
} else if (createMode.isEphemeral()) {
ephemeralOwner = request.sessionId;
}
StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
ChangeRecord nodeRecord = new ChangeRecord(
request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
// Add to the transaction being processed
addChangeRecord(nodeRecord);
}
Copy the code
This step mostly consists of the various checks; at the end, the change records for the parent and the new node are added to the set of changes that are still being processed.
Let's take a look at the next processor, the SyncRequestProcessor.
The main job of this handler is to log requests to the transaction log.
/**
 * Accepts a request for this processor by placing it on queuedRequests.
 *
 * The request is stamped with the current elapsed time before it is
 * queued, and the SYNC_PROCESSOR_QUEUED metric is incremented afterwards.
 *
 * @param request the request to queue; must not be null
 * @throws NullPointerException if {@code request} is null
 */
public void processRequest(final Request request) {
    Objects.requireNonNull(request, "Request cannot be null");

    // Stamp the request before it is added to the queue.
    final long enqueuedAt = Time.currentElapsedTime();
    request.syncQueueStartTime = enqueuedAt;

    // Record the request
    queuedRequests.add(request);

    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
Copy the code
run()
@Override
public void run(a) {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
if(! si.isThrottled() && zks.getZKDatabase().append(si)) {if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if(! snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run(a) {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally{ snapThreadMutex.release(); } } }.start(); }}}else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if(nextProcessor ! =null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceofFlushable) { ((Flushable) nextProcessor).flush(); }}continue;
}
// Add to queue
toFlush.add(si);
if(shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); }}catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
Copy the code
zks.getZKDatabase().append(si) writes the request to the transaction log, but the log has not been committed yet.
flush() commits the transaction log and passes the pending requests on to the next handler.
private void flush(a) throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
}
ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
long flushStartTime = Time.currentElapsedTime();
/ / submit
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
if (this.nextProcessor == null) {
this.toFlush.clear();
} else {
while (!this.toFlush.isEmpty()) {
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
// Pass to the next handler
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
}
lastFlushTime = Time.currentElapsedTime();
}
Copy the code
FinalRequestProcessor.java
The FinalRequestProcessor responds to requests and changes the state of the in-memory database
/**
 * Final stage of the chain (excerpt; only the create case is shown and
 * elided parts are marked with "...").
 *
 * Unless the request is throttled, the transaction is applied through
 * applyRequest(Request). Requests without a connection return right after
 * that. Otherwise the server's request accounting is updated and a
 * response record is built for the op code.
 *
 * @param request the request to finish
 */
public void processRequest(Request request) {
    ProcessTxnResult rc = null;
    if (!request.isThrottled()) {
        // Apply transaction
        rc = applyRequest(request);
    }
    if (request.cnxn == null) {
        return;
    }
    ServerCnxn cnxn = request.cnxn;
    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    String lastOp = "NA";
    // Notify ZooKeeperServer that the request has finished so that it can
    // update any request accounting/throttling limits
    zks.decInProcess();
    zks.requestFinished(request);
    Code err = Code.OK;
    Record rsp = null;
    String path = null;
    int responseSize = 0;
    try {
        // ... (elided in this excerpt; the case label below presumably sits
        // inside a switch on the request type -- confirm against the full source)
        case OpCode.create: {
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        // ... (remaining op codes elided in this excerpt)
    }
    // ... (remainder of the method elided in this excerpt)
}
The important step is to apply the transaction, which is to actually modify the in-memory database logic and return the response to the client.
applyRequest() applies the transaction
private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);
// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// is not being queued - zookeeper-558) properly.
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
returnrc; }}if(request.getHdr() ! =null) {
/* * Request header is created only by the leader, so this must be * a quorum request. Since we're comparing timestamps across hosts, * this metric may be incorrect. However, it's still a very useful * metric to track in the happy case. If there is clock drift, * the latency can go negative. Note: headers use wall time, not * CLOCK_MONOTONIC. */
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) { ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency); }}return rc;
}
Copy the code
ZooKeeperServer#processTxn()
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final booleanwriteRequest = (hdr ! =null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if(! writeRequest && ! quorumRequest) {return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
// Process transactions
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while(! outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = outstandingChanges.remove(); ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if(outstandingChangesForPath.get(cr.path) == cr) { outstandingChangesForPath.remove(cr.path); }}}// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
returnrc; }}Copy the code
ZooKeeperServer.java
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
if (hdr == null) {
// Direct return, non-transactional request
return new ProcessTxnResult();
} else {
returngetZKDatabase().processTxn(hdr, txn, digest); }}Copy the code
ZKDatabase.java
/**
 * Applies a transaction by delegating to the data tree.
 *
 * @param hdr    the transaction header
 * @param txn    the transaction body
 * @param digest the transaction digest
 * @return whatever dataTree.processTxn returns for these arguments
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest) {
    final ProcessTxnResult treeResult = dataTree.processTxn(hdr, txn, digest);
    return treeResult;
}
Copy the code
DataTree.java
/**
 * Applies a transaction and then compares its digest.
 *
 * The transaction is applied first through the two-argument overload
 * processTxn(header, txn); compareDigest is called afterwards with the
 * same header and body plus the supplied digest.
 *
 * @param header the transaction header
 * @param txn    the transaction body
 * @param digest the digest passed to compareDigest
 * @return the result of applying the transaction
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, TxnDigest digest) {
    final ProcessTxnResult outcome = processTxn(header, txn);
    // The digest is compared only after the transaction has been applied.
    compareDigest(header, txn, digest);
    return outcome;
}
Copy the code
DataTree.java
/**
 * Applies a transaction to the data tree (excerpt; only the create case is
 * shown and elided parts are marked with "...").
 *
 * The result is first filled in from the header (client id, cxid, zxid,
 * type) with an error code of 0. For a create transaction the node is then
 * created from the fields of the CreateTxn.
 *
 * @param header   the transaction header
 * @param txn      the transaction body; a CreateTxn for OpCode.create
 * @param isSubTxn not used in the part of the method shown here
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            // Create a node
            createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                // The owner argument is the client id for ephemeral nodes, 0 otherwise.
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                null);
            break;
        // ... (remaining op codes elided in this excerpt)
        }
    }
    // ... (catch clauses and the rest of the method elided in this excerpt)
}
At this point, the process of creating a node ends.
To summarize
PrepRequestProcessor – Various pre-checks to create transaction request headers
-> SyncRequestProcessor — records transaction logs
-> FinalRequestProcessor — real logical processing, applying transactions, modifying in-memory databases