ZooKeeper uses the ZAB consistency protocol to achieve eventual consistency of distributed transactions.

This section describes the ZAB protocol.

ZAB: ZooKeeper Atomic Broadcast

ZAB is a consistency protocol with crash recovery support, designed specifically for the distributed coordination service ZooKeeper. Based on this protocol, ZooKeeper implements a master-slave system architecture that keeps data consistent between the replicas in the cluster.

ZAB’s message broadcast process uses an atomic broadcast protocol similar to a two-phase commit. For each client request, the Leader server generates a corresponding transaction proposal and sends it to all Follower servers in the cluster. It then collects their votes and finally commits the transaction, as shown in the figure:

The two-phase commit in the ZAB protocol removes the abort logic. Each Follower server either acknowledges the Leader’s transaction proposal normally or abandons the Leader server. As a result, the Leader can start committing a transaction proposal as soon as more than half of the Follower servers have responded with ACKs.

The Leader server assigns a globally monotonically increasing ID to the transaction proposal, called the transaction ID (ZXID). Because the ZAB protocol requires strict causality for each message, each transaction proposal needs to be processed in order of its ZXID.

During message broadcast, the Leader server assigns a queue to each Follower server, then puts transaction proposals into these queues in turn, and sends messages according to the FIFO policy.
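The per-follower queues can be pictured with a minimal sketch. The class `FollowerQueues` and its methods are hypothetical names chosen for illustration and are not taken from the ZooKeeper code base; proposals are represented only by their ZXIDs.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: one FIFO queue of pending proposal ZXIDs per follower.
class FollowerQueues {
    // Follower id -> ZXIDs waiting to be sent to that follower, oldest first.
    private final Map<Long, Queue<Long>> queues = new LinkedHashMap<>();

    void addFollower(long followerId) {
        queues.putIfAbsent(followerId, new ArrayDeque<>());
    }

    // Append a proposal to every follower's queue, preserving the call order.
    void broadcast(long zxid) {
        for (Queue<Long> queue : queues.values()) {
            queue.add(zxid);
        }
    }

    // Next ZXID to send to the given follower, or null if nothing is pending.
    Long nextFor(long followerId) {
        Queue<Long> queue = queues.get(followerId);
        return queue == null ? null : queue.poll();
    }
}
```

Because each queue is drained independently, a slow follower delays only its own queue, while every follower still sees proposals in the same order.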

After receiving a transaction proposal, the Follower server writes it to the local disk as a transaction log entry. Once the write succeeds, the Follower server sends an ACK back to the Leader server.

When the Leader server has received ACKs from more than half of the Follower servers, it broadcasts a COMMIT message and commits the transaction itself at the same time. Each Follower server commits the transaction after receiving the COMMIT message.
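The majority rule can be sketched as a small ACK tracker. `ProposalAcks` is a hypothetical class for illustration only. It also assumes that the Leader counts its own log write as one ACK and that the majority is taken over all voting servers, which is a simplification of what the text above describes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of majority-ACK tracking for a single proposal.
class ProposalAcks {
    private final int clusterSize;                       // voting servers, Leader included
    private final Set<Long> ackedServers = new HashSet<>();

    ProposalAcks(int clusterSize, long leaderId) {
        this.clusterSize = clusterSize;
        ackedServers.add(leaderId);                      // assumption: the Leader acknowledges itself
    }

    // Record an ACK (duplicates are ignored); true once the proposal may be committed.
    boolean ack(long serverId) {
        ackedServers.add(serverId);
        return hasQuorum();
    }

    // A quorum is a strict majority of the cluster.
    boolean hasQuorum() {
        return ackedServers.size() > clusterSize / 2;
    }
}
```

In a five-server cluster under these assumptions, the proposal becomes committable once two Followers have acknowledged it in addition to the Leader.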

The atomic broadcast protocol ensures the consistency of distributed data: once a transaction is committed, more than half of the nodes store the same data.

Message broadcast

You can think of the message broadcast mechanism as a simplified version of the 2PC protocol, which ensures sequential transaction consistency through the following mechanism.

When a client submits a transaction request, the Leader node generates a transaction Proposal for the request and sends it to all the Follower nodes in the cluster. After receiving Ack feedback from more than half of the Followers, the Leader node starts to commit the transaction.

Because only more than half of the Follower nodes need to return an Ack before a transaction can be committed, data can become inconsistent if the Leader node crashes. ZAB uses crash recovery to deal with this inconsistency.

Message broadcast communicates over TCP, so the order in which transactions are sent and received is preserved. When a message is broadcast, the Leader node assigns a globally increasing ZXID (transaction ID) to each transaction Proposal, and each Proposal is processed in ZXID order.

The Leader node assigns a queue to each Follower node, puts Proposals into the queue in ZXID order, and sends them according to the queue's FIFO rule. After receiving a Proposal from the Leader node, the Follower node writes it to the local disk as a transaction log entry and sends an Ack message to the Leader node. Once the Leader has received Acks from more than half of the Follower nodes, it commits the transaction and, at the same time, broadcasts a Commit message to all the Followers. After receiving the Commit, the Followers commit the transaction.
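The Follower side of this exchange can be sketched as follows. `FollowerSketch` is a hypothetical class, the in-memory `TreeMap` stands in for the on-disk transaction log, and transaction payloads are plain strings; none of these names come from ZooKeeper itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of a Follower: log on Proposal, apply on Commit, in ZXID order.
class FollowerSketch {
    // Logged but not yet committed proposals: zxid -> payload (stand-in for the disk log).
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Payloads of committed transactions, in commit order.
    private final List<String> committed = new ArrayList<>();

    // Handle a Proposal: record it, then return the ZXID to acknowledge to the Leader.
    long onProposal(long zxid, String payload) {
        pending.put(zxid, payload);
        return zxid;
    }

    // Handle a Commit: apply it only if it refers to the oldest pending proposal.
    boolean onCommit(long zxid) {
        if (pending.isEmpty() || pending.firstKey() != zxid) {
            return false;                                // out of order or unknown ZXID
        }
        committed.add(pending.remove(zxid));
        return true;
    }

    List<String> committed() {
        return committed;
    }
}
```

Rejecting an out-of-order Commit is a design choice of this sketch; it makes the ZXID ordering requirement explicit instead of relying only on the transport.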

Crash recovery

If the Leader crashes during message broadcast, can the data remain consistent? When the Leader crashes, the cluster enters crash recovery mode. Recovery mainly has to handle the following two cases.

  1. The Leader crashes after copying data to all Followers.
  2. The Leader crashes after receiving the Acks, committing the transaction itself, and sending only some of the COMMIT messages.

To address this problem, ZAB defines two principles:

  1. The ZAB protocol ensures that transactions already committed by the Leader are eventually committed by all servers.
  2. The ZAB protocol ensures that transactions that were only proposed/replicated by the Leader, but not committed, are discarded.

How does ZAB ensure that transactions already committed by the Leader are committed everywhere, while transactions that were only proposed are discarded? The core mechanism is the ZXID. During recovery after a crash, the state with the largest ZXID is selected as the basis for recovery. The advantage is that the new Leader does not need to check which transactions must be committed or discarded, which improves efficiency.
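Selecting by largest ZXID can be sketched as a comparison over the last ZXID logged on each server. `RecoveryChoice` and `pickMostUpToDate` are hypothetical names, and breaking ties by the larger server id is an assumption of this sketch, not something stated in the text above.

```java
import java.util.Map;

// Hypothetical sketch: choose the server whose log ends with the largest ZXID.
class RecoveryChoice {
    // Keys are server ids, values are the last ZXID in each server's log.
    static long pickMostUpToDate(Map<Long, Long> lastZxidByServer) {
        Map.Entry<Long, Long> best = null;
        for (Map.Entry<Long, Long> e : lastZxidByServer.entrySet()) {
            if (best == null
                    || e.getValue() > best.getValue()
                    // Assumption: on equal ZXIDs, prefer the larger server id.
                    || (e.getValue().equals(best.getValue()) && e.getKey() > best.getKey())) {
                best = e;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("no candidate servers");
        }
        return best.getKey();
    }
}
```

Since every committed transaction was logged by more than half of the servers, any majority that takes part in recovery contains at least one server holding it, and the server with the largest ZXID among them holds all of them.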

Data synchronization

After the Leader election and before it officially starts working, the Leader server first confirms whether all the transaction proposals in its transaction log have been committed by more than half of the machines, that is, whether data synchronization has been completed. The following is the data synchronization process of the ZAB protocol.

The Leader server prepares a queue for each Follower server, sends each transaction that has not been synchronized by the Follower server to the Follower server one by one in the form of a transaction proposal, and sends a COMMIT message after each proposal message to indicate that the transaction has been committed.

After the Follower server has synchronized all its unsynchronized transaction proposals from the Leader server and applied them to the local database, the Leader server adds the Follower server to the actual list of available followers.
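The message sequence sent to one Follower during synchronization can be sketched like this. `SyncPlan` and `messagesFor` are hypothetical names, messages are rendered as plain strings, and the Follower's progress is summarized by the last ZXID it has already applied, which is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Hypothetical sketch: build the sync messages the Leader queues for one Follower.
class SyncPlan {
    // For every ZXID the Follower is missing, emit a PROPOSAL followed by its COMMIT.
    static List<String> messagesFor(NavigableSet<Long> leaderCommitted, long followerLastZxid) {
        List<String> messages = new ArrayList<>();
        // tailSet(x, false) yields the ZXIDs strictly greater than x, in ascending order.
        for (long zxid : leaderCommitted.tailSet(followerLastZxid, false)) {
            messages.add("PROPOSAL 0x" + Long.toHexString(zxid));
            messages.add("COMMIT 0x" + Long.toHexString(zxid));
        }
        return messages;
    }
}
```

A Follower whose last ZXID already equals the Leader's newest one receives an empty list and can be added to the list of available Followers right away.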

The design of the ZXID

ZXID is a 64-bit number, as shown in the following figure.

The lower 32 bits are a simple monotonically increasing counter that is incremented by 1 whenever the Leader server initiates a new transaction proposal.

The upper 32 bits distinguish between different Leader servers. Specifically, every time a new Leader server is elected, it takes the largest ZXID from its local log, extracts the epoch value from it, and increments that value by 1; the result is used as the new epoch. The lower 32 bits then restart from 0 to generate new ZXIDs. (As I understand it, the epoch here identifies a Leader's term. Every time a Leader server is elected, the epoch value is updated, indicating that the new Leader server handles transaction requests during this period.)

ZAB uses the EPOCH number to distinguish Leader cycle changes, effectively preventing different Leader servers from using the same ZXID.
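The 64-bit layout can be worked through in a short sketch. `ZxidSketch`, `epochOf`, `counterOf`, and `firstZxidOfNextEpoch` are hypothetical helper names used for illustration; only the packing formula follows the layout described above.

```java
// Hypothetical sketch of packing and unpacking a 64-bit ZXID.
class ZxidSketch {
    // Epoch in the upper 32 bits, counter in the lower 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Upper 32 bits: the Leader epoch.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Lower 32 bits: the per-epoch proposal counter.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // First ZXID of a newly elected Leader: previous epoch + 1, counter reset to 0.
    static long firstZxidOfNextEpoch(long lastZxid) {
        return makeZxid(epochOf(lastZxid) + 1, 0);
    }
}
```

For example, if the largest ZXID in the new Leader's log is `0x2000000ff` (epoch 2, counter 255), its first ZXID is `0x300000000`, which is greater than every ZXID that could have been issued in epoch 2.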

Here is the core code for ZXID generation on the Leader node.

// Leader.java

void lead() throws IOException, InterruptedException {
  // ...
  long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
  zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
  // ...
}

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
  synchronized (connectingFollowers) {
    // ...
    if (isParticipant(sid)) {
      // Add this server to the set of connecting followers, which decides whether the leadership is valid
      connectingFollowers.add(sid);
    }
    QuorumVerifier verifier = self.getQuorumVerifier();
    // If enough followers have connected, the election is valid: stop waiting and wake up the other waiting threads, similar to a barrier
    if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
      waitingForNewEpoch = false;
      self.setAcceptedEpoch(epoch);
      connectingFollowers.notifyAll();
    } else {
      // ...
      // If there are not enough followers yet, wait; the timeout is initLimit
      while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
        connectingFollowers.wait(end - cur);
        cur = Time.currentElapsedTime();
      }
      // Still waiting after the loop: exit due to timeout
      if (waitingForNewEpoch) {
        throw new InterruptedException("Timeout while waiting for epoch from quorum");
      }
    }
    return epoch;
  }
}

// ZxidUtils
public static long makeZxid(long epoch, long counter) {
  return (epoch << 32L) | (counter & 0xffffffffL);
}

ZAB protocol implementation

The process of writing data
