## Basic principle of ZooKeeper election

Zookeeper elects a leader only in cluster mode.

There are four basic concepts involved in the ZooKeeper election process:

Personal ability: ZooKeeper is a database. Within a cluster, the newer a node's data is, the more "capable" that node is considered to be. In ZooKeeper, how new a node's data is can be represented by its transaction ID (zxid), so the election uses the size of the zxid as the basic voting rule.

Vote change: When a node in the ZooKeeper cluster starts voting, it assumes its own data is the newest, so it first votes for itself and sends this vote to the other servers. The vote contains two important pieces of information: the sid, which is the id of the server being voted for, and the zxid, which is the largest transaction id on that server. The node also receives votes from other servers. After receiving a vote, it compares the zxid in that vote with the largest zxid of the server it is currently voting for. If the zxid in the received vote is larger, the server the node is currently voting for does not have the newest data, so the node changes its vote to match the vote it just received.

Ballot box: A ZooKeeper cluster has many nodes. Instead of maintaining a single shared ballot box, each node keeps a ballot box in its own memory, into which it puts its own vote and the votes it receives from other servers. Because the cluster nodes exchange votes with each other and the comparison (PK) rules are the same everywhere, the ballot box stored on each node ends up being the same, which achieves the effect of sharing a single ballot box.

Leader: During the leader election, each node in the ZooKeeper cluster continuously receives votes from the other nodes, compares (PKs) them, and changes its own vote to the node with the newest data. This ensures that each node's vote always represents the node it currently believes has the latest data. Because the votes from other servers are also stored in the ballot box, a node can check the box to see whether more than half of the nodes vote for the same node it does, that is, whether more than half of the cluster agrees that one node has the newest data. Once more than half of all nodes in the cluster agree, that node becomes the leader. A minimal simulation of these rules is sketched below.
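To make these rules concrete, here is a minimal, self-contained simulation of one voting round among three servers. It is an illustrative sketch only (all names are made up, and it is not ZooKeeper code); it applies the epoch/zxid/sid comparison and the more-than-half check described above:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative simulation only, not ZooKeeper source code.
public class ElectionSketch {

    // A vote names the server being voted for (sid) plus that server's newest zxid and epoch.
    record Vote(long sid, long zxid, long epoch) {}

    // The "personal ability" rule: higher epoch wins, then higher zxid, then higher sid.
    static boolean isBetter(Vote candidate, Vote current) {
        if (candidate.epoch() != current.epoch()) {
            return candidate.epoch() > current.epoch();
        }
        if (candidate.zxid() != current.zxid()) {
            return candidate.zxid() > current.zxid();
        }
        return candidate.sid() > current.sid();
    }

    public static void main(String[] args) {
        // Each server initially votes for itself: sid -> (sid, zxid, epoch).
        Map<Long, Vote> initialVotes = Map.of(
                1L, new Vote(1, 100, 1),
                2L, new Vote(2, 102, 1),  // server 2 holds the newest data
                3L, new Vote(3, 101, 1));

        // Vote change: every server compares the votes it has seen and switches to the best
        // one, so after one exchange all of them vote for server 2.
        Map<Long, Vote> ballotBox = new HashMap<>();
        for (long sid : initialVotes.keySet()) {
            Vote chosen = initialVotes.get(sid);
            for (Vote other : initialVotes.values()) {
                if (isBetter(other, chosen)) {
                    chosen = other;
                }
            }
            ballotBox.put(sid, chosen); // the vote this server finally sends out
        }

        // Majority check, from server 1's point of view: more than half of the cluster now
        // votes for the same server that server 1 votes for, so that server is the leader.
        long myChoice = ballotBox.get(1L).sid();
        long agree = ballotBox.values().stream().filter(v -> v.sid() == myChoice).count();
        System.out.println("leader = " + myChoice + ", agreeing votes = " + agree + "/"
                + ballotBox.size() + ", has quorum = " + (agree > ballotBox.size() / 2));
    }
}
```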

Below, I walk through the principle of ZooKeeper's fast leader election in detail, in combination with the source code.

QuorumPeer represents a zkServer in cluster mode. The QuorumPeer class is defined as follows:

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider

Definition: QuorumPeer is a ZooKeeperThread, which is a thread. When the node is started, the start() method is called as follows:

```java
public synchronized void start() {
    // Check whether this server is a member of the cluster
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // (1) Load data: restore data from the log files into the dataTree of ZKDatabase
    loadDataBase();
    // (2) Start the threads that handle client requests (the NIO server threads: accept,
    //     selector, worker and connection expirer, which accept client connections,
    //     handle IO events, etc.). Note that once these threads are started they can
    //     receive client requests, but because zkServer has not been initialized yet,
    //     connection requests are rejected until initialization completes; only then
    //     are requests processed normally.
    startServerCnxnFactory();
    // (3) Leader election: determine the server role and initialize accordingly
    startLeaderElection();
    startJvmPauseMonitor();
    // (4) zkServer initialization
    super.start();
}
```

(2): Initialize NIO-related threads

As a server, ZooKeeper communicates with clients over the network. To communicate with clients efficiently, so that network I/O does not become ZooKeeper's bottleneck, ZooKeeper uses ServerCnxnFactory to manage connections with clients. There are two implementations: NIOServerCnxnFactory, which uses native Java NIO, and NettyServerCnxnFactory, which uses Netty. A ServerCnxn represents one client-server connection. When a node starts, the factory's start() method is called to start the NIO-related threads. The NIOServerCnxnFactory class is defined as follows:

public class NIOServerCnxnFactory extends ServerCnxnFactory

```java
@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        /*
         * WorkerService manages the worker threads (which perform the basic I/O work).
         *   arg1: prefix for the thread names
         *   arg2: number of worker threads
         *   arg3: false means the non-assignable thread mode: one ExecutorService holds
         *         multiple threads, any thread may execute any task, and tasks are unordered.
         */
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    /*
     * Start all the selector threads (they hand I/O events to the worker threads,
     * register the connections dispatched by the accept thread with the selector,
     * and update the connections' listening events).
     */
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // Ensure each thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
```

When WorkerService is instantiated, its start() method is called:

```java
public void start() {
    if (numWorkerThreads > 0) {
        if (threadsAreAssignable) {
            for (int i = 1; i <= numWorkerThreads; ++i) {
                workers.add(Executors.newFixedThreadPool(1, new DaemonThreadFactory(threadNamePrefix, i)));
            }
        } else {
            workers.add(Executors.newFixedThreadPool(numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
        }
    }
    stopped = false;
}
```

The SelectorThread class definition:

```java
class SelectorThread extends AbstractSelectThread {

    private final int id;
    private final Queue<SocketChannel> acceptedQueue;
    private final Queue<SelectionKey> updateQueue;
    // ...
}
```

SelectorThread is a thread class; the logic it executes is in its run() method, as follows:

```java
public void run() {
    try {
        while (!stopped) {
            try {
                // Hand the ready I/O events over to the worker threads for processing
                select();
                /*
                 * Handle connections newly dispatched by the accept thread:
                 * (1) register the new connection with this thread's selector;
                 * (2) wrap the registered connection as a NIOServerCnxn via
                 *     NIOServerCnxnFactory, using ipMap to limit the number of
                 *     connections per IP.
                 */
                processAcceptedConnections();
                // Update the listening (interest) events of the connections in updateQueue
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // ...
```

The AcceptThread class defines:

private class AcceptThread extends AbstractSelectThread

AcceptThread is also a thread class; the logic it executes is in its run() method:

```java
public void run() {
    try {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                /*
                 * Accept client connections and distribute them, round-robin,
                 * to the acceptedQueue of one of the selector threads.
                 */
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // ...
```

(3): Initialize the transport layer and the application layer of the leader election

Leader election strategy:

```java
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        /** Initialize QuorumCnxManager **/
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        /** Initialize QuorumCnxManager.Listener **/
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            /** Start QuorumCnxManager.Listener **/
            listener.start();
            /** Initialize and start FastLeaderElection **/
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
```

There are three leader election implementations in ZooKeeper. The default is FastLeaderElection; the other two have been deprecated. The following focuses on FastLeaderElection.

The architecture diagram of ZooKeeper's fast leader election is as follows:

From the architecture diagram, we can see that the fast leader election implementation is divided into two layers: the application layer and the transport layer. Initialization therefore means initializing both the transport layer and the application layer.

Transport layer initialization

Initialization steps:

1. Initialize QuorumCnxManager

2. Initialize QuorumCnxManager.Listener

3. Run QuorumCnxManager.Listener

4. Run QuorumCnxManager

5. Return the FastLeaderElection object

Introduction to QuorumCnxManager:

QuorumCnxManager is the transport layer implementation. It has several important properties (a simplified sketch of how these queues are used follows the list):

  • ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap: at the transport layer, each zkServer needs to send ballots to the other servers. The ballots come from the application layer and are stored in queueSendMap, grouped by destination server id.

  • ConcurrentHashMap<Long, SendWorker> senderWorkerMap: a SendWorker is a sender that wraps a Socket; senderWorkerMap records the other servers' ids and their corresponding SendWorkers.

  • ArrayBlockingQueue<Message> recvQueue: at the transport layer, each zkServer receives the ballots sent by the other servers and stores them in recvQueue for the application layer to consume.

  • QuorumCnxManager.Listener: responsible for opening the socket and listening for connections from the other servers.
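As a rough, hedged sketch of how these properties fit together (illustrative names only; the real QuorumCnxManager does much more, such as connection setup and authentication), the transport layer boils down to one outgoing queue per destination server plus one shared incoming queue:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the transport-layer queues, not ZooKeeper source code.
class TransportSketch {

    static final int CAPACITY = 100;

    // One outgoing queue per destination server id (sid); a SendWorker bound to that
    // sid drains the queue and writes the ballots to its socket.
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    // One shared incoming queue; receiving threads put whatever arrives here, and the
    // application layer (FastLeaderElection's WorkerReceiver) polls it.
    final BlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(CAPACITY);

    // Called from the application layer: queue a ballot for server `sid`.
    void toSend(long sid, ByteBuffer ballot) {
        queueSendMap
                .computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(CAPACITY))
                .offer(ballot);
    }

    // Called by a receiving thread when a ballot arrives on a socket.
    void onReceive(ByteBuffer ballot) {
        recvQueue.offer(ballot);
    }
}
```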

Application layer initialization

The FastLeaderElection class is the core class of the fast leader election implementation. It has three important properties:

LinkedBlockingQueue<ToSend> sendqueue;

LinkedBlockingQueue<Notification> recvqueue;

Messenger messenger;

  • Messenger.WorkerSender

  • Messenger.WorkerReceiver

The FastLeaderElection class provides queues similar to those in the transport layer: outgoing votes are placed in sendqueue and are sent by Messenger.WorkerSender to the transport layer's queueSendMap.

Similarly, Messenger.WorkerReceiver is responsible for fetching votes from the transport layer and putting them into recvqueue.

In this way, at the application layer, sending a ballot is just a matter of adding it to sendqueue, and receiving a ballot is just a matter of taking an element from recvqueue. A sketch of these two worker threads follows.
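As a hedged sketch of this bridging (again with illustrative names, reusing the TransportSketch above in place of QuorumCnxManager), WorkerSender and WorkerReceiver are essentially two loops that shuttle ballots between the application-layer queues and the transport-layer queues:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only, not the real FastLeaderElection/Messenger code.
class ApplicationLayerSketch {

    // An outgoing ballot, addressed to one destination server.
    record ToSend(long destSid, ByteBuffer ballot) {}

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<ByteBuffer> recvqueue = new LinkedBlockingQueue<>();
    final TransportSketch manager = new TransportSketch();

    // WorkerSender: drain the application-layer send queue into the transport layer.
    final Runnable workerSender = () -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m != null) {
                    manager.toSend(m.destSid(), m.ballot());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    };

    // WorkerReceiver: pull ballots out of the transport layer and hand them to the
    // election logic, which simply polls recvqueue.
    final Runnable workerReceiver = () -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ByteBuffer ballot = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
                if (ballot != null) {
                    recvqueue.offer(ballot);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    };
}
```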

(4): Implementation of fast leader election (main loop)

```java
/*
 * Main loop
 */
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        /*
         * Core path: makeLEStrategy() returns the FastLeaderElection created earlier,
         * lookForLeader() runs the election, and setCurrentVote(...) saves the elected
         * leader on the current server.
         */
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        try {
            // ... initialize as observer
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            observer.shutdown();
            setObserver(null);
            updateServerState();
            if (isRunning()) {
                Observer.waitForObserverElectionDelay();
            }
        }
        break;
    case FOLLOWING:
        try {
            // ... initialize as follower
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            follower.shutdown();
            setFollower(null);
            updateServerState();
        }
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            // ... initialize as leader
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            updateServerState();
        }
        break;
    }
}
```

The core method of the leader election, lookForLeader():

```java
public Vote lookForLeader() throws InterruptedException {
    ...
    try {
        // Initialize a ballot box for the current election
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        // Ballot box for votes from servers that are already FOLLOWING or LEADING
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        synchronized (this) {
            logicalclock.incrementAndGet();
            // Vote for myself first: my own id, my largest zxid, my epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        // Send my vote to the other servers
        sendNotifications();

        SyncedLearnerTracker voteSet;

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // Take a vote received from another server out of the application-layer queue
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * No vote was received within the timeout.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    // Everything I queued has been delivered, so send my votes again
                    sendNotifications();
                } else {
                    /*
                     * Set up connections with all the other participants in the election.
                     * During transport-layer initialization only the listener is started;
                     * no connections are initiated, because if every server initiated
                     * connections to the others on startup, two sockets would be created
                     * per pair of servers and system resources would be wasted. Since a
                     * socket is bidirectional, ZooKeeper only allows the server with the
                     * larger id to initiate the connection to the server with the smaller id.
                     */
                    manager.connectAll();
                }
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                // n.state: the state of the server that sent the vote
                switch (n.state) {
                case LOOKING:
                    /*
                     * electionEpoch: if the election epoch of the voting server is larger,
                     * the current server has fallen behind, so update the logical clock
                     * and clear the ballot box.
                     */
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // Vote PK: compare epoch, then zxid, then myid
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // The other server's data is newer than mine, so change my vote to match the received vote
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // Send the updated vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // The vote comes from an older election round, ignore it
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    // Put the received vote into the ballot box
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // Count how many servers agree with my current proposal
                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    // More than half of the cluster agrees with my current proposal
                    if (voteSet.hasAllQuorums()) {
                        /*
                         * Keep draining the receive queue: if a vote for a server with newer
                         * data arrives, put it back and continue the election; otherwise keep
                         * reading until no more votes arrive.
                         */
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // No newer vote arrived, so the election is decided
                        if (n == null) {
                            // Set the state (role) of the current server
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * If the server that sent the vote is already FOLLOWING or LEADING,
                     * its vote tells the current server who the leader is.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    break;
                }
            } else {
                ...
            }
        }
        return null;
    } finally {
        ...
    }
}
```
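For reference, lookForLeader() compares votes through totalOrderPredicate(). Slightly simplified (the real method first checks the quorum weight of the proposed leader), its rule is: epoch first, then zxid, then server id.

```java
/*
 * Simplified sketch of the vote comparison used above: a new vote wins if its epoch is
 * larger; if the epochs are equal, if its zxid is larger; and if the zxids are also
 * equal, if its server id is larger.
 */
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId)))));
}
```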

After the leader election

The ZooKeeper cluster cannot provide services during the leader election

As lookForLeader() shows, the while loop only moves on to its next iteration once the role of the server in the cluster has been determined. On the next iteration, the server enters the initialization logic corresponding to its role, and only then can it provide services externally.