This article was written by 泊浮目 and first published on Jianshu: www.jianshu.com/u/204b8aaab…

Version  Date       Note
1.0      2020.5.23  First published
1.1      2020.8.16  Added the bird's-eye view section
1.2      2020.8.21  Added a summary, refined the wording, added pictures
1.3      2020.9.12  Added a description of consistency
1.4      2021.6.23  Title changed from "Zookeeper (6): What does a client request go through in the server" to "Zookeeper: What a client request goes through in the server"

1. Introduction

When we send a data update request to ZK, what does that request go through? Which consensus algorithm does ZK use to ensure consistency? With these questions in mind, let's move into today's topic.

2. Design pattern: Chain of Responsibility

Before analyzing the source code, we need to briefly introduce the Chain of Responsibility pattern, because it is closely related to the content of this article. Put simply, the pattern links multiple objects into a chain, and a request travels along the chain in order until it reaches the object responsible for handling it.

So what are the benefits? The requester and the handlers are loosely coupled: each handler is free to pass the request along until one that can deal with it is found. If a processor receives a request that it is not responsible for, it simply forwards it, with no extra handling logic.
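To make the pattern concrete, here is a minimal sketch in Java. Every name in it (Handler, ValidatingHandler, WriteHandler, ReplyHandler, ChainDemo) is made up for illustration and is not part of ZooKeeper's RequestProcessor API. It only shows the shape of the idea: each link does its own small job and forwards the request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not ZooKeeper's RequestProcessor API.
interface Handler {
    void handle(String request, List<String> trace);
}

// Validates the request, then always forwards it.
class ValidatingHandler implements Handler {
    private final Handler next;
    ValidatingHandler(Handler next) { this.next = next; }

    @Override
    public void handle(String request, List<String> trace) {
        trace.add("validate");
        next.handle(request, trace);
    }
}

// Acts only on write requests; anything else is passed along untouched.
class WriteHandler implements Handler {
    private final Handler next;
    WriteHandler(Handler next) { this.next = next; }

    @Override
    public void handle(String request, List<String> trace) {
        if (request.startsWith("write")) {
            trace.add("log-write");
        }
        next.handle(request, trace);
    }
}

// Last link: replies to the caller and ends the chain.
class ReplyHandler implements Handler {
    @Override
    public void handle(String request, List<String> trace) {
        trace.add("reply");
    }
}

class ChainDemo {
    // Builds the chain back to front and returns the steps one request went through.
    static List<String> run(String request) {
        Handler chain = new ValidatingHandler(new WriteHandler(new ReplyHandler()));
        List<String> trace = new ArrayList<>();
        chain.handle(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("write /a")); // [validate, log-write, reply]
        System.out.println(run("read /a"));  // [validate, reply]
    }
}
```

Note that the chain is constructed from the last handler to the first, because each handler needs a reference to its successor. The setupRequestProcessors methods analyzed later are written in the same back-to-front order.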

3. Tracing the request logic

Let's start with the ZooKeeperServer class and look at its subclasses. The (common) ZK server roles we care about are:

  • LeaderZooKeeperServer
  • FollowerZooKeeperServer
  • ObserverZooKeeperServer

3.1 A bird's-eye view of the implementation

3.1.1 LeaderZooKeeperServer

The entry point is LeaderZooKeeperServer.setupRequestProcessors. For the sake of readability, the logic is laid out here as a tree view. Readers who prefer the source code can find the implementation details in 3.2.

|-- LeaderRequestProcessor
|   \-- processRequest // check the session
|-- PrepRequestProcessor
|   \-- processRequest // validate the parameters and create a transaction as required
|-- ProposalRequestProcessor
    \-- processRequest // initiate the proposal
        |-- // transactional request
        |   |-- SyncRequestProcessor
        |   |   \-- processRequest // record the request in the transaction log and trigger a snapshot if needed
        |   |-- AckRequestProcessor
        |   |   \-- processRequest // confirm that the transaction log has been written and ACK the Proposal vote
        |   |-- CommitProcessor
        |   |   \-- processRequest // wait for the Proposal to be voted on until it can be committed
        |   |-- ToBeAppliedRequestProcessor
        |   |   \-- processRequest // hold committable proposals that have passed the CommitProcessor until the FinalRequestProcessor has finished with them
        |   \-- FinalRequestProcessor
        |       \-- processRequest // reply to the request and change the state of the in-memory database
        \-- // non-transactional request
            |-- CommitProcessor
            |   \-- processRequest // skipped: only transactional requests are handled
            |-- ToBeAppliedRequestProcessor
            |   \-- processRequest // skipped: works together with the CommitProcessor
            \-- FinalRequestProcessor
                \-- processRequest // reply to the request and change the state of the in-memory database

3.1.2 FollowerZooKeeperServer

// Handles client requests
|-- FollowerRequestProcessor
|   \-- processRequest // a transactional request is handed to the CommitProcessor and sent to the leader; anything else goes straight on to the FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // wait for the Proposal to be voted on until it can be committed
|-- FinalRequestProcessor
    \-- processRequest // reply to the request and change the state of the in-memory database

// Dedicated to handling proposals from the leader
|-- SyncRequestProcessor
|   \-- processRequest // record the request in the transaction log and trigger a snapshot if needed
|-- SendAckRequestProcessor
    \-- processRequest // ACK the initiator of the proposal to say that the log record has been written

3.1.3 ObserverZooKeeperServer

// Handles client requests
|-- ObserverRequestProcessor
|   \-- processRequest // almost identical to the FollowerRequestProcessor code: a transactional request is handed to the CommitProcessor and sent to the leader; anything else goes straight on to the FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // wait for the Proposal to be voted on until it can be committed
|-- FinalRequestProcessor
    \-- processRequest // reply to the request and change the state of the in-memory database

3.2 Implementation details

The source code analysis below is based on version 3.5.7.

3.2.1 LeaderZooKeeperServer

    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

3.2.2 LeaderRequestProcessor

    @Override
    public void processRequest(Request request)
            throws RequestProcessorException {
        // Check if this is a local session and we are trying to create
        // an ephemeral node, in which case we upgrade the session
        Request upgradeRequest = null;
        try {
            upgradeRequest = lzks.checkUpgradeSession(request);
        } catch (KeeperException ke) {
            if (request.getHdr() != null) {
                LOG.debug("Updating header");
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(ke.code().intValue()));
            }
            request.setException(ke);
            LOG.info("Error creating upgrade request " + ke.getMessage());
        } catch (IOException ie) {
            LOG.error("Unexpected error in upgrade", ie);
        }
        if (upgradeRequest != null) {
            nextProcessor.processRequest(upgradeRequest);
        }

        nextProcessor.processRequest(request);
    }

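The logic is clear: the processor checks whether the request comes from a local session that is trying to create an ephemeral (temporary) node, in which case the session has to be upgraded first. If that check throws a KeeperException, the request is marked as an error and the exception is attached to it. Either way, the request is then passed to the next processor.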

3.2.3 PrepRequestProcessor

This class has more than 1,000 lines of code, so only the most representative parts are picked out for analysis. Before that, let's look at the class comment:

This request processor is generally at the start of a RequestProcessor change. It sets up any transactions associated with requests that change the state of the system. It counts on ZooKeeperServer to update outstandingRequests, so that it can take into account transactions that are in the queue to be applied when generating a transaction.

In short, it usually sits at the head of the request processing chain and sets up the transactions for requests that change the state of the system.

OpCode.create2

The logic for creation requests is roughly as follows:

          case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;

Jump to pRequest2Txn.

    protected void pRequest2Txn(int type, long zxid, Request request,
                                Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));

        switch (type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                pRequest2TxnCreate(type, request, record, deserialize);
                break;
            }
            // ... remaining cases omitted

Jump to pRequest2TxnCreate:

    private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            CreateRequest createRequest = (CreateRequest)record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        ChangeRecord parentRecord = getRecordForPath(parentPath);

        checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {
            if (getRecordForPath(path) != null) {
                throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        int newCversion = parentRecord.stat.getCversion()+1;
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                    newCversion));
        }
        StatPersisted s = new StatPersisted();
        if (createMode.isEphemeral()) {
            s.setEphemeralOwner(request.sessionId);
        }
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        addChangeRecord(parentRecord);
        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
    }

The logic can be summarized roughly as follows:

  1. Assemble the request
  2. Check that the request is reasonable: undefined request types and invalid parameters are rejected
  3. Check that the parent path exists
  4. Check the ACL
  5. Check that the path is valid
  6. Put the change records into the outstandingChanges queue
  7. Send the request to the next Processor
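One detail of pRequest2TxnCreate that is easy to overlook is how a sequential node gets its name: the parent's cversion is appended to the requested path as a zero-padded, 10-digit number. The sketch below isolates just that line. The class and method names (SequentialPath, withSequence) are made up for illustration; only the String.format call is taken from the source above.

```java
import java.util.Locale;

class SequentialPath {
    // Same formatting as in pRequest2TxnCreate: the parent's cversion
    // becomes a zero-padded, 10-digit decimal suffix of the path.
    static String withSequence(String path, int parentCVersion) {
        return path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    public static void main(String[] args) {
        System.out.println(withSequence("/locks/lock-", 0));  // /locks/lock-0000000000
        System.out.println(withSequence("/locks/lock-", 42)); // /locks/lock-0000000042
    }
}
```

Because the method above also sets the parent's cversion to parentRecord.stat.getCversion()+1 on every create, a later sequential create under the same parent sees a larger number.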

OpCode.multi

Transactional request:

          case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting" +
                                        " remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(), e.getPath(), e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    // immediately deserialize in next processor. But I'm
                    // not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request");
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

The code looks ugly, but the logic is quite simple:

  • Iterate over all the sub-requests and prepare them one by one, running the same series of checks (the request is valid, the parent path exists, the ACL allows it, the path is valid). If no exception occurs, the sub-request becomes a transaction record. Otherwise it becomes a record marked as an error, and the pending change records made so far (a Map) are rolled back. Either way, the resulting request, which wraps all of the records, is sent to the next Processor.
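The abort-and-rollback behaviour can be sketched with a much smaller model. The code below is a simplification under stated assumptions, not ZooKeeper's implementation: every op is a "create path", the pending changes are just a set of paths, and the names (MultiSketch, process, the result strings) are invented for this example. It shows only the three rules visible in the source: snapshot the pending changes first, roll back on the first failure, and do not attempt the ops after a failure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MultiSketch {
    // Processes a batch of "create <path>" ops against a set of pending paths.
    // Returns one result per op: "ok", "error:node-exists" or "error:aborted".
    static List<String> process(List<String> paths, Set<String> pending) {
        Set<String> snapshot = new HashSet<>(pending); // kept in case we must roll back
        List<String> results = new ArrayList<>();
        boolean failed = false;
        for (String path : paths) {
            if (failed) {
                // An earlier op already failed: do not even try this one.
                results.add("error:aborted");
            } else if (pending.add(path)) {
                results.add("ok");
            } else {
                failed = true;
                results.add("error:node-exists");
                // Undo every change made by the ops that succeeded earlier.
                pending.clear();
                pending.addAll(snapshot);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Set<String> pending = new HashSet<>(Arrays.asList("/a"));
        // [ok, error:node-exists, error:aborted]
        System.out.println(process(Arrays.asList("/b", "/a", "/c"), pending));
        // [/a]  ("/b" was rolled back)
        System.out.println(pending);
    }
}
```

The sketch models only this preparation step. What later processors do with the list of records is outside its scope.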

OpCode.sync

//All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;

A non-transactional request: once the session has been verified, it can be sent to the next Processor.

3.2.4 ProposalRequestProcessor

This processor passes every request on to the CommitProcessor and issues a Proposal for each transactional request. In addition, ProposalRequestProcessor hands transactional requests to the SyncRequestProcessor.

    public void processRequest(Request request) throws RequestProcessorException {
        // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = " + request.sessionId);
        // request.addRQRec(">prop");


        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't
         * contain the handler. In this case, we add it to syncHandler, and
         * call processRequest on the next processor.
         */

        if (request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
            nextProcessor.processRequest(request);
            if (request.getHdr() != null) {
                // We need to sync and get consensus on any transactions
                try {
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                syncProcessor.processRequest(request);
            }
        }
    }

Next, look at propose:

  /**
     * create a proposal and send it out to all the members
     *
     * @param request
     * @return the proposal that is queued to send to all the members
     */
    public Proposal propose(Request request) throws XidRolloverException {
        /**
         * Address the rollover issue. All lower 32bits set indicate a new leader
         * election. Force a re-election instead. See ZOOKEEPER-1277
         */
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                    "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException(msg);
        }

        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;                
        
        synchronized(this) {
           p.addQuorumVerifier(self.getQuorumVerifier());
                   
           if (request.getHdr().getType() == OpCode.reconfig){
               self.setLastSeenQuorumVerifier(request.qv, true);                       
           }
           
           if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
               p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
           }
                   
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }

            lastProposed = p.packet.getZxid();
            outstandingProposals.put(lastProposed, p);
            sendPacket(pp);
        }
        return p;
    }

The packet that is sent is a QuorumPacket, which implements the Record interface, and its type is set to PROPOSAL. Let's take a look at the comment on that constant:

    /**
     * This message type is sent by a leader to propose a mutation.
     */
    public final static int PROPOSAL = 2;

Clearly, this is a mutation request that only the Leader can initiate. Here is the logic:

  1. Put the Proposal into the outstandingProposals map
  2. Wrap it in a packet and send it out
  3. Hand the request on to the next Processor
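The rollover check at the top of propose is easier to read with the layout of a zxid in mind: the high 32 bits hold the leader epoch and the low 32 bits hold a counter within that epoch. The sketch below illustrates that arithmetic. The helper names (makeZxid, epochOf, counterOf, lowBitsExhausted) are invented for this example; only the mask test is copied from the propose method above.

```java
class ZxidSketch {
    // High 32 bits: epoch. Low 32 bits: counter within that epoch.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The same test as in Leader.propose: all lower 32 bits are set,
    // so the counter has no room left in this epoch.
    static boolean lowBitsExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 5);
        System.out.println(Long.toHexString(zxid));                       // 300000005
        System.out.println(epochOf(zxid) + " / " + counterOf(zxid));      // 3 / 5
        System.out.println(lowBitsExhausted(zxid));                       // false
        System.out.println(lowBitsExhausted(makeZxid(3, 0xffffffffL)));   // true
    }
}
```

When the counter is exhausted, the leader in the source above shuts down with a message about forcing a re-election, which starts a new epoch and therefore a fresh counter.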

3.2.5 CommitProcessor

As the name implies, this is the transaction committer. It only cares about transactional requests: it waits for the cluster to vote on the Proposal until the request can be committed. With the CommitProcessor, each server can process transactions in order.

The code for this part is quite simple, so it is not covered here. With this information, the reader can follow the request chain on their own.
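Since the source is skipped here, the following is a deliberately simplified, single-threaded model of the idea only. It is not the real CommitProcessor, which is multi-threaded and considerably more involved. All names (CommitGateSketch, submit, commit, forwarded) and the convention that write requests start with "w" are assumptions made for this sketch. It shows how holding a write until its commit arrives keeps everything behind it in order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CommitGateSketch {
    private final Deque<String> queued = new ArrayDeque<>();  // requests in arrival order
    private final Set<String> committed = new HashSet<>();    // writes the quorum has agreed on
    private final List<String> forwarded = new ArrayList<>(); // handed to the next processor

    // Assumption for this sketch: write requests start with "w".
    private static boolean isWrite(String request) {
        return request.startsWith("w");
    }

    // A client request arrives.
    void submit(String request) {
        queued.addLast(request);
        drain();
    }

    // The leader announces that a write has been committed.
    void commit(String writeRequest) {
        committed.add(writeRequest);
        drain();
    }

    List<String> forwarded() {
        return forwarded;
    }

    // Forward requests from the head of the queue until an uncommitted write blocks it.
    private void drain() {
        while (!queued.isEmpty()) {
            String head = queued.peekFirst();
            if (isWrite(head) && !committed.remove(head)) {
                return; // still waiting for the vote on this write
            }
            forwarded.add(queued.pollFirst());
        }
    }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        gate.submit("r1");
        gate.submit("w1");
        gate.submit("r2");
        System.out.println(gate.forwarded()); // [r1]
        gate.commit("w1");
        System.out.println(gate.forwarded()); // [r1, w1, r2]
    }
}
```

In this model the read r2 waits behind w1 even though it needs no vote itself, which is what keeps the order of forwarded requests identical to the order in which they arrived.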

3.2.6 SyncRequestProcessor

The logic is simple: record the request in the transaction log and, when needed, trigger a snapshot.

    public void processRequest(Request request) {
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }

    // The thread's core method, which operates on queuedRequests
    @Override
    public void run() {
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }
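The snapshot trigger in run() is worth isolating. A snapshot is taken once logCount exceeds snapCount / 2 + randRoll, where randRoll is drawn again after every snapshot, so that, as the source comment says, the servers in the ensemble do not all snapshot at the same time. The sketch below replays only that counter logic. The class and method names (SnapshotRollSketch, intervals) are made up, and the log roll and the snapshot itself are replaced by recording the count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class SnapshotRollSketch {
    // Replays the counter logic of SyncRequestProcessor.run for `appends`
    // successful log appends. Returns, for each snapshot that would be taken,
    // how many transactions were appended since the previous one.
    static List<Integer> intervals(int snapCount, int appends, Random r) {
        List<Integer> result = new ArrayList<>();
        int logCount = 0;
        int randRoll = r.nextInt(snapCount / 2);
        for (int i = 0; i < appends; i++) {
            logCount++;
            if (logCount > (snapCount / 2 + randRoll)) {
                randRoll = r.nextInt(snapCount / 2);
                result.add(logCount); // the real code rolls the log and snapshots here
                logCount = 0;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // With snapCount = 100, every interval falls between 51 and 100 appends.
        System.out.println(intervals(100, 1000, new Random()));
    }
}
```

Because randRoll lies in [0, snapCount / 2), each interval lies between snapCount / 2 + 1 and snapCount appends, so two servers started with the same snapCount still drift apart in when they snapshot.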

3.2.7 ToBeAppliedRequestProcessor

The core of this processor is the toBeApplied queue, which stores committable proposals that have already passed through the CommitProcessor. They are not removed until the FinalRequestProcessor has finished processing them.

        /*
         * (non-Javadoc)
         *
         * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
         */
        public void processRequest(Request request) throws RequestProcessorException {
            next.processRequest(request);

            // The only requests that should be on toBeApplied are write
            // requests, for which we will have a hdr. We can't simply use
            // request.zxid here because that is set on read requests to equal
            // the zxid of the last write op.
            if (request.getHdr() != null) {
                long zxid = request.getHdr().getZxid();
                Iterator<Proposal> iter = leader.toBeApplied.iterator();
                if (iter.hasNext()) {
                    Proposal p = iter.next();
                    if (p.request != null && p.request.zxid == zxid) {
                        iter.remove();
                        return;
                    }
                }
                LOG.error("Committed request not found on toBeApplied: " + request);
            }
        }

3.2.8 FinalRequestProcessor

For reasons of length, only a brief description of the logic is given here: as the last processor in the chain, it replies to the request and executes transactional requests, that is, it changes the state of the in-memory database.

3.2.9 FollowerZooKeeperServer

Take a look at the code that assembles Processors:

    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

As you can see, there are two request chains here:

  1. FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
  2. SyncRequestProcessor -> SendAckRequestProcessor

When a request comes in, which chain handles it? It can be traced roughly as follows:

  • firstProcessor (the FollowerRequestProcessor) is the main request-handling path. It is driven by the parent class ZooKeeperServer and handles client requests.
  • syncProcessor (the SyncRequestProcessor) is entered through logRequest, which is dispatched from the Learner class, and handles requests from the leader.

So what is a Learner? A Follower? An Observer? The class comment answers that:

/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share
 * a good deal of code which is moved into Peer to avoid duplication.
 */
public class Learner {

In other words, the code shared by Followers and Observers was extracted into Learner to avoid duplication.

3.2.10 FollowerRequestProcessor

This is the first processor on a follower. It determines whether a request is transactional; if so, it forwards the request to the Leader, otherwise the follower handles the request itself.

FollowerRequestProcessor.run

    @Override
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("FollowerRequestProcessor exited loop!");
    }

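Handing the request to the CommitProcessor first may look confusing, since a transactional request is forwarded to the leader anyway. The reason is given in the code comment: the request is queued locally before it is shipped to the leader, so that the follower is ready when the response arrives. As mentioned earlier, the CommitProcessor holds a transactional request until the cluster has voted on the Proposal and it can be committed.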
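The switch statement in run() amounts to a classification of request types. The sketch below restates it as a single predicate. The Op enum and the forwardToLeader method are hypothetical stand-ins for the integer constants in OpCode and for the calls to zks.getFollower().request(request). GET_DATA and EXISTS are added only as examples of types that are absent from the switch and therefore fall through to the default branch.

```java
class ForwardingSketch {
    // Hypothetical enum standing in for the integer OpCode constants.
    enum Op {
        SYNC, CREATE, CREATE2, CREATE_TTL, CREATE_CONTAINER, DELETE,
        DELETE_CONTAINER, SET_DATA, RECONFIG, SET_ACL, MULTI, CHECK,
        CREATE_SESSION, CLOSE_SESSION, GET_DATA, EXISTS
    }

    // Mirrors the switch in FollowerRequestProcessor.run: returns true
    // when the follower also ships the request to the leader.
    static boolean forwardToLeader(Op op, boolean localSession) {
        switch (op) {
            case SYNC:
            case CREATE:
            case CREATE2:
            case CREATE_TTL:
            case CREATE_CONTAINER:
            case DELETE:
            case DELETE_CONTAINER:
            case SET_DATA:
            case RECONFIG:
            case SET_ACL:
            case MULTI:
            case CHECK:
                return true;
            case CREATE_SESSION:
            case CLOSE_SESSION:
                // Local sessions are not forwarded to the leader.
                return !localSession;
            default:
                // Everything else is handled on the follower alone.
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardToLeader(Op.SET_DATA, false));      // true
        System.out.println(forwardToLeader(Op.GET_DATA, false));      // false
        System.out.println(forwardToLeader(Op.CLOSE_SESSION, true));  // false
    }
}
```

In the real method, every request has already been passed to the CommitProcessor before this decision is made; the predicate only captures whether it is additionally sent to the leader.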

3.2.11 SendAckRequestProcessor

    public void processRequest(Request si) {
        if (si.type != OpCode.sync) {
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                null);
            try {
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }

The logic is very simple: it sends an ACK back to the leader to indicate that the transaction log record has been written.

3.2.12 ObserverZooKeeperServer

    /**
     * Set up the request processors for an Observer:
     * firstProcesor->commitProcessor->finalProcessor
     */
    @Override
    protected void setupRequestProcessors() {
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }

The logic is clear (perhaps because this code was added later, after 3.3.0). The normal request chain is:

  1. ObserverRequestProcessor
  2. CommitProcessor
  3. FinalRequestProcessor

When syncRequestProcessorEnabled is on (the default), the Observer also records the transaction log and takes snapshots, which costs some performance and requires more memory.

ObserverRequestProcessor is nearly identical to FollowerRequestProcessor. An ambitious engineer would surely find a way to reuse the code.

    @Override
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this Observer has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getObserver().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getObserver().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {
                        zks.getObserver().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("ObserverRequestProcessor exited loop!");
    }
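
The loop above combines two ideas: a worker thread that drains a blocking queue until it meets a sentinel request, and a routing step that passes every request to the next processor while also forwarding some of them to the Leader. The following is a simplified sketch of that pattern under stated assumptions: requests are plain strings holding an operation name, `ObserverLoopSketch` and its fields are hypothetical, and only a subset of the operation types is listed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

class ObserverLoopSketch {
    // Sentinel compared by identity, playing the role of Request.requestOfDeath.
    static final String REQUEST_OF_DEATH = new String("requestOfDeath");

    // Illustrative subset of the operations that are also shipped to the Leader.
    static final Set<String> FORWARDED =
            new HashSet<>(Arrays.asList("sync", "create", "delete", "setData", "setACL", "multi"));

    final List<String> processedLocally = new ArrayList<>(); // seen by the next processor
    final List<String> sentToLeader = new ArrayList<>();     // forwarded to the Leader

    private final LinkedBlockingQueue<String> queuedRequests = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::run);

    private void run() {
        try {
            while (true) {
                String op = queuedRequests.take(); // blocks until a request arrives
                if (op == REQUEST_OF_DEATH) {      // identity check, as in the loop above
                    break;
                }
                processedLocally.add(op);          // queue locally first
                if (FORWARDED.contains(op)) {
                    sentToLeader.add(op);          // then ship to the Leader
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        worker.start();
    }

    void submit(String op) {
        queuedRequests.add(op); // the queue is unbounded, so add never blocks
    }

    // Enqueues the sentinel and waits for the worker to finish.
    void shutdown() {
        queuedRequests.add(REQUEST_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ObserverLoopSketch loop = new ObserverLoopSketch();
        loop.start();
        loop.submit("getData");
        loop.submit("create");
        loop.shutdown();
        System.out.println(loop.processedLocally + " / " + loop.sentToLeader);
    }
}
```

Handing the request to the next processor before forwarding it matches the comment in the original loop: the local pipeline should be ready to receive the Leader's response by the time it arrives.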

That concludes the source code analysis, which is based on version 3.5.7.

4. Distributed Transactions: How does ZK handle transactions

Having walked through the source code, you should now have a reasonable understanding of how ZK processes requests. Next, let's look at the transaction request flow. Starting from the Leader's ProposalRequestProcessor, it goes through three phases:

  1. Sync
  2. Proposal
  3. Commit

4.1 Sync

This phase is driven mainly by ProposalRequestProcessor, which tells the machines participating in the proposal (the Leader and the Followers) to write the transaction to their transaction logs.

4.2 Proposal

Each transaction request must be approved by more than half of the voting members (Leader + Followers).

  1. The Leader checks whether the server's ZXID is still usable and initiates a Proposal if it is. If not, an XidRolloverException is thrown. (see org.apache.zookeeper.server.quorum.Leader.propose)
  2. A Proposal is generated from the request header, the transaction body, and the ZXID. (see org.apache.zookeeper.server.quorum.Leader.propose)
  3. The Proposal is broadcast to all Follower servers. (see org.apache.zookeeper.server.quorum.Leader.sendPacket)
  4. The members involved write the log and send an ACK to the Leader, until more than half have acknowledged or the request times out. (see org.apache.zookeeper.server.quorum.Leader.processAck)
  5. The request is put into the toBeApplied queue. (see org.apache.zookeeper.server.quorum.Leader.tryToCommit)
  6. The commit is broadcast: Followers are sent COMMIT, while Observers are sent INFORM, which prompts them to commit the Proposal. (see org.apache.zookeeper.server.quorum.Leader.commit and inform)
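
Steps 1 and 4 can be made concrete with a small sketch. It assumes the usual ZXID layout (a 64-bit value with the epoch in the high 32 bits and a counter in the low 32 bits) and treats the ZXID as unusable once the counter half is exhausted. `ProposalSketch` and its methods are hypothetical names for illustration, not ZooKeeper's API, and timeouts are left out.

```java
import java.util.HashSet;
import java.util.Set;

class ProposalSketch {
    // Assumed layout: epoch in the high 32 bits, counter in the low 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Step 1: once the counter half is all ones, no further proposal fits in this epoch.
    static boolean counterExhausted(long zxid) {
        return counterOf(zxid) == 0xffffffffL;
    }

    private final int votingMembers;                 // Leader + Followers, Observers excluded
    private final Set<Long> acks = new HashSet<>();  // server ids that have acknowledged

    ProposalSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    // Step 4: record an ACK; returns true once more than half of the voters have acknowledged.
    // Duplicate ACKs from the same server are counted once because acks is a set.
    boolean ack(long serverId) {
        acks.add(serverId);
        return acks.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        ProposalSketch proposal = new ProposalSketch(5);
        System.out.println(proposal.ack(1L)); // Leader's own ACK
        System.out.println(proposal.ack(2L));
        System.out.println(proposal.ack(3L)); // third distinct ACK out of five voters
    }
}
```

With five voting members, the third distinct ACK is the first one that satisfies "more than half", which is the point at which the request would move on to step 5.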

At this point, we are done with SyncRequestProcessor -> AckRequestProcessor.

4.3 Commit

Next comes the CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor flow.

  1. After a request is handed to CommitProcessor, it is placed in a queue and taken out one at a time by the processor thread. When a transaction request is taken out, a pending object is set and held until the vote ends. This preserves the order of transactions and lets CommitProcessor easily tell whether a transaction is still in progress in the cluster.
  2. Once the vote passes, the commit flow is woken up. The request is submitted to the committedRequests queue and then sent to ToBeAppliedRequestProcessor one at a time.
  3. ToBeAppliedRequestProcessor waits until FinalRequestProcessor has finished processing, then removes the Proposal from the toBeApplied queue.
  4. FinalRequestProcessor checks whether the latest request in the queue has a ZXID less than or equal to that of the current request:
    • If so, that request is removed from the queue. In this case, the latest applied transaction completed earlier than the current one. The current transaction request is invalid, but it is still sent to the committedLog.
    • This is fairly normal, because entries are added to this queue in PrepRequestProcessor. The request is then applied to the in-memory database, which maintains a committedLog with a default limit of 500 entries that stores recently committed proposals for fast synchronization.
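
The pending behaviour in steps 1 and 2 can be sketched as a simple gate. This is a single-threaded model under stated assumptions, not the real multi-threaded CommitProcessor: `CommitGateSketch` and `Req` are hypothetical names, reads pass straight through, and a write parks everything behind it until `commit()` signals that its vote has passed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CommitGateSketch {
    // Hypothetical request type: a name plus a flag saying whether it is transactional.
    static final class Req {
        final String name;
        final boolean isWrite;

        Req(String name, boolean isWrite) {
            this.name = name;
            this.isWrite = isWrite;
        }
    }

    private final Deque<Req> queued = new ArrayDeque<>();
    private Req pending;                              // write still waiting for its vote
    final List<String> passedOn = new ArrayList<>();  // what the next processor would see

    void submit(Req request) {
        queued.add(request);
        drain();
    }

    // Called when the vote for the pending write has passed.
    void commit() {
        if (pending == null) {
            return;
        }
        passedOn.add(pending.name);
        pending = null;
        drain(); // release whatever queued up behind the write
    }

    // Moves requests forward until the queue is empty or a write has to wait.
    private void drain() {
        while (pending == null && !queued.isEmpty()) {
            Req request = queued.poll();
            if (request.isWrite) {
                pending = request;          // park until commit()
            } else {
                passedOn.add(request.name); // reads need no vote
            }
        }
    }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        gate.submit(new Req("read-1", false));
        gate.submit(new Req("write-1", true));
        gate.submit(new Req("read-2", false)); // held back behind write-1
        System.out.println(gate.passedOn);
        gate.commit();
        System.out.println(gate.passedOn);
    }
}
```

Holding back `read-2` until `write-1` commits is what keeps requests in submission order, at the cost of stalling reads that arrive behind a slow vote.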

If the server crashes during this step, then on restart the data is restored from the write-ahead log written during the proposal phase. The entries are converted into proposals through the PlayBackListener and saved to the committedLog so that they are available for synchronization.
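
A bounded log of recent proposals, such as the committedLog described above, can be sketched as a fixed-capacity queue that evicts its oldest entry. The class and method names below are hypothetical, entries are reduced to bare ZXIDs, and `canSyncFromLog` is a simplified illustration of why such a log helps a lagging peer catch up; it is not ZooKeeper's actual synchronization rule.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CommittedLogSketch {
    private final int limit;                                   // e.g. 500 in the text above
    private final Deque<Long> committedLog = new ArrayDeque<>();

    CommittedLogSketch(int limit) {
        this.limit = limit;
    }

    // Appends a committed ZXID, evicting the oldest entry once the limit is reached.
    void addCommittedProposal(long zxid) {
        if (committedLog.size() >= limit) {
            committedLog.removeFirst();
        }
        committedLog.addLast(zxid);
    }

    int size() {
        return committedLog.size();
    }

    // Both accessors throw NoSuchElementException on an empty log.
    long minCommittedLog() {
        return committedLog.getFirst();
    }

    long maxCommittedLog() {
        return committedLog.getLast();
    }

    // Hypothetical helper: a peer whose last ZXID is still inside the log can be sent
    // only the missing entries; otherwise it would need a fuller transfer.
    boolean canSyncFromLog(long peerLastZxid) {
        return !committedLog.isEmpty()
                && peerLastZxid >= minCommittedLog()
                && peerLastZxid <= maxCommittedLog();
    }

    public static void main(String[] args) {
        CommittedLogSketch log = new CommittedLogSketch(3);
        for (long zxid = 1; zxid <= 5; zxid++) {
            log.addCommittedProposal(zxid);
        }
        System.out.println(log.minCommittedLog() + ".." + log.maxCommittedLog());
    }
}
```

The limit is a trade-off: a larger log lets peers that are further behind catch up incrementally, but keeps more proposals in memory.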

5. Summary

From this implementation, we can see that ZK sacrifices strong consistency to gain some availability, while still providing eventual consistency. While data is being synchronized across the cluster, a client that sends a request to a server that has not yet caught up will read stale data. This is also stated on ZK's website (see the link in References):

Consistency Guarantees
ZooKeeper is a high performance, scalable service. Both reads and write operations are designed to be fast, though reads are faster than writes. The reason for this is that in the case of reads, ZooKeeper can serve older data, which in turn is due to ZooKeeper's consistency guarantees:

Sequential Consistency
Updates from a client will be applied in the order that they were sent.
...........

In addition, ZK handles transactions a bit like a two-phase commit. This is the ZAB algorithm, and in the next article, we’ll look at its implementation in detail, as well as another use for it — distributed elections.

References:

  • ZooKeeper Internals: zookeeper.apache.org/doc/r3.5.0-…
  • The Modular Composition of Coordination Services: static.googleusercontent.com/media/resea…
  • ZooKeeper Consistency Guarantees: zookeeper.apache.org/doc/r3.4.9/…