Zab: ZooKeeper Atomic Broadcast Protocol.
As the core protocol of ZK, it runs through ZK from beginning to end.
We know that ZK does several things between startup and shutdown:
- Leader election
- Discovery
- Synchronization
- Broadcast
- Crash and re-election
Zab is what guarantees all of these and makes everything go the way we want it to. Now let’s see how ZK does it.
Election
As mentioned earlier, ZK uses FLE (Fast Leader Election) for elections; it is currently the only election algorithm ZK supports, so I won’t repeat it here.
The other stages are explained from the point of view of the Leader and Follower roles.
Leader
1. Discovery
In QuorumPeer#run(), a switch ... case block does different things depending on the server’s serverState.
Earlier we looked at the LOOKING case, because that is the entry point to the election.
Once the election is over, serverState switches to another state, and zabState switches as well.
Let’s look at the run method again (excerpt):
// source reference: org.apache.zookeeper.server.quorum.QuorumPeer#run
case LEADING:
    try {
        setLeader(makeLeader(logFactory));
        // This method contains a while loop; if it returns,
        // this server's leadership has ended and the next election is held
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        // This resets serverState to LOOKING
        // so that the enclosing while loop starts the election again
        updateServerState();
    }
    break;
}
As you can see above, once a server is elected leader, all the mystery lies in the leader.lead() method. Let’s go and see what it does behind our backs!
I looked at this method: it runs from line 576 to line 874, 298 lines in total. If we wrote code like that at work, we would probably get chewed out by our leads: how can one method have so many lines!
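The shape of the run loop shown earlier can be sketched in a few lines. This is only a toy model, assuming that a server returns to LOOKING whenever its leading or following stint ends; PeerLoopSketch, serve() and the fixed iteration count are made up for illustration and are not ZooKeeper's own code.

```java
// Hypothetical, simplified model of the QuorumPeer main loop; not the real class.
class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING }

    ServerState state = ServerState.LOOKING;
    final boolean winsElection;
    final StringBuilder trace = new StringBuilder();

    PeerLoopSketch(boolean winsElection) {
        this.winsElection = winsElection;
    }

    /** Runs the loop for a fixed number of iterations so that the sketch terminates. */
    void run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            trace.append(state).append(' ');
            switch (state) {
                case LOOKING:
                    // The election result decides the next state.
                    state = winsElection ? ServerState.LEADING : ServerState.FOLLOWING;
                    break;
                case LEADING:
                case FOLLOWING:
                    try {
                        serve(); // stands in for leader.lead() or the follower's equivalent
                    } finally {
                        // Whatever ended the stint, go back to electing.
                        state = ServerState.LOOKING;
                    }
                    break;
            }
        }
    }

    void serve() {
        // In this sketch the stint ends immediately.
    }

    public static void main(String[] args) {
        PeerLoopSketch peer = new PeerLoopSketch(true);
        peer.run(4);
        System.out.println(peer.trace.toString().trim()); // LOOKING LEADING LOOKING LEADING
    }
}
```

The point of the finally block in the sketch is the same as in the excerpt: however the stint ends, the state is reset so that the surrounding loop starts a new election.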
Read this whole stage side by side with:
org.apache.zookeeper.server.quorum.LearnerHandler#run
The leader assigns a LearnerHandler to each follower connection, and the handler is responsible for communicating with that server for the lifetime of the connection.
Also note how the Leader and the LearnerHandlers synchronize at several key points:
- getEpochToPropose: the handlers collect the epochs of more than half of the servers, and the leader’s new epoch is generated from them
- waitForEpochAck: the handlers collect ACKs for the new epoch from more than half of the servers
- waitForNewLeaderAck: waits for the result of synchronization with the servers. The synchronization itself is handled entirely by the handler.
Although everything runs asynchronously, these methods make the threads wait for one another at the key points until each has finished its own part!
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
As you can see, the method starts by setting zabState to DISCOVERY; that is, ZK officially moves from the election phase into the discovery phase:
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
// Put the epoch and zxid into a StateSummary for later comparison
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start the thread that accepts follower connections
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
// Collects the largest epoch among the connecting servers; that value +1 is the new leader's epoch -- [1]
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// The new zxid is built from the new epoch, with the low 32 bits set to zero
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
// ...
// Wait for epoch ACKs from more than half of the servers, confirming that the new epoch has reached a quorum
// This is handled the same way as collecting the largest epoch above
// Once enough epoch ACKs have arrived, ZK considers the election over
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
At this point, the Discovery phase is over. Let’s summarize what the leader does in the Discovery stage:
- Get the latest epoch:
  - Receive a quorum packet (QP) of type FOLLOWERINFO or OBSERVERINFO
  - Compare to find the largest epoch, and set the new epoch
  - Reply to the connecting server with a LEADERINFO QP carrying the new epoch
- Wait for more than half of the connected servers to acknowledge (waitForEpochAck):
  - The handler waits for an ACKEPOCH response from each follower/observer. Responses from more than half mean the new epoch has been set successfully
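The zxid that the discovery code builds from the new epoch is a 64-bit value with the epoch in the high 32 bits and a counter in the low 32 bits. Here is a minimal sketch of that layout; ZxidSketch and its method names are made up for illustration, so treat it as a picture of the idea and not as ZK's own utility class.

```java
// Hypothetical helper mirroring the zxid layout described above; names are illustrative.
class ZxidSketch {
    /** Packs the epoch into the high 32 bits and the counter into the low 32 bits. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extracts the epoch (high 32 bits). */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extracts the per-epoch counter (low 32 bits). */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long largestSeenEpoch = 4;            // largest epoch reported by the quorum (example value)
        long newEpoch = largestSeenEpoch + 1; // the new leader's epoch
        long firstZxid = makeZxid(newEpoch, 0);
        System.out.println(Long.toHexString(firstZxid)); // 500000000
        System.out.println(epochOf(firstZxid));          // 5
        System.out.println(counterOf(firstZxid));        // 0
    }
}
```

Because the epoch sits in the high bits, every zxid issued under a newer epoch compares greater than any zxid from an older epoch, which is what makes plain numeric comparison of zxids meaningful.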
2. Synchronization
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
Here, the ZAB protocol enters the synchronization phase. Let’s look at what the leader does in the synchronization phase:
// Wait for ACKs from more than half of the servers, as in [1]. If we look at the callers of this method, we can see that it is also called in learnerHandler, and before that call, all learnerHandler does is synchronize with the server it is connected to
waitForNewLeaderAck(self.getId(), zk.getZxid());
// This opens the leader's transaction responsibility chain -- [2]
startZkServer();
// This is the leaderServes setting described in how-to-use; yes (the default) means the leader also accepts client connections
// Add zkServer to ServerCnxnFactory
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
// On to BROADCAST already?
// Yes, because the learnerHandler assistants take care of the rest
self.setZabState(QuorumPeer.ZabState.BROADCAST);
There are several types of synchronization (it is worth thinking about which situations lead to each one):
- DIFF: differential synchronization
- TRUNC: rollback synchronization
- SNAP: full synchronization
Now let’s look at the LearnerHandler code between waitForEpochAck and waitForNewLeaderAck.
syncFollower
org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower
- Leader lastProcessedZxid == peerLastZxid: the transactions are already consistent and no synchronization is needed. In this case, the leader sends an empty DIFF QP to the follower
- Peer lastProcessedZxid is within [minCommittedLog, maxCommittedLog]: this interval is the committed-log queue that ZK maintains for fast synchronization with servers. ZK first sends a DIFF QP to the server, then sends every transaction with a zxid larger than the peer’s as a Proposal. After receiving the transactions, the server responds with an ACK
- Peer lastProcessedZxid > maxCommittedLog: a TRUNC is required. Similarly, a TRUNC QP is sent first, and the peer then rolls back the transactions that the leader does not have
- Peer lastProcessedZxid < minCommittedLog: ZK first tries to synchronize using the on-disk txnlog plus the committedLog. If that fails, ZK falls back to SNAP mode, which reads a snapshot.XXX file from disk, serializes it, and sends it to the server
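The four cases above boil down to a comparison of three numbers. The following is a simplified sketch of that decision, assuming the checks happen in the order shown; SyncChoiceSketch, SyncMode and choose are hypothetical names, and the real syncFollower method handles more corner cases than this.

```java
// Hypothetical sketch of the sync-mode decision; not the actual syncFollower code.
class SyncChoiceSketch {
    enum SyncMode { EMPTY_DIFF, DIFF, TRUNC, TXNLOG_OR_SNAP }

    static SyncMode choose(long leaderLastZxid, long peerLastZxid,
                           long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return SyncMode.EMPTY_DIFF;     // already consistent, nothing to send
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;          // peer is ahead, it must roll back
        }
        if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;           // peer is inside the committed-log window
        }
        return SyncMode.TXNLOG_OR_SNAP;     // peer is too far behind the window
    }

    public static void main(String[] args) {
        System.out.println(choose(100, 100, 50, 100)); // EMPTY_DIFF
        System.out.println(choose(100, 120, 50, 100)); // TRUNC
        System.out.println(choose(100, 70, 50, 100));  // DIFF
        System.out.println(choose(100, 10, 50, 100));  // TXNLOG_OR_SNAP
    }
}
```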
After the synchronization is complete, the leader sends a NEWLEADER QP to the server to tell it that synchronization is over. Once ACK responses have come back from the servers (the leader and the learnerHandlers rendezvous here through the waitForNewLeaderAck method), the leader executes the startZkServer method, and each learnerHandler in turn waits for zkServer to start.
startZkServer
org.apache.zookeeper.server.quorum.Leader#startZkServer
// from the base class org.apache.zookeeper.server.ZooKeeperServer#startup
// Excerpt
public synchronized void startup() {
    // This sets up the leader's transaction responsibility chain mentioned above
    setupRequestProcessors();
    // Set the ZK server state
    setState(State.RUNNING);
    // Notify all threads waiting on this object
    // These threads are the learnerHandlers connected to each server
    notifyAll();
}
Once zkServer has started, each learnerHandler sends an UPTODATE message to its server, telling it that we are ready to open the doors and accept client connections.
3. Broadcast
while (true) {
    synchronized (this) {
        // Check the status of quorumPeer and zkServer
        if (!this.isRunning()) {
            break;
        }
        // Break out of the while loop if fewer than half of the followers are still in sync
        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            break;
        }
    }
    // Send ping packets periodically
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}
// If we broke out of the while loop, the message here has a value. I have simplified the code
if (null != shutdownMessage) {
    shutdown(shutdownMessage);
}
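The "more than half" test that this loop relies on can be pictured as a plain majority check. This is a minimal sketch assuming a simple-majority quorum; MajoritySketch and hasQuorum are made-up names, and ZK's own verifier can also apply other rules such as hierarchical quorums.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a simple-majority quorum check; not ZooKeeper's own class.
class MajoritySketch {
    /** Returns true when strictly more than half of the voting members have acked. */
    static boolean hasQuorum(Set<Long> acked, Set<Long> votingMembers) {
        int half = votingMembers.size() / 2;
        // Only acks from voting members count toward the quorum.
        long ackedVoters = acked.stream().filter(votingMembers::contains).count();
        return ackedVoters > half;
    }

    public static void main(String[] args) {
        Set<Long> voters = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        System.out.println(hasQuorum(new HashSet<>(Arrays.asList(1L, 2L, 3L)), voters)); // true
        System.out.println(hasQuorum(new HashSet<>(Arrays.asList(1L, 2L)), voters));     // false
        System.out.println(hasQuorum(new HashSet<>(Arrays.asList(1L, 2L, 9L)), voters)); // false
    }
}
```

In the last call, server 9 is not a voting member, so only two acks count and the quorum is not reached.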
That covers what the leader.lead() method does, but we already know that the learnerHandler handles most of the work between the leader and each server. So at this point we also need to see what the learnerHandler is doing:
while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    switch (qp.getType()) {
        case Leader.ACK:
            // ...
            break;
        case Leader.PING:
            // ...
            break;
        case Leader.REVALIDATE:
            // ...
            break;
        case Leader.REQUEST:
            // ...
            break;
        default:
            LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
            break;
    }
}
As you can see, the learnerHandler uses a switch ... case block to handle each type of request from the server. It throws an exception when something goes wrong and, in the end, shuts down in the finally block.
Follower
For followers, we again start from the switch ... case block in QuorumPeer’s run method. I won’t paste the code here; I will simply outline the general flow and the communication with the leader at each stage.
1. Discovery
- A follower (abbreviated as F below) establishes a TCP connection with the new leader through org.apache.zookeeper.server.quorum.Learner#connectToLeader.
- F sends a QP of type FOLLOWERINFO to the leader (OBSERVERINFO if it is an observer), which is received by the learnerHandler.
  As mentioned above, when the leader runs its lead method, it first creates LearnerCnxAcceptor --> LearnerCnxAcceptorHandler --> LearnerHandler. After that, it just waits for F to connect and send the FOLLOWERINFO message. This must be the first message F sends; if it is not, the leader discards the connection.
- F waits for the leader’s LEADERINFO QP, which contains the leader’s latest epoch. F then sets the new epoch and sends a QP of type ACKEPOCH to the leader.
End of the Discovery phase.
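The follower's side of this exchange can be sketched as a small epoch check. This is a simplified illustration under the assumption that a follower adopts a newer epoch, leaves an equal one unchanged, and refuses a leader whose epoch is older than one it has already accepted; FollowerEpochSketch and onLeaderInfo are hypothetical names, and the sketch throws an unchecked exception to stay short.

```java
// Hypothetical sketch of a follower reacting to LEADERINFO; names are illustrative.
class FollowerEpochSketch {
    long acceptedEpoch;

    FollowerEpochSketch(long acceptedEpoch) {
        this.acceptedEpoch = acceptedEpoch;
    }

    /**
     * Handles the epoch carried by a LEADERINFO packet.
     * Returns true if a newer epoch was adopted, false if the epoch was already accepted.
     */
    boolean onLeaderInfo(long leaderEpoch) {
        if (leaderEpoch > acceptedEpoch) {
            acceptedEpoch = leaderEpoch; // adopt the new epoch, then reply with ACKEPOCH
            return true;
        }
        if (leaderEpoch == acceptedEpoch) {
            return false;                // nothing to update, still reply with ACKEPOCH
        }
        // Assumption: a leader with an older epoch is stale and must be rejected.
        throw new IllegalStateException(
            "Leader epoch " + leaderEpoch + " is less than accepted epoch " + acceptedEpoch);
    }

    public static void main(String[] args) {
        FollowerEpochSketch f = new FollowerEpochSketch(3);
        System.out.println(f.onLeaderInfo(5)); // true
        System.out.println(f.acceptedEpoch);   // 5
        System.out.println(f.onLeaderInfo(5)); // false
    }
}
```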
2. Synchronization
The leader sends a synchronization flag (DIFF/TRUNC/SNAP) through the learnerHandler, and F handles and replies to each one differently. The different handling can be seen in org.apache.zookeeper.server.quorum.Learner#syncWithLeader.
After the synchronization, the leader sends a NEWLEADER QP to F, and F replies with an ACK. After receiving ACKs from more than half of the servers, the leader sends an UPTODATE QP, and F breaks out of the while loop when it receives a QP of this type:
case Leader.UPTODATE:
break outerLoop;
The following code replies with ACK and starts zkServer:
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();
F’s zkServer also sets up a chain of responsibility for transaction processing:
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}
As the code shows, F has the following chains of responsibility:
- FollowerRequestProcessor --> CommitProcessor --> FinalRequestProcessor
- SyncRequestProcessor --> SendAckRequestProcessor
These chains of responsibility give the handling of a transaction a clear division of labor.
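For readers who have not used the pattern, the chains above can be pictured with a toy version. This sketch is synchronous and uses made-up names (ChainSketch, Processor, stage), whereas several of ZK's processors are threads with their own queues, so it only shows how each stage hands the request to the next one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous sketch of a chain of responsibility; not ZooKeeper's classes.
class ChainSketch {
    interface Processor {
        void process(String request);
    }

    /** Builds a stage that records the request and then forwards it to the next stage. */
    static Processor stage(String name, List<String> log, Processor next) {
        return request -> {
            log.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        };
    }

    /** Wires the chain back to front, like setupRequestProcessors does, and runs one request. */
    static List<String> run(String request) {
        List<String> log = new ArrayList<>();
        Processor finalStage = stage("Final", log, null);
        Processor commitStage = stage("Commit", log, finalStage);
        Processor firstStage = stage("FollowerRequest", log, commitStage);
        firstStage.process(request);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
        // [FollowerRequest:create /a, Commit:create /a, Final:create /a]
    }
}
```

Building the chain from the last stage to the first is what lets each constructor receive its successor, which is why the final processor is created first.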
3. Broadcast
After the syncWithLeader execution is complete, that is, synchronization is complete, F enters the broadcast phase.
If F is configured with observerMasterPort, an ObserverMaster thread is also started to synchronize transactions with observers:
if (self.getObserverMasterPort() > 0) {
    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
    om.start();
}
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}
Here again, contact with the leader is maintained through the while loop.
A write transaction
As we already know, ServerCnxnFactory (we did not configure Netty, so here this means NIOServerCnxnFactory) is responsible for connections to clients throughout the server’s life cycle. Take a look at the NIOServerCnxnFactory class diagram:
It inherits from ServerCnxnFactory, and its main internal components are:
- IOWorkRequest: handles I/O reads and writes (m worker threads, configurable, default: number of cores x 2)
- ConnectionExpirerThread: cleans up abnormal, invalid, or expired client connections (1 thread)
- AcceptThread: accepts connections from clients and assigns them to SelectorThreads (1 thread)
- SelectorThread: handles connections to clients (n threads, configurable, default: square root of (number of cores / 2))
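To make the default thread counts in the list concrete, here is a small calculation. It is a sketch under the assumption that the selector default is floored at one thread; NioDefaultsSketch and its method names are made up for illustration.

```java
// Hypothetical sketch of the default thread-count arithmetic; names are illustrative.
class NioDefaultsSketch {
    /** Selector threads: square root of (cores / 2), assumed to be at least 1. */
    static int defaultSelectorThreads(int cores) {
        return Math.max((int) Math.sqrt((float) cores / 2), 1);
    }

    /** Worker threads: twice the number of cores. */
    static int defaultWorkerThreads(int cores) {
        return 2 * cores;
    }

    public static void main(String[] args) {
        System.out.println(defaultSelectorThreads(8));  // 2
        System.out.println(defaultWorkerThreads(8));    // 16
        System.out.println(defaultSelectorThreads(1));  // 1
        System.out.println(defaultSelectorThreads(32)); // 4
    }
}
```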
When a client request comes in, the SelectorThread hands it to doWork of IOWorkRequest (in practice, the IOWorkRequest thread pool), and doWork passes it on to doIO of NIOServerCnxn. This class is what really communicates with the client. When it receives a message from a client, the payload is processed by the readRequest (or readConnectRequest) method.
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}
This method is very simple: it just hands the request to zkServer for processing, and the request is eventually added to the following field:
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
This field is consumed by the run method of the RequestThrottler thread, which blocks on take() until a message arrives.
ZooKeeperServer then handles the request:
// org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
public void submitRequestNow(Request si) {
    try {
        // As you can see here, the message is handed to the Follower responsibility chain
        firstProcessor.processRequest(si);
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}
At this point, we have a rough picture of the flow from the client sending a message to the message being processed.
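The hand-off in the middle of that flow, where requests are queued and a dedicated thread blocks on take(), can be sketched as a small producer/consumer. ThrottlerSketch, the STOP marker and the string requests are all made up for illustration; the real RequestThrottler also limits how many requests are in flight, which this sketch leaves out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a queue drained by one blocking consumer thread.
class ThrottlerSketch {
    static final String STOP = "__stop__"; // made-up marker that ends the worker loop

    final LinkedBlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    final List<String> processed = new CopyOnWriteArrayList<>();

    final Thread worker = new Thread(() -> {
        try {
            while (true) {
                String request = submitted.take(); // blocks until a request arrives
                if (STOP.equals(request)) {
                    break;
                }
                processed.add(request);            // stands in for handing off to the first processor
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    /** Submits two requests, stops the worker, and returns what it processed, in order. */
    static List<String> demo() {
        ThrottlerSketch sketch = new ThrottlerSketch();
        sketch.worker.start();
        sketch.submitted.add("create /a");
        sketch.submitted.add("setData /a");
        sketch.submitted.add(STOP);
        try {
            sketch.worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.processed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [create /a, setData /a]
    }
}
```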
Annotations
[1]
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
Let’s talk about these three lines.
The first two lines start the thread that accepts follower connections. What is accepting connections for?
We know that a new emperor takes a new era name; likewise, a new leader in ZK updates its epoch.
Suppose there is an isolated follower C in the cluster that never received the latest leader’s epoch. Then the leader fails, and C happens to be elected as the new leader. It needs to generate its own epoch. What should it do? Just add one to its own epoch?
That won’t do! It cannot be sure that this epoch has not already been used by another leader!
Through the three lines above, ZK obtains the latest epoch in the ensemble, that is, the largest epoch currently in the ensemble, and then adds one to get the new epoch.
This is a way of thinking about the ensemble as a whole, which is very common in ZK.
If I had written it, I would have waited for connections first, collected the epochs once more than half had connected, and then compared them with my own epoch to find the largest one and add one to get the new leader’s epoch.
Well, a very ordinary idea.
Let’s see how the people who write hundreds of lines in one method do it.
- Start the LearnerCnxAcceptor class, which handles connections from other servers; the handlers it creates call getEpochToPropose to obtain the latest epoch.
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
- In the Leader main thread, getEpochToPropose is also used to obtain the latest epoch.
- So the key here is getEpochToPropose.
// org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    // connectingFollowers holds the myid of every connected server
    synchronized (connectingFollowers) {
        // Flag, initially true
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // As long as your epoch is at least as large as mine, it replaces mine
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        // Verify that the server is a valid member of the voting group
        // The order here shows that the server does not have to be a voting member for its epoch to count,
        // as long as its epoch is larger than mine
        // Why is this so? My understanding is that, according to the ZAB protocol, the highest epoch
        // must have been produced by legitimate voting members, so handling it this way is fine
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        // Once a quorum is reached, the latest epoch is fixed and the flag is set to false
        // Notify all waiting threads
        if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            if (sid == self.getId()) {
                timeStartWaitForEpoch = start;
            }
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // while + wait
            // Every thread that calls this method ends up waiting here until more than half have connected
            while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
I’ve added a few comments to the code above, but let’s go through it more carefully.
We know that in Java, wait and notifyAll are two methods of Object. As someone who has written business code for a long time, I have hardly ever used them. Only after reading this code did I learn that they can be used this way.
java.lang.Object#wait(long, int)
This method causes the current thread (referred to here as T) to place itself in the wait set for this object and then to relinquish any and all synchronization claims on this object. Note that only the locks on this object are relinquished; any other objects on which the current thread may be synchronized remain locked while the thread waits.
This method puts the current thread into the wait set of the lock object. For the code above, every calling thread is put into the wait set of connectingFollowers.
But the method body is inside a synchronized block, so once one thread has entered, how can other threads get in?
The doc for wait says that the waiting thread gives up its synchronization claims on this object (and only on this object), so at that point other threads can compete for the lock again.
The thread keeps waiting until another thread calls notify or notifyAll, it is interrupted, or the wait times out.
So now I understand.
All the server connections, including the leader itself, wait here until more than half of the servers have connected; then they are woken up and go about their own business.
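The wait/notifyAll rendezvous described above can be tried out in isolation. The following is a stripped-down sketch, assuming a simple-majority quorum and a fixed ensemble size; EpochBarrierSketch, propose and demo are hypothetical names, and the participant check and quit flag of the original method are left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a quorum barrier built on wait/notifyAll; not ZooKeeper's code.
class EpochBarrierSketch {
    private final Set<Long> connecting = new HashSet<>();
    private final long selfId;
    private final int ensembleSize;
    private long epoch = -1;
    private boolean waiting = true;

    EpochBarrierSketch(long selfId, int ensembleSize) {
        this.selfId = selfId;
        this.ensembleSize = ensembleSize;
    }

    /** Blocks until the leader plus a majority have proposed, then returns the agreed epoch. */
    long propose(long sid, long lastAcceptedEpoch, long timeoutMs) throws InterruptedException {
        synchronized (connecting) {
            if (!waiting) {
                return epoch; // the epoch is already fixed
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;
            }
            connecting.add(sid);
            if (connecting.contains(selfId) && connecting.size() > ensembleSize / 2) {
                waiting = false;
                connecting.notifyAll(); // wake every thread parked in wait() below
            } else {
                long cur = System.currentTimeMillis();
                long end = cur + timeoutMs;
                // wait() releases the monitor, so other callers can enter the block meanwhile
                while (waiting && cur < end) {
                    connecting.wait(end - cur);
                    cur = System.currentTimeMillis();
                }
                if (waiting) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

    /** Three-member ensemble: the leader (id 1, epoch 3) and one follower (id 2, epoch 5). */
    static long[] demo() {
        EpochBarrierSketch barrier = new EpochBarrierSketch(1L, 3);
        long[] result = new long[2];
        Thread follower = new Thread(() -> {
            try {
                result[1] = barrier.propose(2L, 5L, 5000);
            } catch (InterruptedException e) {
                result[1] = -1;
            }
        });
        follower.start();
        try {
            result[0] = barrier.propose(1L, 3L, 5000);
            follower.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 6 6
    }
}
```

Whichever thread arrives first, both calls return 6: the largest proposed epoch is 5, and the barrier only opens once the leader itself and a majority have checked in.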
[2]
If we step into this method, we’ll see that it calls two methods:
setupRequestProcessors();
startRequestThrottler();
The first method sets up the chain of responsibility, which handles all transactions while this server is the leader.
The second method acts as a valve that limits the number of requests.
Let’s focus on the first method and see what it does:
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    // In the initialize method, another syncRequestProcessor is initialized
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    setupContainerManager();
}
Here you can clearly see the chain through which the leader processes requests:
LeaderRequestProcessor --> PrepRequestProcessor --> ProposalRequestProcessor --> CommitProcessor --> ToBeAppliedRequestProcessor --> FinalRequestProcessor
The leader also has a chain of responsibility for logging transactions to disk:
SyncRequestProcessor --> AckRequestProcessor
As for what each processor is responsible for, you can read the code and see what you can learn. I won’t go into details here.