Zab

The core of Zookeeper is the Zab protocol (Zookeeper Atomic Broadcast), which keeps the state of the servers synchronized.

The Zab protocol has two modes: recovery mode (leader election) and broadcast mode (state synchronization).

Zab enters recovery mode when the service starts or after the Leader crashes; recovery mode ends once a Leader has been elected and a majority of the servers have completed state synchronization with it. State synchronization ensures that the Leader and the other servers hold the same system state.

Leader election

Leader election trigger conditions

Leader election is the key to ensuring the consistency of distributed data. A server in a Zookeeper cluster enters Leader election when one of the following conditions occurs.

(1) The server is initialized and started.

A Leader election requires at least two servers; a cluster of three servers is used here as an example. In the cluster initialization stage, when the first server, Server1, starts, it cannot conduct and complete a Leader election on its own. When the second server, Server2, starts, the two machines can communicate with each other, each tries to find a Leader, and the Leader election process begins.

(2) A server loses its connection with the Leader during operation.

While Zookeeper is running, the Leader and the non-Leader servers each perform their own duties. Even if a non-Leader server crashes or a new server joins, the Leader is not affected. However, once the Leader server goes down, the whole cluster suspends external service and enters a new round of Leader election, which is essentially the same as the election process at start-up. Assume that three servers, Server1, Server2, and Server3, are running and the current Leader is Server2. If the Leader fails at some point, a Leader election starts.

Election algorithm

Zookeeper versions after 3.4.0 only retain the TCP version of the FastLeaderElection algorithm.

1. Server status

The server has four states, which are LOOKING, FOLLOWING, LEADING, and OBSERVING.

  • LOOKING: looking for a Leader. A server in this state considers that the cluster has no Leader and therefore needs to enter Leader election.
  • FOLLOWING: the follower state. The current server's role is Follower.
  • LEADING: the leader state. The current server's role is Leader.
  • OBSERVING: the observer state. The current server's role is Observer.
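These four states map naturally to an enum. The sketch below simply mirrors the list above; in the ZooKeeper code base the corresponding enum is QuorumPeer.ServerState, but this standalone version is for illustration only.

```java
// Illustrative enum of the four server states described above
// (the real enum lives in org.apache.zookeeper.server.quorum.QuorumPeer.ServerState).
public enum ServerState {
    LOOKING,    // no Leader known; the server is participating in Leader election
    FOLLOWING,  // the server is acting as a Follower
    LEADING,    // the server is acting as the Leader
    OBSERVING   // the server is acting as an Observer (non-voting)
}
```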

2. Voting data structure

Each vote contains two basic pieces of information: the SID and the ZXID of the proposed server. In Zookeeper, a vote contains the following fields:

  • id: SID of the proposed Leader.
  • zxid: ZXID of the proposed Leader, i.e. its latest transaction ID.
  • electionEpoch: a logical clock used to determine whether multiple votes belong to the same election round; it is maintained on each server and incremented by one for each new round of voting.
  • peerEpoch: epoch of the proposed Leader.
  • state: current state of the server casting the vote.
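As a rough illustration, the vote can be modeled as a simple value class. The sketch below reuses the ServerState enum from the earlier sketch; field names follow the list above, and the real class in the ZooKeeper code base (org.apache.zookeeper.server.quorum.Vote) has additional constructors and helpers, so treat this only as an outline.

```java
// Minimal sketch of a leader-election vote, assuming the fields described above
// and the ServerState enum sketched in the previous section.
public final class Vote {
    private final long id;            // SID of the proposed leader
    private final long zxid;          // latest transaction ID of the proposed leader
    private final long electionEpoch; // logical clock for the current election round
    private final long peerEpoch;     // epoch of the proposed leader
    private final ServerState state;  // state of the server that cast this vote

    public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
        this.state = state;
    }

    public long getId() { return id; }
    public long getZxid() { return zxid; }
    public long getElectionEpoch() { return electionEpoch; }
    public long getPeerEpoch() { return peerEpoch; }
    public ServerState getState() { return state; }
}
```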

3. QuorumCnxManager

During startup, each server starts a QuorumCnxManager, which is responsible for the underlying network communication between servers during the Leader election.

  1. Message queues. QuorumCnxManager maintains a series of queues that store received messages, messages to be sent, and message senders. Except for the receive queue, the queues are grouped by SID: if the cluster contains three machines besides the current one, a separate send queue is created for each of those three machines, and the queues do not interfere with one another.
  • RecvQueue: Message receiving queue for messages received from other servers.
  • QueueSendMap: Message sending queue that holds messages to be sent, grouped by SID.
  • SenderWorkerMap: collection of message senders. Each SendWorker corresponds to one remote Zookeeper server and is responsible for sending messages to it; the collection is also grouped by SID.
  • LastMessageSent: keeps the most recently sent message for each SID.
  2. Establish connections. To be able to exchange votes, every pair of machines in the Zookeeper cluster must have a network connection between them. On startup, QuorumCnxManager creates a ServerSocket to listen on the Leader election communication port (3888 by default); it then continuously accepts and processes TCP connection requests from other servers. To avoid duplicate TCP connections between any two machines, Zookeeper only lets the server with the larger SID keep the connection: after receiving a connection request, a server compares its own SID with that of the remote server, and if its own SID is larger, it closes the incoming connection and initiates a connection to the remote server itself. Once a connection is established, the corresponding message sender (SendWorker) and message receiver (RecvWorker) are created and started for that remote server's SID. (A sketch of this SID rule follows the next item.)

  3. Message receiving and sending. Message receiving: RecvWorker handles incoming messages. Since Zookeeper assigns a separate RecvWorker to each remote server, each RecvWorker only needs to keep reading messages from its TCP connection and saving them into the recvQueue. Message sending: since Zookeeper assigns a separate SendWorker to each remote server, each SendWorker only needs to keep taking messages from the corresponding send queue, sending them, and recording each sent message in lastMessageSent. If a SendWorker finds that the send queue for its server is empty, it takes the most recently sent message from lastMessageSent and sends it again. This guards against the case where the receiver crashed just before or after receiving a message and never processed it correctly; Zookeeper also ensures that receivers handle such duplicate messages correctly.
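The duplicate-connection rule from item 2 can be illustrated with a short sketch. All names here (mySid, handleIncomingConnection, the stub methods, and the simplified wire format) are assumptions for this example; the real logic lives in QuorumCnxManager and is more involved.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;

// Hypothetical sketch of the "only the larger SID keeps the connection" rule.
public class ConnectionRuleSketch {

    private final long mySid;

    public ConnectionRuleSketch(long mySid) {
        this.mySid = mySid;
    }

    /** Called when another server connects to our election port (default 3888). */
    void handleIncomingConnection(Socket socket) throws IOException {
        // Assume the connecting server announces its SID first (wire format simplified).
        DataInputStream in = new DataInputStream(socket.getInputStream());
        long remoteSid = in.readLong();

        if (remoteSid < mySid) {
            // Our SID is larger: drop this connection and dial the remote server ourselves.
            socket.close();
            connectOut(remoteSid);
        } else {
            // Remote SID is larger: keep the connection and start the per-peer workers.
            startSendWorker(socket, remoteSid);
            startRecvWorker(socket, remoteSid);
        }
    }

    void connectOut(long remoteSid)          { /* open a socket to the remote election port */ }
    void startSendWorker(Socket s, long sid) { /* start the SendWorker for this peer */ }
    void startRecvWorker(Socket s, long sid) { /* start the RecvWorker for this peer */ }
}
```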

4. FastLeaderElection

Terminology

  • External vote: a vote received from another server.
  • Internal vote: the server's own current vote.
  • Election round: the round of the Zookeeper Leader election, i.e. the logicalclock.
  • PK: comparing an internal vote with an external vote to determine whether the internal vote needs to be changed.

Vote management

  • Sendqueue: send queue, holding the votes to be sent.
  • Recvqueue: receive queue, storing the external votes that have been received.
  • WorkerReceiver: vote receiver. It continuously takes election messages received from other servers out of QuorumCnxManager, converts them into votes, and saves them into recvqueue. While receiving votes, if it finds that the election round of an external vote is smaller than the current server's, it ignores the external vote and immediately sends the server's own internal vote back.
  • WorkerSender: vote sender. It continuously takes votes to be sent from sendqueue and hands them to the underlying QuorumCnxManager.

(Figure: how the FastLeaderElection module interacts with the underlying network I/O)

Core algorithm

The figure above shows how the FastLeaderElection module interacts with the underlying network I/O. The basic process of Leader election is as follows:

1. Increment the election round. Zookeeper requires that all valid votes belong to the same round, so when a new round of voting starts, the server first increments its logicalclock.

2. Initialize the vote. Before starting a new round of voting, each server initializes its own vote; in this initialization phase every server proposes itself as the Leader.

3. Send the initial vote. Once its vote is initialized, the server casts its first vote: Zookeeper puts the newly initialized vote into sendqueue, and the WorkerSender sends it out.

4. Receive external votes. Each server continuously fetches external votes from the recvqueue. If it cannot obtain any external votes, it immediately checks whether it has valid connections to the other servers in the cluster: if not, it establishes the connections; if it does, it sends its current internal vote again.

5. Determine the election round. After the initial vote has been sent, incoming external votes are processed. An external vote is handled differently depending on its election round.

  • The external vote's election round is greater than the internal one. If the server's own election round lags behind that of the external vote, it immediately updates its own election round (logicalclock), empties all the votes it has already received, PKs the external vote against its freshly initialized vote to decide whether to change the internal vote, and finally sends the internal vote out.
  • The external vote's election round is smaller than the internal one. If the received external vote is behind the server's own election round, Zookeeper simply ignores it and returns to step 4.
  • The external vote's election round equals the internal one. In this case the vote PK can begin.

6. Vote PK. During the vote PK, the internal vote must be changed if any one of the following conditions is met (the comparison is sketched in code after step 10).

  • If the Leader proposed by the external vote has a higher epoch than the one proposed by the internal vote, the vote needs to be changed.
  • If the epochs are the same, the ZXIDs of the two votes are compared; if the external vote's ZXID is larger, the vote needs to be changed.
  • If the ZXIDs are also the same, the SIDs of the two votes are compared; if the external vote's SID is larger, the vote needs to be changed.

7. Change the vote. If the PK determines that the external vote is better than the internal one, the vote is changed: the external vote's information overwrites the internal vote. Once the change is complete, the updated internal vote is sent out again.

8. Archive the vote. Regardless of whether the internal vote was changed, the external vote that was just received is archived in the vote collection recvset. The recvset records all external votes the current server has received in this round of the Leader election, keyed by the voter's SID, e.g. {(1, vote1), (2, vote2), ...}.

9. Count the votes. After archiving, the votes are counted to determine whether more than half of the servers in the cluster agree with the current internal vote (see the quorum check in the sketch after step 10). If more than half agree, voting terminates; otherwise, return to step 4.

10. Update the server state. Once voting can terminate, the server updates its state. It first checks whether the Leader approved by more than half of the servers is itself: if so, it changes its state to LEADING; otherwise it becomes FOLLOWING or OBSERVING according to its role.

These 10 steps are the core of FastLeaderElection; steps 4-9 may repeat for several rounds until a Leader is elected.
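To make the PK in step 6 and the quorum check in step 9 concrete, here is a minimal sketch of both. The comparison order (epoch, then ZXID, then SID) follows the rules above; the class, fields, and method names are invented for this example and are not the actual FastLeaderElection API (in the ZooKeeper source the comparison is done by a method called totalOrderPredicate).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the vote PK (step 6) and the vote count (step 9).
public class ElectionCoreSketch {

    /** Step 6: returns true if the external vote wins the PK and the internal vote should change. */
    static boolean externalVoteWins(long extEpoch, long extZxid, long extSid,
                                    long curEpoch, long curZxid, long curSid) {
        if (extEpoch != curEpoch) {
            return extEpoch > curEpoch;   // rule 1: higher epoch wins
        }
        if (extZxid != curZxid) {
            return extZxid > curZxid;     // rule 2: same epoch, higher ZXID wins
        }
        return extSid > curSid;           // rule 3: same ZXID, higher SID wins
    }

    /** Step 8: recvset maps the voter's SID to the SID of the leader that voter proposes. */
    private final Map<Long, Long> recvset = new HashMap<>();

    void archiveVote(long voterSid, long proposedLeaderSid) {
        // The server also archives its own vote, so it counts toward the majority.
        recvset.put(voterSid, proposedLeaderSid);
    }

    /** Step 9: true once a strict majority of the cluster agrees with our current proposal. */
    boolean hasQuorum(long proposedLeaderSid, int clusterSize) {
        long agreeing = recvset.values().stream()
                .filter(leader -> leader == proposedLeaderSid)
                .count();
        return agreeing > clusterSize / 2;
    }
}
```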

Synchronization

After the whole cluster completes the Leader election, the Learners register with the Leader; once a Learner has registered, it enters the data synchronization phase. During synchronization the Leader sends the Learner the transactions that have not yet been committed on the Learner. The general process is as follows:

(1) Get the Learner state. At the last stage of registering a Learner, the Learner server will send an ACKEPOCH packet to the Leader server, and the Leader will parse out the Learner’s currentEpoch and lastZxid from the packet.

(2) Data synchronization initialization. First, the proposal cache queue of transaction requests is extracted from the Zookeeper in-memory database, and three ZXID values are initialized: peerLastZxid (the last ZXID processed by the Learner), minCommittedLog (the smallest ZXID in the committedLog of the Leader's proposal cache queue), and maxCommittedLog (the largest ZXID in the committedLog of the Leader's proposal cache queue).

Cluster data synchronization comes in four types: direct differential synchronization (DIFF), rollback-then-differential synchronization (TRUNC+DIFF), rollback-only synchronization (TRUNC), and full synchronization (SNAP). During the initialization phase the Leader defaults to full synchronization, and the final synchronization mode is then chosen according to the data difference between the Leader and the Learner (a decision sketch follows the list below).

  • Direct differential synchronization (DIFF synchronization; peerLastZxid lies between minCommittedLog and maxCommittedLog). The Leader first sends a DIFF command to the Learner to tell it that differential data synchronization is about to begin and that the Leader will send it the proposals it is missing. For each such proposal the Leader sends a PROPOSAL packet with the content followed by a COMMIT packet.

  • Rollback then differential synchronization (TRUNC+DIFF synchronization; this occurs when a transaction was written to a server's local transaction log but the Proposal process was never successfully completed, so the Learner holds a transaction record that the Leader does not have). In this case the Learner must first roll back to the transaction whose ZXID exists on the Leader and is closest to peerLastZxid, after which differential synchronization proceeds as above.

  • Rollback-only synchronization (TRUNC synchronization; peerLastZxid is greater than maxCommittedLog). The Leader asks the Learner to roll back to the transaction whose ZXID is maxCommittedLog.

  • Full synchronization (SNAP synchronization; peerLastZxid is smaller than minCommittedLog, or peerLastZxid differs from the Leader's lastProcessedZxid). The Leader cannot synchronize the Learner directly from the proposal cache queue, so only full synchronization is possible: the Leader transfers its entire in-memory data to the Learner. It first sends a SNAP command to tell the Learner that full synchronization is about to take place, then obtains the full set of data nodes and the session timeout records from the in-memory database, serializes them, and transmits them to the Learner. The Learner deserializes the received data and loads it into its in-memory database.
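The choice among the four modes boils down to comparing peerLastZxid with the boundaries of the Leader's proposal cache queue. The sketch below is only an illustration of that decision under the conditions listed above; the enum, the method, and the learnerHasTxnUnknownToLeader flag are assumptions, and the real Leader/LearnerHandler logic handles more edge cases.

```java
// Illustrative decision for the data synchronization mode, following the ZXID
// comparisons described above. Names are assumptions for this sketch only.
public class SyncModeSketch {

    enum SyncMode { DIFF, TRUNC_PLUS_DIFF, TRUNC, SNAP }

    static SyncMode chooseSyncMode(long peerLastZxid,
                                   long minCommittedLog,
                                   long maxCommittedLog,
                                   boolean learnerHasTxnUnknownToLeader) {
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;              // learner is ahead: roll back to maxCommittedLog
        }
        if (peerLastZxid >= minCommittedLog) {
            // peerLastZxid falls inside [minCommittedLog, maxCommittedLog]
            return learnerHasTxnUnknownToLeader
                    ? SyncMode.TRUNC_PLUS_DIFF  // roll back the stray transaction, then diff
                    : SyncMode.DIFF;            // plain differential synchronization
        }
        return SyncMode.SNAP;                   // too far behind the window: full snapshot
    }
}
```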

Read and write requests

Each server in the ZooKeeper cluster can provide read and write services for clients.

Read requests

For a read request from the client, the server directly retrieves data from its local in-memory database and returns it to the client. This process does not involve any other operations and does not contact the leader.

Write requests

Write requests from clients must be coordinated within the ZooKeeper cluster, because write operations modify znode data and state. The process is as follows:

  1. The server that receives the write request first sends the write request to the leader.

  2. After receiving the write request from a follower (or observer), the leader computes the state that the write operation will produce and turns the request into a transaction carrying the various states (version number, ZXID, and so on).

  3. The leader broadcasts the transaction as a proposal (sending a proposal).

  4. After receiving the proposal, all followers vote on it and return an ACK to the leader. A follower can vote in only two ways: (1) acknowledge the proposal, indicating approval; (2) discard the proposal, indicating disapproval.

  5. The leader collects the vote results; as long as the number of approving votes reaches a majority (for example, in a cluster of five nodes, three or more constitute a majority), the proposal passes (see the sketch after this list).

  6. After the proposal is approved, the leader sends a commit notification to all servers.

  7. All nodes write the transaction to the transaction log and commit.

  8. After the commit, the server receiving the write request returns a success message to the client.
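A minimal sketch of the leader-side majority check behind steps 4-6. The class and method names here are hypothetical and do not correspond to actual ZooKeeper classes; in the real server the leader tracks pending ACKs per proposal internally.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative leader-side ACK counting for a single proposal (steps 4-6 above).
public class ProposalAckSketch {

    private final int votingMembers;             // leader + followers; observers are excluded
    private final Set<Long> ackedSids = new HashSet<>();

    public ProposalAckSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Called for each ACK; the leader's own implicit ACK is registered the same way. */
    public synchronized boolean onAck(long serverSid) {
        ackedSids.add(serverSid);
        // Commit as soon as a strict majority of voting members has acknowledged,
        // e.g. 3 out of 5 in a five-node cluster.
        return ackedSids.size() > votingMembers / 2;
    }
}
```

When onAck returns true, the leader would send the commit notification to all servers (step 6), after which every node writes the transaction to its transaction log and commits it.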

Observer

When the number of followers in a ZooKeeper cluster is large, the voting process becomes a performance bottleneck. To relieve the pressure caused by voting, the Observer role was created.

  • The Observer role does not participate in the vote, but simply “listens” to the results of the vote.
  • The Observer does not need to persist transactions to disk, and once the Observer is restarted, the entire namespace needs to be resynchronized from the Leader.
  • Other than that, it behaves just like a follower; for example, it accepts read and write requests.

This feature greatly improves the performance of the ZooKeeper cluster.

Like followers, when the Observer receives a read request from the client, it returns data from the in-memory database to the client.

When a write request arrives at a server, whether that node is a follower or an observer, it is forwarded to the leader. The leader then organizes the voting process, and all servers receive the proposal (including Observers, because the proposal is broadcast). However, through the configuration file the leader, the followers, and the Observers all know which servers are Observers, and servers that are Observers do not vote. When the leader has collected the votes, it excludes the Observer servers and counts a majority only among the remaining servers. If a majority is reached, the transaction is written successfully, and the leader notifies all nodes (including the Observers) to write the transaction to the transaction log and commit it. (A small sketch of this counting rule follows.)
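The sketch below shows the quorum-size computation with Observers excluded, as described above. The Role enum and Member record are assumptions for this example only.

```java
import java.util.List;

// Illustrative quorum size with Observers excluded from the count.
public class ObserverQuorumSketch {

    enum Role { LEADER, FOLLOWER, OBSERVER }

    record Member(long sid, Role role) {}

    /** Number of ACKs needed to commit: a strict majority of the non-Observer members. */
    static long acksNeeded(List<Member> members) {
        long voters = members.stream()
                .filter(m -> m.role() != Role.OBSERVER)   // observers never vote
                .count();
        return voters / 2 + 1;
    }
}
```

For a cluster of three voting servers plus two Observers, acksNeeded returns 2; the Observers still receive the proposal and the commit notification, they are simply left out of the count.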

Advantages

The Observer role brings several additional advantages in addition to reducing the stress of voting.

1. Improved scalability.

Scalability means increasing the whole cluster's capacity to handle requests by adding servers to share the load; as the saying goes, “if one ox can't pull the cart, use more oxen.”

Before the Observer was introduced, the scalability of a ZooKeeper cluster depended on followers. For read and write operations, followers are “stateless”, which makes it easy to add followers to (or remove them from) the cluster and thereby raise the cluster's load capacity. For voting, however, followers are not stateless: increasing or decreasing their number directly affects the voting process, and the more followers there are, the worse the voting performance becomes.

Observers, by contrast, are “stateless” for reads, writes, and voting alike: changing the number of Observers does not affect the voting result. This allows one group of servers to take part in voting as followers while another group serves reads and writes purely as Observers, which makes ZooKeeper far more scalable.

2. Easier deployment of cross-region ZooKeeper data centers.

An Observer can answer read requests directly from its local in-memory database, which improves read throughput. For a write request, although the Observer forwards it to the leader and later receives the leader's notification, the amount of data exchanged is small compared with what is passed around during voting, so Observers perform well even over a WAN.

In fact, many cross-machine-room and cross-region data centers are implemented with Observers.