Like first, read later. Make it a habit 🌹

Welcome to follow the WeChat public account "The Way of Java Programming": progress a little bit every day, accumulate technology, and share knowledge.

The Zookeeper Leader election

We have already covered the ZAB protocol and the basic concepts of Zookeeper, so next we will cover the rest: Leader election, distributed locks, and the herd effect and split-brain. That should be enough knowledge to "build rockets" in interviews.

This post covers the Zookeeper Leader election process and the FastLeaderElection algorithm involved.

Before we begin

ZAB is the protocol that guarantees data consistency in a Zookeeper cluster, and it includes an election phase. FastLeaderElection is one of the algorithms Zookeeper uses to elect the Leader. Keep these two concepts distinct; they are easy to confuse.

Leader election

A Leader election happens at two key moments:

  • When the Zookeeper cluster starts up
  • When the Leader crashes and the cluster enters crash recovery

Some basic concepts you need to know in advance:

1. Requirements for Leader election:

The elected Leader must have the highest ZXID.
The elected Leader must be approved by more than half of the nodes.

2. Built-in election algorithms

LeaderElection
FastLeaderElection (default)
AuthFastLeaderElection

3. Election status

LOOKING: election state; the server is searching for a Leader.
FOLLOWING: follower state; the server syncs with the Leader and takes part in voting.
OBSERVING: observer state; the server syncs with the Leader but does not take part in voting.
LEADING: leader state.

4. Some terms

Server ID: myid (also called SID), the unique identifier of a server.
Transaction ID: ZXID, the largest transaction ID recorded on the server.
Logical clock: the number of election rounds the server has initiated (electionEpoch).
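A side note on ZXID: it is a 64-bit number whose high 32 bits hold the Leader epoch and whose low 32 bits hold a counter within that epoch, which is why a larger ZXID always means newer data. A quick sketch of pulling it apart (class and method names are mine, for illustration only):

public final class ZxidParts {
    static long epochOf(long zxid)   { return zxid >>> 32; }         // high 32 bits: leader epoch
    static long counterOf(long zxid) { return zxid & 0xFFFFFFFFL; }  // low 32 bits: per-epoch counter

    public static void main(String[] args) {
        long zxid = (2L << 32) | 5L;  // epoch 2, 5th transaction of that epoch
        System.out.println(epochOf(zxid) + " / " + counterOf(zxid)); // prints 2 / 5
    }
}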

The election process

Zookeeper clusters are normally deployed with an odd number of machines (a majority is needed, which avoids split-brain; more on that later), so let's assume there are three servers. Next we walk through the Leader election among these three servers.
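Why an odd number? A quorum is a strict majority of the cluster, so going from 3 machines to 4 raises the quorum without raising fault tolerance. A tiny illustration (my own, not ZooKeeper code):

public class QuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            int quorum = n / 2 + 1;  // strict majority
            System.out.printf("n=%d quorum=%d tolerates %d failure(s)%n", n, quorum, n - quorum);
        }
        // n=3 tolerates 1 failure; n=4 still tolerates only 1: the 4th machine buys nothing
    }
}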

  • Each server issues a vote. In the initial state, Server1 and Server2 each vote for themselves as Leader. Every vote contains the myid and ZXID of the proposed server, written as (myid, ZXID). So Server1's vote is (1, 0) and Server2's is (2, 0); each then sends its vote to the other machines in the cluster.

PS: Not sure what "voting for yourself" means, or what the data structure of a vote looks like? Don't worry, the source-code walkthrough below will show you!!

  • Receive votes from the other servers. When a server in the cluster receives a vote, it first checks its validity, for example whether it belongs to the current round and whether it came from a server in the LOOKING state.

  • Process the vote. For each vote, the server needs to compare others’ votes with its own. The rules for comparison are as follows:

Compare the ZXID first: the server with the larger ZXID (transaction ID) takes precedence as Leader.
If the ZXIDs are equal, compare the myid: the server with the larger myid (server ID) takes precedence as Leader.
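In code, these rules boil down to a tiny predicate. A minimal sketch (the name and signature are mine; ZooKeeper's real check, totalOrderPredicate, additionally compares the Leader epoch first, as we will see in the source below):

// true if the incoming vote (voteZxid, voteSid) beats our current vote (myZxid, mySid)
static boolean incomingVoteWins(long voteZxid, long voteSid, long myZxid, long mySid) {
    return voteZxid > myZxid                        // newer data wins
        || (voteZxid == myZxid && voteSid > mySid); // tie on data: larger server ID wins
}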

For Server1 the vote is (1, 0); for Server2 it is (2, 0). Both ZXIDs are 0, so the myids are compared next. Server2's myid is larger, so Server1 updates its vote to (2, 0) and votes again. Server2 does not need to change its vote; it simply re-issues its previous vote to all the machines in the cluster.

PS: What does "updates its vote to (2, 0)" mean?

It means the next vote it sends out will carry (2, 0), and that vote becomes its new voting position.

  • Count the votes. After each round of voting, a server tallies the votes to check whether more than half of the machines have accepted the same vote. Server1 and Server2 each count two machines in the cluster accepting the vote (2, 0), so the Leader is considered elected (see the quorum sketch just after this list).

  • Change server state. Once the Leader is determined, each server updates its state: FOLLOWING if it is a Follower, LEADING if it is the Leader.
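The "more than half" tally above is what the source later performs via voteSet.hasAllQuorums(); for a plain-majority cluster it reduces to something like this sketch (an assumed simplification, not the real implementation):

import java.util.Set;

public class MajorityQuorum {
    // A vote wins once the set of servers backing it is a strict majority of all voters.
    static boolean containsQuorum(Set<Long> supporters, int votingMembers) {
        return supporters.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        System.out.println(containsQuorum(Set.of(1L, 2L), 3)); // true: 2 of 3
    }
}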


In a nutshell

1. Each service instance initiates a vote to elect itself as leader.

2. When the other instances receive the vote, each compares the initiator's latest transaction ID against its own: if the initiator's is larger, it votes for the initiator; if smaller, it does not; if they are equal, it compares server IDs and votes for the initiator when the initiator's is larger.

3. After collecting everyone's feedback, the initiator checks whether its vote count (including its own vote) exceeds half of the cluster. If it does, the initiator becomes the Leader.
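To make the loop concrete, here is a toy, single-threaded re-enactment of the three-server walkthrough above (my own sketch; the real election is asynchronous and message-driven):

import java.util.HashMap;
import java.util.Map;

public class ToyElection {
    record Vote(long myid, long zxid) {}

    // The rule from above: larger zxid wins; on a tie, larger myid wins.
    static Vote better(Vote a, Vote b) {
        if (a.zxid() != b.zxid()) return a.zxid() > b.zxid() ? a : b;
        return a.myid() > b.myid() ? a : b;
    }

    public static void main(String[] args) {
        int clusterSize = 3;                        // Server3 has not started yet
        Map<Long, Vote> votes = new HashMap<>();
        votes.put(1L, new Vote(1, 0));              // Server1 nominates itself
        votes.put(2L, new Vote(2, 0));              // Server2 nominates itself

        // Each server compares every vote it has seen and adopts the winner.
        Vote winner = votes.values().stream().reduce(ToyElection::better).orElseThrow();
        votes.replaceAll((sid, v) -> winner);

        long supporters = votes.values().stream().filter(winner::equals).count();
        if (supporters > clusterSize / 2) {         // 2 > 1: quorum reached
            System.out.println("Leader elected: server " + winner.myid()); // server 2
        }
    }
}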

Leader election algorithm

After understanding the election process, we introduce the implementation details of the algorithm in Zookeeper source code.

Borrowing the algorithm description widely circulated online, I will explain the tricky points one more time. The general flow is as follows:

Voting for the first time. Whatever triggered the election, every machine in the cluster trying to elect a Leader is in the LOOKING state, and each LOOKING machine sends a message, called a vote, to all the other machines. A vote contains the SID (the server's unique identifier) and the ZXID (transaction ID), so (SID, ZXID) identifies a vote.

Assume a Zookeeper cluster of five machines whose SIDs are 1, 2, 3, 4, and 5 and whose ZXIDs are 9, 9, 9, 8, and 8 respectively, with the machine of SID 2 as the Leader. At some moment machines 1 and 2 fail, so the cluster starts a Leader election. In the first round each machine votes for itself, so the machines with SIDs 3, 4, and 5 issue the votes (3, 9), (4, 8), and (5, 8) respectively.

At this point the votes held by the five machines are:
Server 1: (1, 9)  assumed failed ×
Server 2: (2, 9)  assumed failed ×
Server 3: (3, 9)
Server 4: (4, 8)
Server 5: (5, 8)

Change of vote. After sending its vote, each machine also receives votes from the others. It processes each incoming vote according to a fixed set of rules and decides whether to change its own vote. These rules are the core of the whole Leader election algorithm. The terms are as follows:

vote_sid: the SID of the Leader proposed in the received vote.
vote_zxid: the ZXID of the Leader proposed in the received vote.
self_sid: the SID of the current server.
self_zxid: the ZXID of the current server.

Each time a vote is received, (vote_sid, vote_zxid) is compared against (self_sid, self_zxid).

Rule 1: If vote_zxid is greater than self_zxid, adopt the received vote and broadcast it again. (The received transaction ID is greater than the server's own.)

Rule 2: If vote_zxid is less than self_zxid, keep your own vote unchanged. (The received transaction ID is smaller than the server's own.)

Rule 3: If vote_zxid equals self_zxid, compare the SIDs: if vote_sid is greater than self_sid, adopt the received vote and broadcast it again.

Rule 4: If vote_zxid equals self_zxid and vote_sid is less than self_sid, keep your own vote unchanged.
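These four rules are exactly what the source implements in totalOrderPredicate, which we will meet shortly. Paraphrased (the real method also short-circuits for servers with zero quorum weight):

// true  -> adopt the received vote and re-broadcast (rules 1 and 3)
// false -> keep your own vote (rules 2 and 4)
boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                            long curId, long curZxid, long curEpoch) {
    return (newEpoch > curEpoch)
        || (newEpoch == curEpoch
            && (newZxid > curZxid
                || (newZxid == curZxid && newId > curId)));
}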

(The original post illustrates this flow with a diagram, omitted here.)

Determine the Leader. After the second round of voting, each machine in the cluster again receives and tallies the other machines' votes. If one vote is held by more than half of the machines, the machine whose SID it names becomes the Leader; here Server3 becomes the Leader.

Election process source code

Talk without practice is fake kung fu. Now that we understand the basic flow of the Leader election, let's explore the source code. There are no secrets under the source!

It may be quicker to pull the source by cloning from the repository address I provide (git clone).

Voting data structure

First things first: what does the structure of a vote (a ballot) look like?

public class Vote {
    ...
    private final int version;         // version number

    private final long id;             // the SID of the proposed Leader

    private final long zxid;           // the transaction ID of the proposed Leader

    private final long electionEpoch;  // logical clock: distinguishes election rounds, incremented by 1 each round

    private final long peerEpoch;      // the epoch of the proposed Leader

    private final ServerState state;   // the current server's state
    ...
}

// Server states
public enum ServerState {
    LOOKING,
    FOLLOWING,
    LEADING,
    OBSERVING
}
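Note that id, zxid, and electionEpoch are exactly the (myid, ZXID) pair plus the round number from the walkthrough above. As startLeaderElection below shows, a LOOKING server's very first vote nominates itself:

// From QuorumPeer.startLeaderElection(): the initial vote is for the server itself
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());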

Now that we know what a vote looks like, let's walk through the whole algorithm.

The source entry

zookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum

I omit non-core code; if you want to dig deeper, follow my analysis and consult the source for details.

QuorumPeerMain.java

/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args); // the entry point
    } catch (IllegalArgumentException e) {
      ...
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    // Standalone mode or cluster mode?
    if (args.length == 1 && config.isDistributed()) {
        // Cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Provide the server and port 2181 for the client to read and write
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        ...

        // Start the main thread
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e); }... }Copy the code

Call the start method of QuorumPeer

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // loadDataBase recovers data from local files and obtains the latest ZXID
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Initialize the election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
...

public synchronized void startLeaderElection() {
    try {
        // If the current node state is LOOKING, vote for yourself
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // Get the election algorithm; configurable in zoo.cfg, default is the fast election
    this.electionAlg = createElectionAlgorithm(electionType);
}
...

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // The leader elects the network IO responsible class (responsible for the underlying network processing the messages in the receiving and sending queues)
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // Start the bound election thread and wait for other machines in the cluster to connect
            listener.start();
            // Tcp-based election algorithm FastLeaderElection
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
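As the switch shows, only algorithm 3 (FastLeaderElection) is still supported; cases 1 and 2 throw UnsupportedOperationException. The choice comes from the electionAlg setting in zoo.cfg, roughly like this (a sketch of the relevant line only):

# zoo.cfg: 3 = FastLeaderElection (TCP-based), the default and the only supported value today
electionAlg=3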

// FastLeaderElection fle = new FastLeaderElection(this, qcm) invokes the constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
   this.stop = false;
   this.manager = manager;
   starter(self, manager);
}
// Self-explanatory at a glance
private void starter(QuorumPeer self, QuorumCnxManager manager) {
   this.self = self;
   proposedLeader = -1;
   proposedZxid = -1;

   sendqueue = new LinkedBlockingQueue<ToSend>();
   recvqueue = new LinkedBlockingQueue<Notification>();
   this.messenger = new Messenger(manager);
}

// FastLeaderElection's start method is then called, which starts the Messenger
// Starts instances of WorkerSender and WorkerReceiver
public void start() {
    this.messenger.start();
}
/**
* Constructor of class Messenger.
*
* @param manager   Connection manager
 */
 Messenger(QuorumCnxManager manager) {
  this.ws = new WorkerSender(manager);
  this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
  this.wsThread.setDaemon(true);

  this.wr = new WorkerReceiver(manager);

  this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
  this.wrThread.setDaemon(true);
 }
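WorkerSender and WorkerReceiver form a producer/consumer pair around the two blocking queues created in starter. Roughly, the sender loop drains sendqueue and hands each message to the connection manager (a simplified sketch of the idea, not the real class):

// Simplified WorkerSender loop: poll the send queue, then let
// QuorumCnxManager (which owns the TCP connections) deliver the vote.
while (!stop) {
    ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // wake up periodically
    if (m != null) {
        process(m); // serialize the vote and pass it to the connection manager
    }
}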
/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    this.wsThread.start();
    this.wrThread.start();
}
...

// Next, QuorumPeer's run method is called
@Override
public void run() {
    ...
    try {
        /*
         * Main loop
         */
        while (running) {
            // Determine the current node's state
            switch (getPeerState()) {
            case LOOKING:
                // If LOOKING, enter the election process
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Here we use the policy mode to determine which election algorithm is currently used for the leadership election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Here the policy pattern determines which election algorithm is currently used for the leadership election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); }}break;
            case OBSERVING:
                ...
                break;
            case FOLLOWING:
                ...
                break;
            case LEADING:
                ...
                break;
            }
        }
    } finally {
        ...
    }
}
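The OBSERVING, FOLLOWING, and LEADING branches are elided above, but the shape of the loop is worth keeping in mind. Paraphrased (a control-flow sketch, not the literal source):

// QuorumPeer.run() in a nutshell: cycle through the server's state machine.
while (running) {
    switch (getPeerState()) {
    case LOOKING:   // no Leader known: run the election and adopt its outcome
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case FOLLOWING: // a Leader exists: sync with it
        follower.followLeader();
        break;
    case LEADING:   // we are the Leader: drive proposals and commits
        leader.lead();
        break;
    case OBSERVING: // sync with the Leader, but never vote
        observer.observeLeader();
        break;
    }
}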

The core election algorithm: lookForLeader

The call setCurrentVote(makeLEStrategy().lookForLeader()) takes us into FastLeaderElection.lookForLeader:

    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
             * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce whether a majority
             * of participants has voted for it.
             */
            // Save the votes received
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * The votes from previous leader elections, as well as the votes from the current leader election are
             * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
             * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
             * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
             * the electionEpoch of the received notifications) in a leader election.
             */
            // Store election results
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // Atomically increment the logical clock: a new election round begins
                logicalclock.incrementAndGet();
                // Propose yourself: set the proposal to your own sid, latest zxid, and epoch
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // Sending a vote includes sending it to yourself (broadcast)
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /* * Loop in which we exchange notifications until we find a leader */

            // Loop until a Leader is elected
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                // Get the next vote from the receiving IO thread
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                // If nothing was received, keep sending votes until a Leader is elected
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        // The message has not been delivered yet
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    // Extend the timeout
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                // A vote was received: check that it comes from a server in this cluster
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    // Branch on the state of the node that sent the message
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        // If the received node epoch is greater than logicalClock, the current election is a new one
                        if (n.electionEpoch > logicalclock.get()) {
                            // Update local logicalClock
                            logicalclock.set(n.electionEpoch);
                            // Clear the receive queue
                            recvset.clear();
                            // Check whether the received vote wins by comparing epoch, then zxid, then myid
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // Adopt the other side's vote
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                // Otherwise keep your own vote
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            }
                            // Continue broadcasting to let other nodes know my current ticket
                            sendNotifications();
                            // If the received message epoch is smaller than the epoch of the current node, the message is ignored
                        } else if (n.electionEpoch < logicalclock.get()) {
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                            // If the epochs are equal, compare zxid and then myid; if the received vote wins, update your own vote and broadcast it
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        // Add it to the local voting set for election termination judgment
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        // The default algorithm for determining whether the election is over is that more than half of the servers agree
                        if (voteSet.hasAllQuorums()) {

                            // Verify if there is any change in the proposed leader
                            // Wait until a new notification arrives until timeout
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            // The Leader is determined
                            if (n == null) {
                                // Change our state
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                        //OBSERVING doesn't take part in elections
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                        // These two need to participate in the election
                    case FOLLOWING:
                    case LEADING:
                        /* * Consider all notifications from the same epoch * together. */
                        // Check whether the epoch is the same
                        if (n.electionEpoch == logicalclock.get()) {
                            // Join the local voting set if the same
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            // Check whether the election has ended; if so, verify that the Leader is valid
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                // Modify your status and return the result of the vote
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         *
                         * Note that the outofelection map also stores votes from the current leader election.
                         * See ZOOKEEPER-1732 for more information.
                         */
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break; }}else {
                    if(! validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if(! validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); }}}return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount()); }}/** * Send notifications to all peers upon a change in our vote */
    /** * broadcast message */
    private void sendNotifications(a) {
        // Send in a loop
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            // Message entity
            ToSend notmsg = new ToSend(
                ToSend.mType.notification,
                proposedLeader,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch,
                qv.toString().getBytes());

            LOG.debug(
                "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                    + " {} (myid), 0x{} (n.peerEpoch) ",
                proposedLeader,
                Long.toHexString(proposedZxid),
                Long.toHexString(logicalclock.get()),
                sid,
                self.getId(),
                Long.toHexString(proposedEpoch));

            // Add to the send queue; the WorkerSender consumes and delivers it
            sendqueue.offer(notmsg);
        }
    }

A diagram that circulates on the web (not reproduced here) makes the flow of data in the election process clear.

A network IO manager (QuorumCnxManager) maintains the sending and receiving threads, while the election algorithm produces and consumes vote messages through the two queues. At the core, incoming ballots are compared head-to-head and votes are updated or discarded according to the rules above, until a Leader is elected.

Conclusion

To understand the Leader election process clearly, keep a few important concepts and terms straight:

  • The transaction ID, i.e. the ZXID
  • The comparison order and policy for ZXID and SID
  • How a server updates and broadcasts its own ballot

OK! That wraps up Zookeeper's Leader election process; once the Leader is elected, the next step is data synchronization between the Leader and the other nodes. Welcome to follow the public account: "The Way of Java Programming"! 🌹