ZooKeeper version: 3.6.0
ZooKeeper startup
How does ZooKeeper start? Where should we look to answer that question?
1. zkServer.sh
How do we normally start it? Right, like this:
    ./zkServer.sh start
That's all there is to it, so let's take a look inside the zkServer.sh file:
    case $1 in
    start)
        #...
        nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
        "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
        -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
        -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
        #...
        ;;
    esac
Running zkServer.sh start ends up here: the script launches the main class $ZOOMAIN via java -cp. Scrolling up to see where $ZOOMAIN is set:
    if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
    then
      echo "ZooKeeper JMX enabled by default" >&2
      if [ "x$JMXPORT" = "x" ]
      then
        # for some reason these two options are necessary on jdk6 on Ubuntu
        # accord to the docs they are not necessary, but otw jconsole cannot
        # do a local attach
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
      else
        #...
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
      fi
    else
      echo "JMX disabled by user request" >&2
      ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
As you can see, each branch prepends some JVM options to ZOOMAIN, but the main class is always the same:
    org.apache.zookeeper.server.quorum.QuorumPeerMain
So now we know ZooKeeper's startup class.
2. QuorumPeerMain
Let's get the ZooKeeper source and work through it step by step, starting from the main class. The source can be cloned from GitHub.
Once the download is complete (mvn clean install is required), we can find the class and see:
The main method calls initializeAndRun, which reads the config, starts a cleanup thread, and decides from the arguments passed in whether to run in ensemble or standalone mode.
Here we follow the ensemble path.
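The mode decision described above can be sketched roughly as follows. This is a simplified illustration, not the real implementation: StartupModeSketch, Mode and chooseMode are hypothetical names, the server list stands in for the parsed zoo.cfg, and the real check is more involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the real QuorumPeerMain or QuorumPeerConfig.
final class StartupModeSketch {
    enum Mode { ENSEMBLE, STANDALONE }

    // Assumed rule: exactly one argument (the config file) plus more than one
    // configured server means ensemble mode; anything else runs standalone.
    static Mode chooseMode(String[] args, List<String> configuredServers) {
        boolean hasConfigFile = args.length == 1;
        boolean distributed = configuredServers.size() > 1;
        return (hasConfigFile && distributed) ? Mode.ENSEMBLE : Mode.STANDALONE;
    }

    public static void main(String[] args) {
        String[] cli = {"zoo.cfg"};
        List<String> three = Arrays.asList("server.1", "server.2", "server.3");
        System.out.println(chooseMode(cli, three));
        System.out.println(chooseMode(cli, Collections.<String>emptyList()));
    }
}
```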
org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig
- This method initializes a very important object, the QuorumPeer, one of ZooKeeper's core classes. We'll come back to it later; first let's see what else this method does.
- The tickTime, initLimit and syncLimit settings from zoo.cfg are all applied in this method.
- It initializes the election algorithm type, myid, the database, and so on.
- QuorumVerifier: holds information about all servers in the ensemble at initialization time and is used to decide whether a set of votes forms a majority.
- ServerCnxnFactory: handles communication between the server and its clients. There are two implementations, NIO and Netty. NIO is the default; Netty can be selected through the zookeeper.serverCnxnFactory property.
- Others.
QuorumPeer.start() is then called. QuorumPeer is a thread, so from here on it competes for CPU time slices and runs whenever it gets one.
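As an illustration of the NIO/Netty choice, here is a minimal sketch of reading a system property and falling back to the NIO implementation. CnxnFactoryChoiceSketch and factoryClassName are hypothetical names; the sketch only resolves a class name and does not instantiate any ZooKeeper class.

```java
// Hypothetical sketch of the "default to NIO unless configured" rule.
final class CnxnFactoryChoiceSketch {
    static final String PROPERTY = "zookeeper.serverCnxnFactory";
    static final String NIO = "org.apache.zookeeper.server.NIOServerCnxnFactory";
    static final String NETTY = "org.apache.zookeeper.server.NettyServerCnxnFactory";

    // Returns the configured factory class name, or the NIO one if unset.
    static String factoryClassName() {
        String configured = System.getProperty(PROPERTY);
        return configured == null ? NIO : configured;
    }

    public static void main(String[] args) {
        System.out.println("default:    " + factoryClassName());
        System.setProperty(PROPERTY, NETTY);
        System.out.println("configured: " + factoryClassName());
    }
}
```

On a real server the property would typically be passed as a JVM flag, for example -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.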
3. QuorumPeer
This class maintains everything that is running on a server. Put differently, a server essentially is a QuorumPeer. Why?
While ZooKeeper is running, a server communicates with clients, with other servers, and with the leader, and the objects held by this class do all of that.
3.1 QuorumPeer.start()
Let's start with what start() does:
    public synchronized void start() {
        // Verify that myid is in the peer list
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // Load the ZooKeeper database
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        // Start an election round
        startLeaderElection();
        startJvmPauseMonitor();
        super.start();
    }
After some validation and setup, start() opens an election round. Let's see what startLeaderElection does:
    public synchronized void startLeaderElection() {
        try {
            // Our initial state is LOOKING, so we initialize a ballot of our own
            if (getPeerState() == ServerState.LOOKING) {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        // Initialize the election algorithm
        this.electionAlg = createElectionAlgorithm(electionType);
    }

    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch (electionAlgorithm) {
        // cases 1 and 2 both throw exceptions (omitted)
        case 3:
            // QuorumCnxManager handles all TCP connections during elections.
            // Why is it initialized here?
            // Because it is responsible for listening for election traffic
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?) ");
                oldQcm.halt();
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // The listener accepts all incoming election connections
                listener.start();
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                // Start the election worker threads
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
3.2 FastLeaderElection
Let's look at the class structure of FLE (FastLeaderElection).
The main objects FLE holds are:
- ToSend: a wrapper class for the body of an outgoing message
- Notification: also a wrapper class; it tells other servers that a vote has changed
- Messenger: the one that does the real work of sending and receiving ballots during an election. It contains:
  - WorkerReceiver: receives votes
  - WorkerSender: sends votes
There are two other important fields in FastLeaderElection, and their names say what they are for:
- sendqueue (LinkedBlockingQueue<ToSend>): holds the votes waiting to be sent
- recvqueue (LinkedBlockingQueue<Notification>): holds the votes received from other servers
Note that although FLE has a start() method, it is not a thread class. start() just starts the Messenger, which in turn starts its two worker threads, WorkerReceiver and WorkerSender.
In other words, these two do the actual work: WorkerSender takes votes off sendqueue and WorkerReceiver puts incoming votes onto recvqueue, depending on the state of each vote sent or received.
- sendqueue is ultimately consumed on behalf of the QuorumCnxManager mentioned above, which sends the messages to the other servers
- recvqueue is ultimately consumed by FLE itself when it counts votes to determine the leader
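To make the two-queue, two-worker arrangement concrete, here is a minimal sketch using LinkedBlockingQueue and two daemon threads. It is an illustration under stated assumptions, not ZooKeeper code: ElectionQueuesSketch, Ballot, wire and roundTrip are hypothetical, and the "wire" queue simply loops messages back in place of the network layer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ElectionQueuesSketch {
    // Hypothetical, stripped-down ballot: who sent it and whom it votes for.
    static final class Ballot {
        final long sid;
        final long leader;
        Ballot(long sid, long leader) { this.sid = sid; this.leader = leader; }
    }

    final BlockingQueue<Ballot> sendqueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Ballot> recvqueue = new LinkedBlockingQueue<>();
    // Stands in for the network layer; here it just loops messages back.
    private final BlockingQueue<Ballot> wire = new LinkedBlockingQueue<>();

    // Starts the two worker threads; the caller itself is not a thread.
    void start() {
        startDaemon("WorkerSender", () -> pump(sendqueue, wire));
        startDaemon("WorkerReceiver", () -> pump(wire, recvqueue));
    }

    private static void startDaemon(String name, Runnable body) {
        Thread t = new Thread(body, name);
        t.setDaemon(true); // do not keep the JVM alive
        t.start();
    }

    // Moves ballots from one queue to the other until interrupted.
    private static void pump(BlockingQueue<Ballot> from, BlockingQueue<Ballot> to) {
        try {
            while (true) {
                to.put(from.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Queues one ballot and returns the leader of the ballot that comes back,
    // or -1 if nothing arrives within two seconds.
    static long roundTrip(long sid, long leader) {
        ElectionQueuesSketch fle = new ElectionQueuesSketch();
        fle.start();
        try {
            fle.sendqueue.put(new Ballot(sid, leader));
            Ballot received = fle.recvqueue.poll(2, TimeUnit.SECONDS);
            return received == null ? -1L : received.leader;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println("received vote for " + roundTrip(1L, 3L));
    }
}
```

The election logic only ever touches the two queues, so it can block on poll with a timeout while the worker threads deal with I/O.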
3.3 QuorumPeer.run()
With that brief overview of FLE, let's go back to the QuorumPeer class. We started the thread in QuorumPeerMain, and now it is ready to run.
QuorumPeer tracks two kinds of state:
- ServerState: the current server state
  - LOOKING: the server is looking for a leader; an election is started in this state
  - OBSERVING: the server is an observer
  - FOLLOWING: the server is a follower
  - LEADING: the server is the leader
- ZabState: the current ZAB phase
  - ELECTION: the election phase
  - DISCOVERY: the discovery phase, in which TCP connections are established with each follower/observer
  - SYNCHRONIZATION: the synchronization phase, in which data is synchronized with the followers
  - BROADCAST: the broadcast phase
    // Source: org.apache.zookeeper.server.quorum.QuorumPeer#run
    // The code is simplified here
    public void run() {
        // JMX setup omitted
        try {
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // Extra handling required in read-only mode [1]
                        setCurrentVote(makeLEStrategy().lookForLeader());
                        // ...
                    } else {
                        // This flag is tied to the config version, which relates to
                        // ZooKeeper dynamic reconfiguration.
                        // I haven't studied it yet; to be filled in later.
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    }
                    break;
                case OBSERVING:
                case FOLLOWING:
                case LEADING:
                    // Run as observer, follower or leader respectively (omitted)
                    break;
                }
            }
        } finally {
            // Cleanup omitted
        }
    }
That is the main flow for starting an election: run() simply delegates to FLE's lookForLeader method. The responsibilities are clearly separated.
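The effect of one pass through that loop can be sketched as a tiny state transition. This is a simplified illustration with hypothetical names (PeerLoopSketch, step); in ZooKeeper the state change happens inside lookForLeader via setPeerState, and observers are left out here.

```java
import java.util.function.LongSupplier;

final class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    // One pass of the loop: only a LOOKING peer runs an election. Afterwards
    // it leads if it elected itself and follows otherwise.
    static State step(State current, long myId, LongSupplier lookForLeader) {
        if (current != State.LOOKING) {
            return current;
        }
        long electedLeader = lookForLeader.getAsLong();
        return electedLeader == myId ? State.LEADING : State.FOLLOWING;
    }

    public static void main(String[] args) {
        // Pretend every election in this demo elects server 3.
        LongSupplier election = () -> 3L;
        System.out.println("server 3: " + step(State.LOOKING, 3L, election));
        System.out.println("server 1: " + step(State.LOOKING, 1L, election));
    }
}
```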
3.4 lookForLeader()
A received vote can fall into one of the following cases:
- The ballot data is bad, for example a packet was lost during TCP transmission
- The server that cast the vote has no voting rights (it is an observer)
- When I receive the ballot, I am not LOOKING myself (when would that happen? [4])
- When I receive the ballot, the sender is not LOOKING
- The normal case: you are looking for a leader (the vote's state is LOOKING), and so am I (my state is also LOOKING)
The first three cases are handled by WorkerReceiver itself.
In the last two cases, WorkerReceiver puts the vote into recvqueue, and it is processed by the flow below.
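The five cases above can be sketched as a small decision function. This is one simplified reading of the triage, not the actual WorkerReceiver code: BallotTriageSketch, Action and triage are hypothetical names, and details such as extra replies to senders in an older election round are left out.

```java
final class BallotTriageSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }
    enum Action { DROP, REPLY_WITH_CURRENT_VOTE, ENQUEUE }

    static Action triage(boolean wellFormed, boolean senderIsVoter, State self, State sender) {
        if (!wellFormed) {
            return Action.DROP;                    // case 1: bad ballot data
        }
        if (!senderIsVoter) {
            return Action.REPLY_WITH_CURRENT_VOTE; // case 2: sender cannot vote
        }
        if (self != State.LOOKING) {
            // case 3: I already know the outcome, so tell a LOOKING sender about it
            return sender == State.LOOKING ? Action.REPLY_WITH_CURRENT_VOTE : Action.DROP;
        }
        return Action.ENQUEUE;                     // cases 4 and 5: hand over to recvqueue
    }

    public static void main(String[] args) {
        System.out.println(triage(true, true, State.LOOKING, State.LOOKING));
        System.out.println(triage(true, true, State.FOLLOWING, State.LOOKING));
        System.out.println(triage(false, true, State.LOOKING, State.LOOKING));
    }
}
```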
Before we look at the code below, let's introduce a few variables and data structures:
- logicalclock: tracks my own election round
- todo
- Notification (one ballot after processing)
  - leader: long, the sid (myid) of the server this vote proposes as leader, not the voter's own sid
  - peerEpoch: the epoch of the leader the voter proposes
  - zxid: the zxid of the leader the voter proposes
  - sid: long, the voter's own sid
  - electionEpoch: the voter's election round. It is compared with my own logicalclock to tell whether we are in the same round
  - state: the voter's ServerState
  - qv: voter information (the voter's QuorumVerifier)
ZooKeeper compares two valid votes by the following rules, in order:
Source: org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate
- Epoch: compare the epoch of the proposed leader (peerEpoch in the Notification); the larger one wins.
- Zxid: if the epochs are equal, compare the zxid of the proposed leader; the larger one wins.
- Myid: if those are equal as well, compare myid (leader in the Notification); the larger one wins.
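The three rules above amount to a lexicographic comparison on (epoch, zxid, myid). A minimal sketch, assuming plain long values; VoteOrderSketch and wins are hypothetical names, and anything the real totalOrderPredicate checks beyond these three rules is omitted.

```java
final class VoteOrderSketch {
    // Returns true when the new vote beats the current one:
    // higher epoch first, then higher zxid, then higher server id.
    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    public static void main(String[] args) {
        // A higher epoch wins even with a lower zxid and a lower id.
        System.out.println(wins(1, 5, 2, 3, 9, 1));
        // Same epoch and zxid: the higher id wins.
        System.out.println(wins(3, 9, 1, 1, 9, 1));
    }
}
```

An identical vote does not win, so a server never replaces its proposal with an equal one.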
    // The code has also been simplified
    // Source: org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader
    public Vote lookForLeader() throws InterruptedException {
        try {
            // Holds the valid votes of this round (logicalclock == vote.electionEpoch);
            // the server uses it to determine the election result
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();
            // For a detailed explanation of this field, see the PR in [2]
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = minNotificationInterval;
            // Increment logicalclock, initialize a vote for myself, and broadcast it
            synchronized (this) {
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            sendNotifications();
            SyncedLearnerTracker voteSet;
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                // Take a vote from the recvqueue mentioned earlier
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                // No vote arrived. What then? And why does one branch resend while the other does not? [3]
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    // Dispatch on the voter's ServerState
                    switch (n.state) {
                    case LOOKING:
                        // The sender's election round is ahead of mine, so every vote collected
                        // so far is stale: adopt its round and empty the ballot box
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            // Compare the two votes and update my proposal
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            // After updating my vote, broadcast it to the other servers
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            // Your election round is behind mine, so I simply ignore you
                            break;
                        // Same election round: compare the votes and update as well
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }
                        // Stale votes were skipped by the break above; valid votes go into the ballot box
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // Every update above also updates my own fields such as proposedLeader,
                        // so the new Vote below is built from the updated values
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                        // More than half agree
                        if (voteSet.hasAllQuorums()) {
                            // Interesting detail: before settling, keep polling for late votes.
                            // If a vote arrives that beats my current proposal, put it back on
                            // the queue and break. n is then non-null, so we return to the outer
                            // while loop and run the vote through the flow above again.
                            // When can the leader in such a late vote still win? When more than
                            // half of the servers had not yet settled on a leader before this
                            // vote arrived, so it still has a chance of being elected
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }
                            // No better vote arrived: the leader can be determined
                            if (n == null) {
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // Think about which situations lead here [5]
                        // Same election round: we all took part in the vote, and the others
                        // had already finished electing the leader when they replied to me
                        if (n.electionEpoch == logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        // A new server joining a stable ensemble ends up here.
                        // This mainly verifies that the current leader really is followed by more than half of the servers
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    }
                }
            }
            return null;
        } finally {
            // Cleanup omitted
        }
    }
That is a general overview of the FLE algorithm.
Let's sum it up in a few sentences:
- Check whether the vote belongs to the same election round. The round only advances when an election is started. Votes from a round earlier than mine are ignored; for votes from the same or a later round, compare them by epoch, zxid and myid, update my own vote if the other one is better, and rebroadcast it.
- After collecting votes, check whether more than half of them agree (ZooKeeper also checks the status of the would-be leader, which is quite thorough). If they do, move on to the next state.
- When a new server joins an existing ensemble, it checks the legitimacy of the current leader and then joins it. (If the check fails, it keeps sending votes, since this is a while loop, until most servers in the cluster see it.)
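The "more than half" check in the summary can be sketched as a strict-majority count over the collected ballots. This is a simplified illustration, not the real SyncedLearnerTracker or QuorumVerifier: MajoritySketch and hasMajority are hypothetical, and each vote is reduced to the sid of the proposed leader.

```java
import java.util.HashMap;
import java.util.Map;

final class MajoritySketch {
    // recvset maps a voter's sid to the sid of the leader that voter proposes.
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        int supporters = 0;
        for (long leader : recvset.values()) {
            if (leader == proposedLeader) {
                supporters++;
            }
        }
        return supporters > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(1L, 3L); // a repeated vote from sid 1 overwrites the earlier entry
        System.out.println("after one voter:  " + hasMajority(recvset, 3L, 3));
        recvset.put(2L, 3L);
        System.out.println("after two voters: " + hasMajority(recvset, 3L, 3));
    }
}
```

Keying the map by voter sid means a resent ballot never counts twice.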
FLE Flow
This flow chart may not be entirely accurate and leaves out some specific steps, so please read it critically. In fact, you should read every online article critically :joy:
Notes
[1]
ZooKeeper provides a read-only mode for servers. Normally, when a server loses its connections to more than half of the ensemble, it stops working and handles neither read nor write requests. With this mode enabled, the server still processes read requests. Availability is preserved but consistency is lost; in CAP terms, the system moves from CP to AP.
[2]
outofelection pr
[3]
Under what circumstances does no vote arrive?
- I did send my vote, and the problem is on your side: you never replied. Maybe every server lost the message for some reason. There is nothing else I can do but send it again. Is receiving duplicate votes a concern here? No. Votes are stored in recvset, a map keyed by the sender's sid, so an identical vote just overwrites the previous entry.
- I thought I had sent it, but I hadn't; my fault. It turns out my connection to you was broken, so I reconnect. Notice, though, that the vote is not resent after reconnecting. Why? Look at how messages are sent: a connection has to be established first, and only then is a message taken from the queue and sent. If there was no connection at all, the message was never taken out, so it is still waiting and does not need to be sent again.
[4]
- When a new server joins a stable ensemble, it starts an election. When I reply to that election, my own state is necessarily LEADING, FOLLOWING or OBSERVING.
- A server in the cluster notices that the leader has failed and starts an election, but I have not noticed yet, so when I reply my state is still what it was before: FOLLOWING or OBSERVING.
[5]
- When I vote as a new server, the replies from the others end up here.
- I noticed before the others that the ensemble's leader had died and started a new election; the replies from the others end up here.
- During an election, half of the servers had already finished electing. I was slow, and the replies to the vote I started end up here.
- As a follower, I crashed; when I am brought back up, I end up here as well.