ZooKeeper plays an important role in distributed systems, but like any serious piece of middleware it is not easy to use well. So let's go into a little depth, and hopefully the reader will have some fun along the way.
Background
Architecture evolution
No system's architecture is set in stone. As the business develops and users and data grow, the architecture must keep evolving (which also shows that architecture is a gradual process).
The three figures above show a typical evolution path. If business complexity keeps increasing, the architecture can evolve further toward microservices; since that is not the focus here, I will not expand on it. What does not change is that from the second figure onward our application is no longer monolithic and centralized, so we can call it distributed.
A distributed system is one in which hardware or software components, located on different networked computers, communicate and coordinate with each other only by passing messages.
Challenges
A distributed architecture has several advantages over a monolithic one:
- High scalability
- High concurrency
- High performance
- High availability
Distributed architecture also presents new challenges:
- Distributed networking
  1. Communication exceptions: message delay, message loss, and the network "three-state" problem (a request may succeed, fail, or time out with an unknown outcome)
  2. Network partition: the often-mentioned "split-brain" phenomenon
- Distributed transactions
  Atomicity, consistency, isolation, durability
- Race conditions and ordering issues
Distributed transactions and data-consistency solutions
Single-machine scenario
What is a transaction?
A transaction is a user-defined set of database operations that can be viewed as one complete logical unit of work: either all of the operations take effect or none of them do; it is an indivisible unit of work.
Why should there be transactions?
On a single machine, transactions guarantee that a group of operations is applied correctly even in the presence of failures. In a distributed system where exceptions may occur, the problem becomes how to quickly and correctly reach consensus on the value of a piece of data within the cluster, while ensuring that overall consistency is not damaged when any of the exceptions above occur.
Distributed scenario
The CAP theorem
- C: Consistency. Data stays consistent across multiple replicas (strict, real-time, sequential consistency).
- A: Availability. The services provided by the system must always be available, and every request receives a correct response within a bounded time, although the data returned is not guaranteed to be the latest.
- P: Partition tolerance. When a distributed system encounters any network partition failure, it can still provide consistent and available service, unless the entire network environment fails.
No distributed system can satisfy all three of C, A, and P at the same time. Why?
- CA: satisfies consistency and availability, abandons partition tolerance (effectively a single-node application).
- CP: satisfies consistency and partition tolerance, abandons availability. When a partition occurs, the system must give up availability in order to guarantee consistency.
- AP: satisfies availability and partition tolerance, abandons consistency. When a partition occurs, nodes must continue serving requests to keep the system available, and thereby give up consistency.
Since no scheme can perfectly satisfy all of CAP, BASE theory was introduced.
Full name: Basically Available, Soft state, Eventually consistent.
BASE theory is the result of the trade-off between consistency and availability in CAP. It is distilled from large-scale Internet distributed practice and gradually evolved on top of the CAP theorem. Its core idea is that although strong consistency cannot be achieved, each application can adopt an appropriate method, according to its own service characteristics, to achieve eventual consistency.
What is a soft state? Requiring the data copies on multiple nodes to be consistent is a "hard state". A soft state allows data in the system to exist in an intermediate state, which is considered not to affect the overall availability of the system; that is, replicas of the data on different nodes are allowed to lag behind one another.
Consistency protocol
Many solutions exist for the consistency problem in distributed systems; here is a brief introduction to 2PC, 3PC, and TCC.
2PC
The transaction commits in two phases:
- Phase one
  - The coordinator initiates a transaction query
  - Participants execute the transaction (writing redo and undo logs at the same time)
  - Participants give feedback to the coordinator
- Phase two
  - Commit the transaction, or
  - Interrupt (roll back) the transaction
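The two phases above can be sketched as a minimal in-memory simulation. The `Participant` class and its states are illustrative inventions for this sketch, not a real XA or database API:

```python
# Minimal sketch of a 2PC coordinator with in-memory participants.
class Participant:
    def __init__(self, will_commit=True):
        self.will_commit = will_commit
        self.state = "INIT"

    def prepare(self):
        # Phase one: write redo/undo logs, then vote yes or no.
        self.state = "PREPARED" if self.will_commit else "ABORTED"
        return self.will_commit

    def commit(self):
        self.state = "COMMITTED"

    def rollback(self):
        self.state = "ABORTED"


def two_phase_commit(participants):
    # Phase one: the coordinator asks every participant to prepare.
    votes = [p.prepare() for p in participants]
    if all(votes):
        # Phase two: everyone voted yes, so commit everywhere.
        for p in participants:
            p.commit()
        return "COMMITTED"
    # Any "no" vote interrupts the transaction: roll everyone back.
    for p in participants:
        p.rollback()
    return "ABORTED"
```

Note that the coordinator blocks on every participant's vote, which is exactly the synchronous-blocking weakness discussed next.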
A few problems with 2PC:
1. Synchronous blocking. During execution, all participating nodes block on the transaction. When one participant occupies a shared resource, other nodes have to block while accessing it.
2. Single point of failure. The coordinator is critical: if it fails, the participants keep blocking. Especially in phase two, if the coordinator fails, all participants remain locked on their transaction resources and cannot complete the transaction. (A new coordinator can be re-elected, but that does not unblock participants already stuck because the old coordinator went down.)
3. Inconsistent data. In phase two, if a local network exception occurs or the coordinator fails after sending the COMMIT request, only a subset of the participants may receive it. Those participants perform the commit, while the machines that never received the COMMIT request cannot, so the distributed system ends up with inconsistent data.
4. An unresolvable phase-two problem: the coordinator crashes right after sending a COMMIT message, and the only participant that received it crashes as well. Even if a new coordinator is elected, the status of the transaction is uncertain: no one knows whether it was committed.
Because of these defects of two-phase commit, such as synchronous blocking, the single point of failure, and split brain, researchers improved on it and proposed three-phase commit.
3PC
Three-phase commit is an improved version of 2PC, with two major changes:
- Timeouts are introduced for both the coordinator and the participants
- A preparation phase is inserted between the original first and second phases, ensuring the participating nodes are in a consistent state before the final commit phase
The steps are as follows:
Note:
- Each stage has timeout and failure feedback; on failure the coordinator sends an interrupt request to roll back.
- In phase two, a rollback is also performed if a participant times out while waiting for the coordinator.
- In phase three, if a participant times out while waiting, it commits the transaction instead of rolling back.
Why does a participant commit in phase three? This is essentially a probabilistic decision. Entering phase three means the participant already received the PreCommit request in phase two, and the coordinator only sends PreCommit after receiving CanCommit responses from all participants. (Once a participant receives PreCommit, it knows everyone has agreed to the change.) So, in a nutshell, even if the participant never receives the final commit or abort message due to a network timeout, it has reason to believe the commit has a high probability of success.
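The timeout rules can be summarized in a tiny decision function. The phase names here are illustrative, keyed to what the participant is currently waiting for, and are not taken from any real codebase:

```python
# Sketch of a 3PC participant's decision when its wait for the
# coordinator times out; phase names are illustrative.
def on_timeout(phase):
    if phase in ("CAN_COMMIT", "PRE_COMMIT"):
        # Before PreCommit has been acknowledged everywhere,
        # aborting is the safe choice.
        return "ABORT"
    if phase == "DO_COMMIT":
        # Everyone already acknowledged PreCommit, so commit is the
        # likely coordinator decision: commit rather than roll back.
        return "COMMIT"
    raise ValueError("unknown phase: %s" % phase)
```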
Summary
With 2PC and 3PC, distributed transactions are much better protected, but still not 100%, for example in the following scenarios.
The two-phase problem can be broken down like this:
- The coordinator fails, and a participant fails;
- The coordinator fails, but no participant fails;
- The coordinator does not fail, but a participant fails;
- Neither the coordinator nor any participant fails.
Obviously case 4 is not a problem, so there are really only three cases. Case 2 can be solved simply by electing a new coordinator. Case 3 is exactly what the two-phase commit protocol is designed to handle, so it is not a problem either. The only real problem is case 1: the coordinator fails and a participant fails too.
Whether 2PC or 3PC, data inconsistency occurs only in the following situation: the coordinator goes down, and while the backup coordinator is restoring the protocol state, one of the participants also goes down; the backup coordinator then cannot distinguish the status of the failed participant from that of the remaining participants, which are all in the "YES" state. ("Goes down" here can mean either a crash or an unreachable network.)
TCC
A compensation-based alternative to 2PC and an implementation of "flexible transactions"; the full name is Try-Confirm-Cancel. (Contrast this with the XA protocol, on which MySQL implements distributed transactions.)
TCC achieves eventual consistency and, for performance reasons, does not hold resource locks. It is strongly coupled to the business: services generally implement the Try, Confirm, and Cancel interfaces.
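As a concrete illustration, here is a hypothetical account service exposing the three TCC interfaces. The class, method names, and freeze-based bookkeeping are all invented for this sketch:

```python
# Hypothetical TCC participant: an account service with try/confirm/cancel.
class AccountService:
    def __init__(self, balance):
        self.balance = balance
        self.frozen = 0     # amount reserved by in-flight transactions

    def try_reserve(self, amount):
        # Try: check and reserve resources, but do not apply the change yet.
        if self.balance - self.frozen < amount:
            return False
        self.frozen += amount
        return True

    def confirm(self, amount):
        # Confirm: actually apply the reserved change.
        self.frozen -= amount
        self.balance -= amount

    def cancel(self, amount):
        # Cancel: release the reservation (the compensation step).
        self.frozen -= amount
```

Because resources are only reserved, not locked, other transactions can proceed against the unfrozen balance while this one is in flight.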
Conclusion
A perfect distributed transaction is almost impossible to achieve: every scheme above misbehaves in some scenario. So we guarantee as much as we can, and fall back on a transaction-compensation mechanism to reach eventual consistency. TCC is an eventually consistent solution with heavy business involvement, and there are many open-source implementations to learn from: ByteTCC, TCC-Transaction, Saga.
Enter ZooKeeper
In distributed scenarios there are many pressing problems to solve. Much of the time, writing a program is no longer just about the business logic itself; developers also sink into the logic of making multiple programs cooperate. To take over that specialized job, ZooKeeper emerged.
What is ZooKeeper
ZooKeeper is a distributed coordination service. Inspired by file-system APIs, it provides a small, simple API that developers can use to implement common coordination tasks.
ZooKeeper consists of a client library and a Java-implemented server component. The server side runs as a replicated ensemble and guarantees CP.
ZooKeeper's mission
ZooKeeper aims to coordinate tasks in a distributed system. These tasks can involve multiple processes, whether they are cooperating on work or managing contention over resources.
- Goal 1: a simple data model
  A simple tree structure, similar to a file system's directory structure, but stored in memory.
- Goal 2: it can run as a cluster
  A usable cluster is formed by a group of machines that keep communicating with one another. Notably, as long as more than half of the machines in the cluster work properly, the whole cluster works properly.
- Goal 3: sequential access
  Transactions in a ZooKeeper cluster are totally ordered; this is the key feature that allows higher-level synchronization primitives to be built on top.
- Goal 4: high performance
  Read QPS is high because all data is stored in memory.
Compared with other distributed lock managers or distributed databases, ZooKeeper focuses on task coordination. It provides neither a lock interface nor a general data-storage interface, and ships no special-purpose synchronization primitives, which makes it more flexible to use.
Basic concepts of ZooKeeper
Before using ZooKeeper, let's get familiar with its concepts.
Cluster roles
- Leader
  Responsible for initiating and resolving votes and updating system state. Generally it handles the cluster's write requests, though it handles reads as well.
- Follower
  Handles client read requests but not writes: write requests are forwarded to the leader. It participates in voting, with the right both to vote and to be elected.
- Observer
  A special kind of follower that only handles client read requests and has no voting rights. This role exists to scale the cluster's read throughput.
Sessions
A session can be understood simply as the TCP connection between client and server, kept alive by heartbeats. Many ZooKeeper features depend on the session mechanism, such as watches, ACLs, and sequential nodes. A session also has a state: CONNECTING, CONNECTED, RECONNECTING, RECONNECTED, or CLOSED.
State transition diagram
Data nodes
Compared with other distributed systems, ZooKeeper's data nodes are very simple: just a tree-shaped directory structure.
By analogy with file paths, "/" is the root, and "/app1" and "/app2" are children of the root path.
There are three types of data nodes:
- Persistent node
  Once created, a persistent node remains on the ZK server; it is not deleted even when the session of the client that created it is closed.
- Ephemeral node
  In contrast to persistent nodes, an ephemeral node lives only as long as the session that created it: when that client session is closed, due to timeout or an exception, the node is deleted from ZK accordingly.
- Sequential node
  Whether persistent or ephemeral, the node name is suffixed with a 10-digit decimal sequence number, which means multiple clients can create nodes with the same name prefix.
Although the node types differ, every node in ZK maintains the same content: a byte array (the node data), ACL access-control information, a child-node list (always empty for ephemeral nodes, which are not allowed to have children), and its own status field, stat.
The data node status structure is as follows
Version
As mentioned earlier, each data node carries data, and every write generates a version number. ZooKeeper maintains three versions per node:
dataVersion: the version of the current node's data
cversion: the version of the current node's children
aclVersion: the version of the current node's ACL
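These versions enable optimistic concurrency control: a write can name the version it expects, like a compare-and-swap. A minimal sketch, where the `Znode` class is a simplified stand-in rather than the real server code:

```python
# Sketch of version-based optimistic locking, in the spirit of ZK's
# setData(path, data, version) and its dataVersion field.
class Znode:
    def __init__(self, data=b""):
        self.data = data
        self.data_version = 0

    def set_data(self, data, expected_version=-1):
        # -1 means "any version", mirroring the ZK API convention.
        if expected_version not in (-1, self.data_version):
            raise ValueError("BadVersion")
        self.data = data
        self.data_version += 1
```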
Watcher
Event notification is an important ZooKeeper feature: the server pushes events to the client, which can be understood as the observer pattern.
ACL
Similar to UNIX file-system permission control; there are five main permissions:
CREATE: permission to create child nodes
READ: permission to read a node's data and child list
WRITE: permission to update a node's data
DELETE: permission to delete child nodes
ADMIN: permission to set a node's ACL
Zookeeper has hit its stride
Now that ZooKeeper's basic concepts are covered, here is a brief introduction to the common APIs and usage scenarios.
Commonly used API
create
create [-s] [-e] path data acl
-s makes the node sequential and -e makes it ephemeral; with neither flag a persistent node is created. acl sets the node's permissions.
set
set path data [v] updates the data of path to data. v is the expected version; by default the current version is overwritten regardless.
delete
delete path [v] deletes the node at path. The path must have no children; otherwise an error is reported.
get
get path [watch]
Obtains the data and details of node path. With watch, the client is notified when this node's data changes.
ls
ls path [watch]
Lists the children of path; with watch, the client is notified of changes to the child list.
Client use
The common APIs above are rarely called directly in daily development; people usually work through wrapped clients instead. Here are the issues with the native API and two common client libraries.
Native API issues
(1) ZooKeeper watches are one-shot and must be re-registered after each trigger;
(2) No reconnection mechanism after session timeout;
(3) Exception handling is cumbersome: ZooKeeper defines many exceptions, and developers may not know how to handle them all;
(4) Only a bare byte[] interface is provided, with no object-level serialization;
(5) If an exception is thrown during node creation, the caller has to check whether the node actually exists;
(6) No cascading (recursive) deletion of nodes.
ZKClient
The original API to do a simple encapsulation, mainly to solve the above problems, and in the use of the above is more simple, of course, there are also shortcomings, as follows: (1) zkClient community is not active, the document is not perfect, almost no reference documents;
(2) Exception handling simplification (throwing RuntimeException);
(3) Retry mechanism is difficult to use;
(4) No reference implementation for various usage scenarios is provided
Curator
Probably the most widely used client today; many companies build their own ZooKeeper tooling on top of it. Originally a Netflix open-source project, Curator has become a top-level Apache project. Most importantly, it provides a more convenient API and ready-made recipes for common scenarios such as distributed locks and leader election.
Common scenario
Now that you know how to use ZooKeeper, in what scenarios can you use it?
Distributed locks
A distributed lock is a means of enforcing mutual exclusion on resource access across multiple processes.
ZK's ephemeral nodes and watch mechanism make it easy to implement a distributed exclusive lock.
How it works:
- Acquiring the lock
  All clients call create on the same path, and the ZK cluster guarantees that only one client succeeds in creating the ephemeral node. The failing clients register a watcher and wait (a timed wait must remove its listener on timeout).
- Releasing the lock
  The ephemeral node is removed automatically if the client goes down, or the client deletes it explicitly.
- Process
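The acquire/release steps can be sketched against an in-memory stand-in for the ensemble. `FakeZk` is invented for this sketch; a real client would call create with the ephemeral flag and set a watch on the lock node:

```python
# Sketch of the ZK exclusive-lock recipe; FakeZk is an in-memory stand-in
# for the ensemble, which guarantees only one create() on a path succeeds.
class FakeZk:
    def __init__(self):
        self.nodes = set()

    def create_ephemeral(self, path):
        if path in self.nodes:
            return False      # NodeExists: someone else holds the lock
        self.nodes.add(path)
        return True

    def delete(self, path):
        # Explicit release; the server would also do this automatically
        # when the owner's session expires.
        self.nodes.discard(path)


def acquire(zk, path="/lock"):
    # Losers would register a watch on the path and retry after deletion.
    return zk.create_ephemeral(path)
```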
Data publish/subscribe
ZK provides centralized management and dynamic updating of configuration information.
How it works: when an application starts up, it registers watchers on designated nodes in the ZK cluster, so it is notified of node data changes in near real time.
Note: ZK should only store small configuration payloads or metadata such as feature switches; do not put business data in ZK.
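The push mechanism, including the one-shot nature of watches noted earlier under the native-API issues, can be modeled like this (`ConfigNode` is an illustrative in-memory stand-in for a znode):

```python
# Sketch of config push with ZK-style one-shot watches.
class ConfigNode:
    def __init__(self, data):
        self.data = data
        self.watchers = []

    def get(self, watcher=None):
        # Reading with a watcher registers a one-shot notification.
        if watcher is not None:
            self.watchers.append(watcher)
        return self.data

    def set(self, data):
        self.data = data
        fired, self.watchers = self.watchers, []   # watches fire only once
        for w in fired:
            w(data)
```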
Load balancing
Similar in function to Nginx: servers register their addresses under a fixed node in the ZK cluster; clients read and cache that list, then apply round-robin or other balancing algorithms when calling the servers.
Leader election
This also relies on ZK's strong consistency: node creation is guaranteed to be globally unique even under high concurrency, and the watch mechanism lets the other nodes be notified when the primary goes offline. It is quite convenient to use; Kafka's broker (controller) election depends on ZK.
Naming service, distributed IDs
ZK nodes can store the name, address, port, and other information of each service in the system.
A distributed ID generator (globally unique, increasing numeric IDs) can be built on ZooKeeper's ability to create sequential child nodes.
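A sketch of the idea: in real ZK the server assigns the 10-digit suffix when a sequential child is created; here a local counter stands in for the ensemble:

```python
# Sketch of a distributed-ID generator built on sequential child nodes.
# itertools.count stands in for the monotonic suffix a ZK server would
# assign to create("/ids/id-", sequential=True).
import itertools

_counter = itertools.count(1)

def next_id(prefix="id-"):
    return "%s%010d" % (prefix, next(_counter))
```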
How ZooKeeper pulls it off
Knowing what ZK is, how to use it, and where to use it, here is why ZK works so well.
What is the ZAB protocol
ZAB: ZooKeeper Atomic Broadcast.
ZAB is a crash-recovery and atomic-broadcast protocol designed specifically for the distributed coordination service ZooKeeper.
It has two modes, message broadcast and crash recovery, and ZooKeeper switches between them: when the leader is available, the cluster runs in message-broadcast mode; when the leader becomes unavailable, it enters crash-recovery mode.
Why design such a protocol
Simply put, the protocol exists to give the ZK cluster data consistency and fault tolerance, to recover quickly from crashes so that a degree of availability is preserved, and ultimately to satisfy BASE.
A ZK cluster is built on a master-slave (leader-follower) model, in which the principle of data synchronization is simple: a follower connects to the leader and reports its latest transaction id; the leader uses that id as the synchronization point and re-sends the transactions after it; the follower notifies the leader once synchronization completes.
How is this achieved?
A few questions
To achieve consistency and fast recovery, the ZAB protocol needs to address the following questions:
- In the normal case, how is data kept consistent across different nodes?
- In a master-slave architecture, how is data consistency guaranteed when the leader crashes?
- During recovery, how is a new leader elected quickly so that external service can resume?
To answer these questions, ZAB implements three key mechanisms:
1. Message broadcast
2. Fast leader election
3. Leader-follower data synchronization
Problem solving
Let's walk through the ZAB protocol from these three angles.
First, a few related concepts:
- SID: the server id configured in zoo.cfg. For example, with three instances the SIDs would be set to 1, 2, and 3.
- ZXID: the transaction id, an increasing 64-bit number; it generally denotes the largest transaction id the local server has processed. The high 32 bits are the epoch: whenever a new leader is elected, the epoch is incremented. The low 32 bits are a counter incremented per transaction; each time the epoch increments, the counter resets to 0.
- Epoch: the logical clock, i.e. the round of voting, used to determine whether multiple votes belong to the same election cycle. It is an increasing sequence on each server, incremented when a new election round begins.
- PeerEpoch: the epoch of the elected leader.
- State: the server's election state:
  - LOOKING: campaigning for leadership,
  - FOLLOWING: follower state, synchronizing with the leader, participates in voting,
  - OBSERVING: observer state, does not participate in voting,
  - LEADING: leader state.
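The 64-bit ZXID layout can be captured in a couple of helper functions (the function names are illustrative):

```python
# Sketch of the ZXID layout: high 32 bits = epoch, low 32 bits = counter.
def make_zxid(epoch, counter):
    return (epoch << 32) | (counter & 0xFFFFFFFF)

def zxid_epoch(zxid):
    return zxid >> 32

def zxid_counter(zxid):
    return zxid & 0xFFFFFFFF
```

A consequence of this layout: any zxid from a newer epoch compares greater than every zxid from an older epoch, which is what makes zxid ordering safe across leader changes.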
Leader election
Election is triggered when:
- the service starts up;
- the leader crashes and the remaining followers re-trigger an election;
- during crash recovery, or when a new instance is added.
In short there are two scenarios: the cluster already has a leader, or it does not.
The scenario with an existing leader is simple: when a newly added machine tries to start an election, it is told about the current leader, then establishes a connection with it and synchronizes state.
Now focus on the scenario without a leader:
Each server in the cluster changes its state to LOOKING and sends a vote to the other servers. A vote carries two pieces of information, vote(SID, ZXID); for example, vote(1, 2) is the vote of server 1 whose ZXID is 2. In the figure above, servers 3, 4, and 5 vote (3,9), (4,8), and (5,8) respectively. Each server then receives the other servers' votes, and a single rule decides the winner: the vote with the larger ZXID wins, and when ZXIDs are equal, the larger SID wins. After several rounds, once a server receives more than half identical votes, the SID in that vote becomes the leader. Put simply, the server with the newest data is the most likely to become leader (the newest ZXID best guarantees data recovery).
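The vote-ordering rule can be sketched in a few lines; the election epoch is omitted for brevity, and the function names are invented, not taken from the ZooKeeper source:

```python
# Sketch of the vote-ordering rule: the larger ZXID wins; on a tie,
# the larger SID wins.
def better_vote(a, b):
    """Votes are (sid, zxid) tuples; return the winning vote."""
    sid_a, zxid_a = a
    sid_b, zxid_b = b
    if zxid_a != zxid_b:
        return a if zxid_a > zxid_b else b
    return a if sid_a > sid_b else b

def elect(votes):
    # Every server applies the same rule, so all converge on one winner
    # and a majority of identical votes forms.
    winner = votes[0]
    for v in votes[1:]:
        winner = better_vote(winner, v)
    return winner
```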
In ZK this election algorithm is implemented as FastLeaderElection; its core is shown in the figure.
sendqueue: queue of votes to send. recvqueue: queue of received external votes. WorkerSender: sends votes from sendqueue. WorkerReceiver: receives votes; if the epoch in an external vote's ZXID is smaller than the current server's, the vote is ignored and the server keeps sending its own vote; if the server's state is not LOOKING, the vote is also ignored. Note that votes sent by observers are ignored as well.
Additional notes:
- Due to network delay, a node that has not yet gathered enough broadcast information may cast a wrong vote, and the correction process costs extra time.
- During the election, a server waits a short interval before broadcasting its vote; the interval is typically 200 ms.
- The leader election above broadcasts messages with an event-triggered push model, which is why it is called fast leader election; the earlier election algorithm used a pull model, polling once per second.
Data synchronization
After leader election, whether during a fresh startup or crash recovery, once a prospective leader is determined, its transactions must be synchronized so that data is consistent across the whole cluster.
The prospective leader's ZXID is necessarily the largest in the cluster, and followers are designed to accept only transactions larger than their own largest ZXID. On that premise, the synchronization logic is:
- Every follower sends the epoch of its last accepted transaction to the prospective leader;
- The prospective leader takes the largest epoch, adds 1 to obtain e1, and sends e1 to the followers. (The prospective leader already holds the largest ZXID; bumping the epoch guards against a stale leader reappearing.)
- On receiving e1, a follower compares it with its own epoch; if its own is smaller, it updates its epoch to e1 and replies to the prospective leader with an ACK (carrying its epoch information and historical transaction set);
- On receiving the ACKs, the prospective leader selects one of the historical transaction sets, the one with the largest ZXID, as the initial transaction set.
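The epoch negotiation in the steps above can be sketched as follows (the function names are invented for illustration):

```python
# Sketch of the epoch negotiation during leader-follower synchronization.
def negotiate_epoch(follower_epochs):
    # The prospective leader picks max(epoch) + 1 as the new epoch e1.
    return max(follower_epochs) + 1

def follower_accepts(own_epoch, e1):
    # A follower ACKs e1 only if it is newer than its own epoch.
    return e1 > own_epoch
```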
Message broadcast
1) In a ZooKeeper cluster, data replicas are propagated by message broadcast. Replica synchronization in ZooKeeper is similar to two-phase commit, but not identical: classic two-phase commit requires the coordinator to wait for ACKs from all participants before sending COMMIT, so all participants must either succeed or fail together, which creates serious blocking problems.
2) In the ZAB protocol, the leader only waits for ACKs from more than half of the followers, rather than from all of them.
Specific steps of message broadcast:
1) A client initiates a write request.
2) The leader converts the request into a transaction Proposal and assigns each Proposal a global id, the ZXID.
3) The leader keeps a separate queue for each follower, enqueues the proposals to be broadcast, and sends them with a FIFO policy.
4) On receiving a Proposal, a follower first writes it to its local disk as a transaction log, and after the write succeeds, sends an ACK to the leader.
5) After receiving ACKs from more than half of the followers, the leader considers the proposal accepted and may send a COMMIT message.
6) The leader broadcasts COMMIT to all followers and completes the commit itself; each follower commits the transaction on receiving COMMIT.
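The majority rule in step 5 amounts to a simple quorum check; this sketch counts the leader's own implicit ack, as the real protocol does:

```python
# Sketch of ZAB's quorum check: commit once the leader plus the ACKing
# followers form a strict majority of the ensemble.
def can_commit(follower_acks, ensemble_size):
    return follower_acks + 1 > ensemble_size // 2   # +1 for the leader
```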
The core guarantee ZooKeeper takes from ZAB: if one server commits a Proposal, then all servers must eventually commit that Proposal correctly. This is also where the eventual consistency of CAP/BASE shows up.
A separate FIFO message queue is maintained between the leader and each follower, decoupling message passing asynchronously: the leader and followers only need to enqueue messages. A synchronous approach would cause blocking and degrade performance significantly.
A final note:
ZooKeeper does not guarantee that at every instant two different clients have identical views of the data. Due to factors like network delay, one client may perform an update before another client is notified of the change. Consider two clients, A and B: if client A sets the value of a znode /a from 0 to 1, then tells client B to read /a, client B may read the old value 0, depending on which server it is connected to. If it is important that client A and client B read the same value, client B should call the sync() method of the ZooKeeper API before performing its read.
In short, ZK does not guarantee strict cross-client consistency; if a client needs the latest data for a node, it should call sync() first.
Shortcomings
- Write performance is bounded by the single leader and cannot be scaled out, which easily becomes a bottleneck.
- With too much data, elections take too long, or can even fail to converge.
- The native API is hard to use.