This article is a reading note for the second part of Designing Data-Intensive Applications.

The master-slave architecture

In the common master-slave architecture there is one master and multiple slaves. The typical read/write policy is to write to the master and read from the slaves: the master writes logs and transmits them over the network to keep the slave nodes' database state in sync. However, this common policy can cause a series of distributed data consistency problems.

Usually we use a master-slave architecture for the following purposes:

  • Improve the fault tolerance of the service: even if one node crashes, the other nodes can still serve requests.
  • Improve response speed and QPS: with multiple redundant nodes serving requests, response latency naturally drops and the service can carry more load. A common example of this strategy is a CDN: by deploying servers across the country, users request the server closest to them instead of travelling the long distance to the origin site, which improves response times.

Basic Concepts:

Replicated state machine:

Note the distinction between the replicated state machine and the state machine design pattern. The key question is what each "state" refers to:

  • The state in the replicated state machine refers to the state of the database on a cluster node; a node keeps its state in sync with the primary node by continuously receiving and applying event logs.
  • The state in the state machine design pattern refers to the state of a program object; the program moves the object among several predefined states by responding to defined state change events.

The replicated state machine is based on the idea that if the initial states of the cluster nodes are consistent, and the events that change the state are applied in the same order on every node, then the states of all nodes in the cluster must eventually be consistent; state synchronization between cluster nodes is maintained by transmitting the event log.

As shown in the figure above, under the replicated state machine model each node contains its own state machine and a local log.

How event logs are guaranteed to be applied in the same order is the job of a specific distributed consistency (consensus) algorithm. For example, ZAB and Raft implement total order broadcast to guarantee this, which is discussed in detail below.
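To make the idea concrete, here is a minimal, hypothetical sketch (not taken from any particular database) of a node that applies event-log entries to a key-value state machine in log order; as long as every node starts from the same state and applies the same entries in the same order, all nodes end in the same state.

```go
package main

import "fmt"

// LogEntry is one event in the replicated log: set key = value.
type LogEntry struct {
	Index int // position in the log (total order)
	Key   string
	Value string
}

// Node holds a local log and the state machine built from it.
type Node struct {
	log        []LogEntry
	applyIndex int               // highest log index applied so far
	state      map[string]string // the "state" of the replicated state machine
}

func NewNode() *Node {
	return &Node{state: make(map[string]string)}
}

// Append persists an entry to the local log (step 1 of synchronization).
func (n *Node) Append(e LogEntry) {
	n.log = append(n.log, e)
}

// ApplyCommitted applies log entries, strictly in index order, up to commitIndex.
func (n *Node) ApplyCommitted(commitIndex int) {
	for _, e := range n.log {
		if e.Index > n.applyIndex && e.Index <= commitIndex {
			n.state[e.Key] = e.Value
			n.applyIndex = e.Index
		}
	}
}

func main() {
	// Two nodes receiving the same entries in the same order converge to the same state.
	a, b := NewNode(), NewNode()
	entries := []LogEntry{{1, "x", "1"}, {2, "x", "2"}, {3, "y", "9"}}
	for _, e := range entries {
		a.Append(e)
		b.Append(e)
	}
	a.ApplyCommitted(3)
	b.ApplyCommitted(3)
	fmt.Println(a.state, b.state) // map[x:2 y:9] map[x:2 y:9]
}
```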

Distributed system model concepts:

When we want to study a distributed system, we first describe, in formal language, the problems the system may encounter; from these problems we derive the corresponding system models; then, taking the system model's definitions as preconditions, we design algorithms and argue their correctness based on the properties of the model.

Common problem scenarios in distributed systems are as follows:

  • Node crashes: a node may stay down for a long time, or recover and rejoin the cluster after a while. Based on this problem scenario, two system models of node failure are defined: the crash-stop (crash-abort) model and the crash-recovery model. In everyday production the crash-recovery model is obviously the more general one. The ZAB and Raft consensus algorithms are built on this model and solve problems such as the split brain that can occur when a crashed primary node rejoins the cluster;
  • Unresponsive or inconsistent communication between nodes: a distributed system may fail to respond because of network problems (network latency exceeds the program's timeout) or because a process is suspended and cannot respond (for example, during GC). Information can also disagree between nodes: each node's wall clock (local time) drifts, so in a multi-master architecture, determining the global order of operations from the local time of different nodes may misjudge the order and cause correct data to be overwritten incorrectly. Based on these problem scenarios, three system models of node communication are defined: the synchronous model, the partially synchronous model, and the asynchronous model. The partially synchronous model describes everyday production best: the network is stable with a bounded response time most of the time, but occasionally may be unresponsive for a long period. The ZAB and Raft consensus algorithms are also built on this model and deal with problems such as how to decide whether a node has failed when the upper bound on network response time is uncertain and wall clocks drift.

Consensus algorithm:

Consensus: more than half of the cluster nodes agree on a proposal. Consensus is needed because no single node in the cluster can make a correct decision based only on its own information.

If multiple clients simultaneously ask the distributed system for the last movie ticket, a consensus algorithm can be used to decide which client wins.

A consensus algorithm is designed to tolerate node communication under the partially synchronous model and node failure under the crash-recovery model of a distributed system.

Next, let's look at the mechanism that implements the replicated state machine: the log.

The log

Log Synchronization Mode

  • Synchronous: a log entry on the primary node can be committed only after all slave nodes have replicated it successfully. This obviously guarantees log consistency between master and slaves, but once a slave node has a network problem, the entry cannot be committed until that node is judged to have failed, and during that period the cluster cannot serve writes.

  • Asynchronous: the master node only sends a log entry to the slave nodes before committing it, without waiting for acknowledgements. This guarantees client response time, but there is obviously a replication lag between primary and secondary nodes, and if the primary node crashes right after committing an entry, that entry may be lost.

  • Semi-synchronous: a compromise between synchronous and asynchronous. A certain number of slave nodes in the cluster synchronize log updates synchronously, and the remaining slave nodes update asynchronously. This does not hurt efficiency too much and ensures there is an up-to-date replacement node if the primary node crashes; a minimal sketch follows.
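The sketch below illustrates the semi-synchronous idea, assuming a hypothetical replicate function per slave: the master sends the entry to all slaves but waits for acknowledgements from only a fixed number of synchronous slaves before committing, while the rest catch up asynchronously.

```go
package main

import (
	"fmt"
	"time"
)

// replicate is a stand-in for sending one log entry to a slave and waiting for its ACK.
func replicate(slave string, entry string, acks chan<- string) {
	time.Sleep(10 * time.Millisecond) // pretend network + disk latency
	acks <- slave
}

// semiSyncCommit sends the entry to all slaves but only waits for `syncCount` ACKs.
func semiSyncCommit(entry string, slaves []string, syncCount int) {
	acks := make(chan string, len(slaves))
	for _, s := range slaves {
		go replicate(s, entry, acks)
	}
	for i := 0; i < syncCount; i++ {
		<-acks // block until enough synchronous slaves have acknowledged
	}
	fmt.Println("committed:", entry) // remaining slaves finish asynchronously
}

func main() {
	semiSyncCommit("set x = 1", []string{"slave-1", "slave-2", "slave-3"}, 1)
}
```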

Synchronizing Log Content

  • Statement-based replication: when UPDATE, INSERT, and DELETE statements are executed on the primary node, the primary generates log entries from the SQL statements themselves, and the secondary nodes execute the corresponding statements locally. This method suits simpler key-value stores such as Redis; for complex relational databases, where transactions are widely used and a transaction often contains multiple statements, generating synchronization logs this way becomes complicated.

  • Write-ahead log (WAL)-based replication: storage engines such as InnoDB write a WAL before writing data, so that redo operations can be performed after recovering from a crash, so we can try to use this log directly for synchronization. The advantage is that no extra cost is spent generating a separate log; the disadvantage is that the WAL format is highly dependent on the storage engine, so cluster log synchronization also becomes tied to a specific storage engine. If the WAL format changes in a non-backward-compatible way in a new storage engine version, we cannot do a rolling upgrade of the cluster.

  • Row-based (logical) log replication: going a step further, to decouple the logs synchronized within the cluster from a specific storage engine, we can record logical changes to database rows, similar in spirit to InnoDB's undo log format. For example, inserting a record logs the new row together with its id; modifying a record logs the old row marked for deletion plus the new row, and so on. On top of this we can add information to each log entry, such as whether the logical log belongs to a transaction and whether that transaction has been committed, to better support transactions, as sketched below.
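A hypothetical shape for such a row-based logical log record (the field names are illustrative, not any real engine's format): each record describes one logical row change and carries transaction metadata.

```go
package replication

// RowOp is the kind of logical change recorded for one row.
type RowOp int

const (
	RowInsert RowOp = iota // new row values only
	RowDelete              // old row identifier only
	RowUpdate              // old row marked deleted + new row inserted
)

// RowLogRecord is one entry of a row-based (logical) replication log.
// It is independent of any storage engine's on-disk format.
type RowLogRecord struct {
	Table     string            // which table the change belongs to
	Op        RowOp             // insert / delete / update
	RowID     int64             // identifier of the affected row
	OldValues map[string]string // previous column values (delete/update)
	NewValues map[string]string // new column values (insert/update)
	TxID      int64             // which transaction this change belongs to
	TxCommit  bool              // whether that transaction has been committed
}
```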

By defining the log we can implement the replicated state machine in a straightforward way, but we still need a distributed consistency protocol to guarantee the order in which the log is replicated.

Data consistency definition

Strong data consistency (linearizability)

As shown in the figure above, clients A, B, C, and D read and write the database (which has multiple data copies) concurrently. The left edge of each rectangle represents the time the database receives the request, the right edge the time the database responds, and the vertical line inside the rectangle the time the request takes effect.

Linearizability means that the multiple data copies in a distributed system appear to clients as a single copy, and modifications to that copy are atomic. In other words, all client operations on the database can be linearized, that is, arranged in a total order; see the last database row in the figure above.

The key is to understand this atomicity:

Before write(x, 1), the instruction assigning 1 to x, takes effect, every client that reads x from any readable copy defined by the database's read/write algorithm gets the initial value 0 (for example, under the write-master/read-slave policy, the readable copies defined by the algorithm are all of the master and slave nodes). After the change takes effect, every client that reads x from any readable copy gets the changed value 1, until the next change takes effect.

That is, different clients perceive the database in the same way. Even though the database has multiple copies that need to be synchronized, a client never observes an intermediate state of an operation, such as getting the new value when reading a fresh copy but the old value when reading a stale copy.

This is similar to the definition of atomicity in multithreading: an observing thread never sees the intermediate effects of another thread's operation; it sees either the state before the operation happened or the state after it has taken effect.
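As a rough single-process analogy (not a distributed implementation), the sketch below uses Go's sync/atomic so that a reader sees either the complete old record or the complete new record, never a half-updated one.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Record stands in for "the value of x together with its version".
type Record struct {
	Value   int
	Version int
}

func main() {
	var current atomic.Pointer[Record]
	current.Store(&Record{Value: 0, Version: 0}) // initial state: x = 0

	// Writer: write(x, 1) takes effect atomically at the moment of Store.
	go func() {
		current.Store(&Record{Value: 1, Version: 1})
	}()

	// Reader (observer): always sees a complete record, either the old one
	// {0, 0} or the new one {1, 1}, never a mixture of the two.
	r := current.Load()
	fmt.Printf("read x = %d (version %d)\n", r.Value, r.Version)
}
```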

Next, let's understand the definitions of total order and causal order and the difference between them:

Sequence of operations

  • Total order

Arabic numerals form a total order: for any two numbers we compare, we can always tell unambiguously which comes first in the overall ordering.

If a consensus algorithm wants to achieve linearizability, the ordering of log entries must be a total order: for any two log entries synchronized from the master node to the slave nodes, we must be able to tell which comes first.

In common total order broadcast algorithms such as ZAB and Raft, a combination of <epoch (term), counter> is generally used to produce a globally unique, increasing sequence number, namely the ZXID in ZooKeeper.
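A minimal sketch of how such a sequence number can be packed and compared, following the 64-bit epoch-plus-counter layout described for ZooKeeper's ZXID later in this article (the helper names are my own):

```go
package main

import "fmt"

// MakeZXID packs a 32-bit epoch (term) and a 32-bit counter into one 64-bit sequence number.
func MakeZXID(epoch, counter uint32) uint64 {
	return uint64(epoch)<<32 | uint64(counter)
}

// Because the epoch occupies the high 32 bits, comparing ZXIDs as plain integers
// orders logs first by term, then by position within the term.
func main() {
	a := MakeZXID(1, 2) // second log of the first leader's term
	b := MakeZXID(2, 1) // first log of the second leader's term
	fmt.Println(a < b)  // true: (2, 1) is newer than (1, 2)
}
```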

  • Causal order (partial order)

If there is no causal relationship between two operations, that is, they can be concurrent, then the two operations are not restricted by the causal order.

If two operations have a causal relationship, that is, they cannot be executed concurrently but must be executed successively, the two operations are restricted by the causal order.

In the master-slave synchronization scenario, maintaining a total order also maintains the causal order. For concurrent operations, the order in which the slave nodes apply the logs does not matter in most scenarios.

Weak data consistency

Because fully implementing the isolation in ACID (serializability) is costly for a database system, weaker transaction isolation levels such as read committed and snapshot isolation are offered.

For reasons similar to those behind weak transaction isolation levels, guaranteeing complete data consistency in a distributed system is expensive. The CAP theorem reveals that when a network partition occurs, a distributed system must choose between data consistency and availability; so making a distributed system guarantee consistency under an unstable network comes at the cost of service availability, and that cost is often unacceptable.

Therefore, several weak data consistency levels are proposed (from weaker to stronger guarantees):

  • Eventual consistency: this level guarantees that, although the data version on a slave node may lag behind the master node at any given point in time, the data eventually becomes consistent once synchronization completes. There is no guarantee about when that happens, so under the write-master/read-slave policy the client may read stale data. This is a relatively weak consistency level.

  • Read copy consistency (read-your-writes): stronger than eventual consistency. It guarantees that, in a read/write-splitting architecture, a client will not read data older than its own write, even though log synchronization between the primary and secondary nodes is delayed.

  • Causal consistency: also stronger than eventual consistency, but unlike read copy consistency it is concerned with keeping the causal order of the logs consistent.

Read copy consistency and causal consistency are explained in detail in the following sections; for now an impression is enough.

Data consistency issues

Replication lag

Problem 1- Reading from a lagging node

When the client writes data through the master node, the master commits after obtaining support from more than half of the nodes, according to typical consensus algorithms (such as ZAB and Raft). Under the write-master/read-slave policy, if the client then reads from a slave node whose data has not yet been synchronized, the update appears lost to the user. This is the Read After Write consistency problem.

Data synchronization from the master generally consists of two steps: persisting the log, and committing and applying it to the node's own state machine. In the ZAB protocol, after the master node receives a client request, it generates a proposal for the request, persists the proposal and its sequence number in its local log, and then sends the proposal over the network to each slave node:

  • Each slave node persists the proposal log and replies ACK to the master node. (The slave persists the log.)
  • When the master node receives confirmation from more than half of the nodes, it generates a corresponding commit message for the proposal and sends it to each slave node over the network; the master then applies the proposal to modify its state machine and replies to the client. (The slaves commit the log and apply it to their own state machines.) A simplified sketch of this flow follows.
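Below is a simplified, synchronous sketch of the leader side of this flow; the follower methods are placeholders, and a real ZAB implementation pipelines proposals asynchronously rather than looping like this.

```go
package main

import "fmt"

// Follower is a placeholder for a slave node reachable over the network.
type Follower struct{ name string }

// PersistProposal pretends to write the proposal to the follower's local log and returns an ACK.
func (f *Follower) PersistProposal(zxid uint64, data string) bool { return true }

// Commit pretends to deliver the commit message so the follower applies the proposal.
func (f *Follower) Commit(zxid uint64) {}

// handleWrite shows the leader's steps: persist locally, collect ACKs, then commit.
func handleWrite(zxid uint64, data string, followers []*Follower, apply func(string)) string {
	acks := 1 // the leader's own persisted copy counts
	for _, f := range followers {
		if f.PersistProposal(zxid, data) {
			acks++
		}
	}
	if acks <= (len(followers)+1)/2 {
		return "proposal not acknowledged by a quorum"
	}
	for _, f := range followers {
		f.Commit(zxid) // tell followers to commit and apply
	}
	apply(data) // leader applies the proposal to its own state machine
	return "ok"
}

func main() {
	state := map[string]string{}
	followers := []*Follower{{"f1"}, {"f2"}}
	fmt.Println(handleWrite(1, "x=1", followers, func(d string) { state["last"] = d }))
}
```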

Therefore, there are two stages at which nodes may not yet be synchronized:

  • Some followers have not completed the persistence of the proposal.

  • All followers have persisted the proposal, but some of them have not yet applied it to their state machines.

To solve the problem of master/slave synchronization delay, we can make the client read its own write:

Problem 2- Multiple read/write data is inconsistent

After the master node replies to the client that the request is complete, the slave nodes may not yet have finished synchronizing. The client's first read happens to hit a node that has finished synchronizing and returns the new data, but the second read hits a lagging node and returns the old data.

A similar problem exists below the operating system: if a thread updates a variable in its write buffer or cache and is then rescheduled to a different CPU before the write buffer is flushed (cache synchronization), the thread may read a stale value of the variable. We can solve this with the operating system's memory barrier mechanisms, or directly with language-level mechanisms such as volatile that force the cache to be flushed.

Can we likewise solve the multiple read/write inconsistency by forcing log synchronization within the cluster? In other words, can the distributed system provide a mechanism so that when a client reads data from a slave node, the data it reads is guaranteed to reflect a point in time after its own corresponding write has been synchronized to that slave node?

We can solve this problem with Follower Read or the Quorum mechanism.

Problem 3- Inconsistent causal order

The essence of the above two problems is replication lag; the log order itself does not change.

If the order of two causally related synchronization logs is changed, for example due to network reasons, the cause and effect appear reversed.

Consider the following scene:

— Question: “Mrs. Cake, how far into the future can you see?” — Answer: “Usually about 10 seconds.”

The question is the cause of the answer and the answer is its effect, but in the figure below the cause is stored in partition 1 and the effect in partition 2. A client (observer) then reads from a slave node of partition 1 and a slave node of partition 2 respectively. Because the master-slave synchronization lag of the two partitions differs, if the client orders the records only by the time they were synchronized on the two partitions, the causal relationship appears reversed.

Why does partitioning introduce causal order problems?

Synchronization between master and slave nodes is generally based on TCP, which guarantees the order of packets received via Seq (the TCP header sequence number). So if the cluster only scales out with simple replicas and no partitioning, the causal order inconsistency problem does not arise.

Once master-slave partitioning is introduced, the problem appears: a database may be stored across multiple partitions on different nodes, and each node can only preserve the log order of its own partition. There is no mechanism to determine the log order across partitions; in the example above, ordering records simply by the timestamp recorded in each partition is what causes the reversal.

We can solve this either with prefix consistent reads, so that causally related records are stored in the same partition, or by implementing a total order broadcast algorithm that provides a global logical clock for determining log order.

Solution 1- Read your own write (read copy consistency)

With one master and many slaves, this means reading from the master. Since the client's updates go to the master node, reading from the master involves no replication lag, because no replication is involved at all.

However, this scheme raises a timing question around three points in time: when the master node commits the log, when the master node applies the log to modify its state machine, and when it responds to the client:

The primary node commits the log

During master-slave synchronization, the master node maintains matchIndex and nextIndex values for each slave node, updated through heartbeat messages:

MatchIndex: the highest log index that the slave node has replicated (that is, the log index acknowledged by the ACK it returned to the master node);

NextIndex: indicates the sequence number of the next log to be synchronized from the primary node to the secondary node.

The primary node determines whether the log at the current index has been replicated by more than half of the nodes, and can therefore be committed, by examining the matchIndex values of all the secondary nodes, as sketched below.
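A minimal sketch of that check, in the style of Raft's commit rule (the helper is illustrative; real Raft additionally requires the entry to be from the leader's current term): sort the replicated indexes and take the value that a majority has reached.

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndexFromMatch returns the highest log index that a majority of the
// cluster (leader + followers) has replicated and that can therefore be committed.
// (Real Raft additionally requires the entry at that index to be from the current term.)
func commitIndexFromMatch(leaderLastIndex int, matchIndex []int) int {
	// The leader always holds its own latest entry.
	indexes := append([]int{leaderLastIndex}, matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(indexes)))
	// indexes[majority-1] is the largest index replicated on at least `majority` nodes.
	majority := len(indexes)/2 + 1
	return indexes[majority-1]
}

func main() {
	// 5-node cluster: leader at index 7, followers at 7, 5, 4, 2.
	fmt.Println(commitIndexFromMatch(7, []int{7, 5, 4, 2})) // 5
}
```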

The master node applies the log to modify its state machine state.

The Raft algorithm does not mandate whether the master responds to the client right after the log is committed or only after the master has applied it to the state machine.

If the master waits until it has obtained support from half of the slave nodes and has applied the log to its state machine before replying, then write-master/read-master has no problem. But in the commit-then-apply-later case, when the client reads the record immediately after receiving the master's reply, the state machine may not have been modified yet, so data consistency is still not guaranteed; only eventual consistency, a weak level, can be guaranteed.

To avoid this problem, let's first look at the four indexes each master node maintains in order to distinguish logs in different states:

FirstLogIndex: the minimum log sequence number remaining in the primary node;

ApplyIndex: indicates the maximum log sequence number that has been applied to the state machine on the primary node.

CommitIndex: the maximum log sequence number that can be submitted after being confirmed by more than half of the cluster nodes in the primary node;

LastLogIndex: the maximum log sequence number generated after the latest request has been received in the master node;

With these four indexes, solving the problem is relatively easy: when the master node has received majority confirmation for a write command, we record the current request's sequence number (the commit index); only when the apply index catches up to that commit index do we respond to the client from the state machine. A minimal sketch is shown below.
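The sketch below assumes hypothetical accessors for the two indexes: record the commit index when the write is acknowledged by a quorum, and only answer the read from the state machine once the apply index has caught up.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// leaderIndexes holds the two indexes relevant to read-your-writes on the leader.
type leaderIndexes struct {
	commitIndex atomic.Int64 // highest index confirmed by a quorum
	applyIndex  atomic.Int64 // highest index applied to the state machine
}

// readAfterWrite returns only after the state machine has applied everything
// up to the commit index recorded at the time of the client's request.
func (l *leaderIndexes) readAfterWrite(readState func() string) string {
	waitFor := l.commitIndex.Load() // commit index at the moment of the request
	for l.applyIndex.Load() < waitFor {
		time.Sleep(time.Millisecond) // in practice, wait on a condition variable
	}
	return readState()
}

func main() {
	l := &leaderIndexes{}
	l.commitIndex.Store(10)
	go func() { // background apply loop catching up
		time.Sleep(5 * time.Millisecond)
		l.applyIndex.Store(10)
	}()
	fmt.Println(l.readAfterWrite(func() string { return "x = 1" }))
}
```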

However, when a network partition causes two or more primary nodes to exist (split brain), the client may connect to the wrong primary node and read stale records;

Therefore, we need to introduce the next solution: reading the primary node also requires Quorum consensus mechanism;

PS: If multiple masters are deployed, hashing can be used to make sure a given client always accesses the same server nodes.

Solution 2- Read the master with the Quorum mechanism

The Quorum mechanism is used to achieve consensus and fault tolerance: every proposal in the cluster must be approved by more than half of the nodes before it can be committed.

The essence of solving the above problem is that when a client reads from the master node, the master must first confirm, through the heartbeat mechanism, that it still has the support of a quorum of slave nodes, thereby confirming that it is still the valid master.

If a stale (deposed) primary node is read, the number of secondary nodes it can confirm through heartbeats will be less than half; in that case the client needs to re-acquire a connection to the real primary node.
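A minimal sketch of that check, with a placeholder heartbeat call: before serving the read, the node confirms that a quorum of followers still acknowledges it as leader, so a deposed leader on the losing side of a partition cannot serve stale data.

```go
package main

import (
	"errors"
	"fmt"
)

// heartbeatAck is a placeholder for one round-trip heartbeat to a follower;
// it reports whether the follower still recognizes this node as leader.
func heartbeatAck(follower string) bool { return true }

// quorumRead serves a read only if a majority of the cluster confirms our leadership.
func quorumRead(followers []string, readState func() string) (string, error) {
	acks := 1 // count ourselves
	for _, f := range followers {
		if heartbeatAck(f) {
			acks++
		}
	}
	if acks <= (len(followers)+1)/2 {
		return "", errors.New("lost quorum: this node may no longer be the leader")
	}
	return readState(), nil
}

func main() {
	v, err := quorumRead([]string{"f1", "f2"}, func() string { return "x = 1" })
	fmt.Println(v, err)
}
```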

However, with read-master/write-master, the slave nodes serve only as redundancy to improve fault tolerance. We can use the Follower Read strategy to make better use of the slave nodes.

Solution 3- Follower Read

Essentially, the state machine of a slave node may lag behind that of the master node, so a read index is recorded when the client issues a read request to a slave node. The slave node waits until the log index its state machine has applied catches up to the master's read index and only then returns, which guarantees that stale data is not read; see the sketch below.
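A minimal sketch of the follower-side wait (the field and function names are illustrative): the follower receives a read index, typically the leader's commit index at the time of the client's request, and serves the read only after its own state machine has applied up to that index.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type follower struct {
	applyIndex atomic.Int64 // highest log index this follower has applied
}

// followerRead blocks until the local state machine has caught up to readIndex,
// then answers the read from local state without going back to the leader.
func (f *follower) followerRead(readIndex int64, readState func() string) string {
	for f.applyIndex.Load() < readIndex {
		time.Sleep(time.Millisecond) // wait for replication + apply to catch up
	}
	return readState()
}

func main() {
	f := &follower{}
	go func() { // replication catching up in the background
		time.Sleep(5 * time.Millisecond)
		f.applyIndex.Store(42)
	}()
	fmt.Println(f.followerRead(42, func() string { return "x = 1" }))
}
```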

Although this scheme still relies on the master node, it relieves the master's load to some extent.

Solution 4- Prefix consistent read (ensures consistent causal order)

The problem is that the database has multiple partitions and there is no global write order across partitions, while each partition preserves its own order; so we can solve it by putting causally related operations into the same partition.

In Kafka, for example, the partitioning strategy can be customized by key so that causally related messages go to the same partition, as sketched below.
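The sketch below shows the general idea of key-based partitioning (it is not Kafka's actual partitioner code): records with the same key, and therefore the causally related question and answer, hash to the same partition and keep their relative order there.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor routes every record with the same key to the same partition,
// so causally related records share one partition's ordered log.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % numPartitions
}

func main() {
	// The question and its answer use the same key, so they land in the same
	// partition and a reader cannot observe the answer before the question.
	fmt.Println(partitionFor("mrs-cake-conversation", 4))
	fmt.Println(partitionFor("mrs-cake-conversation", 4))
}
```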

Solution 5- Implement a total order algorithm

For Mrs. Cake's problem, we could of course record the wall clock (local clock) of the master nodes of partition 1 and partition 2 when the data is first written, and synchronize that time along with the log. However, according to the definition of the partially synchronous system model, wall clocks drift between nodes, which can still make the global order of the logs wrong:

As shown in the figure above, because the wall clock on node 1 runs faster than that on node 3, the operation set x = 1 should come before x += 1 in the global order, but the order appears reversed when judged by the timestamps on each node.

Therefore a physical clock cannot provide a reliable total order; the total order algorithm must be based on a logical clock, that is, a generator that produces globally unique, monotonically increasing sequence numbers according to the corresponding total order algorithm.

As in the ZAB protocol, we can determine a request's relative position in the global order by comparing ZXIDs.

The above covers the problems and solutions when network communication between the nodes of a master-slave cluster is normal. Next, we discuss the other concern: node failure.

Node failure

There are two common system models for node failure: crash-stop (crash-abort) and crash-recovery. Crash-stop means a failed node never rejoins the cluster; crash-recovery means a node recovers after failing and tries to rejoin the cluster. The crash-recovery model is the more realistic one.

1 Detecting node failure

Because of network instability, if heartbeat packets between nodes do not arrive on time, a node cannot tell whether the network is merely delayed or the peer has actually crashed; nevertheless, failure is still judged by timeout.

For example, in Raft, if a slave node does not hear from the master node within a random timeout interval, the slave considers the master to have failed and initiates a new election.

2 Crash – Recovery

During the crash-recovery process, two guarantees need to be provided:

  • Logs committed by the primary node must not be lost;
  • Logs that exist only on the (old) primary node and were never committed must be discarded;

Slave node failure

After the secondary node is added to the cluster again, you only need to find the primary node in the current cluster and synchronize logs.

The synchronization method is the same as for a node newly added to the cluster: first copy a full snapshot of the primary node, then copy the log sequence of changes made after the snapshot version.

Replaying the complete log sequence entry by entry would take a slave node a long time, whereas replicating a whole snapshot of the master node is much faster. Similarly, Redis generally uses RDB + AOF to synchronize slave node state. A minimal sketch of this catch-up flow follows.
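The sketch below assumes hypothetical snapshot and log structures: the rejoining slave first installs the master's snapshot, then replays only the log entries newer than the snapshot.

```go
package main

import "fmt"

// Snapshot is the master's full state at a given log index.
type Snapshot struct {
	LastIndex int
	State     map[string]string
}

// Entry is one log record produced after the snapshot was taken.
type Entry struct {
	Index int
	Key   string
	Value string
}

// catchUp rebuilds a slave's state from a snapshot plus the log tail after it,
// instead of replaying the entire log from the beginning.
func catchUp(snap Snapshot, tail []Entry) map[string]string {
	state := make(map[string]string, len(snap.State))
	for k, v := range snap.State {
		state[k] = v // install the snapshot (like Redis RDB)
	}
	for _, e := range tail {
		if e.Index > snap.LastIndex {
			state[e.Key] = e.Value // replay only newer entries (like the AOF tail)
		}
	}
	return state
}

func main() {
	snap := Snapshot{LastIndex: 100, State: map[string]string{"x": "1"}}
	tail := []Entry{{101, "y", "2"}, {102, "x", "3"}}
	fmt.Println(catchUp(snap, tail)) // map[x:3 y:2]
}
```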

Primary node failure

If the primary node is determined to be invalid, a new round of primary node election is required to elect a new primary node.

Both Raft and ZAB aim to elect the node with the highest sequence number as the new master, since by the earlier definition of the total order, the node with the highest sequence number has the most up-to-date state;

Node election can happen both when the cluster has just started and when the cluster's primary node fails. Below is a comparative look at how the two protocols handle elections:

ZAB protocol

The ZAB protocol defines a 64-bit global sequence number (a 32-bit epoch + a 32-bit counter) as the logical clock. The epoch increases every time the primary node changes and identifies the total order relationship between logs produced under different primary nodes; the 32-bit counter identifies the total order relationship between logs produced during a single primary node's term.

For example, ZXID (1, 2) denotes the second log entry during the first primary node's term and is newer than ZXID (1, 1); ZXID (2, 1) denotes the first log entry in the term of the second primary node, elected after the first primary node fails, and it is newer than ZXID (1, 2).

The ZAB protocol defines several states of a node:

Leading: the current node is the primary node;

Following: the current node is a slave node;

Looking: the node is participating in an election;

Observing: The current node is in Observing state (it does not participate in the election).

When the cluster has just started, all nodes are in the Looking state and participate in the election. Each node sends its election information to the other nodes in the cluster: its largest log sequence number (which is 0 when the cluster has just started) and its host ID, where the host ID is the myid value defined in each ZooKeeper server's configuration file.

Votes are compared as follows: the vote with the larger sequence number (ZXID) wins; if the sequence numbers are equal, the vote with the larger myid wins.

For example, (Host ID: 2 ZXID: 9) > (Host ID: 1 ZXID: 9) > (Host ID: 3 ZXID: 8)
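A minimal sketch of that comparison rule (the struct is illustrative): ZXID first, then myid as the tie-breaker.

```go
package main

import "fmt"

// Vote is the election information a node broadcasts: its largest ZXID and its myid.
type Vote struct {
	ZXID int64
	MyID int
}

// better reports whether vote a should win over vote b:
// the larger ZXID wins; on equal ZXIDs, the larger myid wins.
func better(a, b Vote) bool {
	if a.ZXID != b.ZXID {
		return a.ZXID > b.ZXID
	}
	return a.MyID > b.MyID
}

func main() {
	fmt.Println(better(Vote{ZXID: 9, MyID: 2}, Vote{ZXID: 9, MyID: 1})) // true
	fmt.Println(better(Vote{ZXID: 8, MyID: 3}, Vote{ZXID: 9, MyID: 1})) // false
}
```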

Therefore, when the cluster has just started (all ZXIDs are equal), the node with the largest host ID becomes the primary node. Likewise, when the primary node fails while the cluster is running, a new primary node is elected by the same rules:

At the start of an election, since no node knows the state of the others, each node broadcasts a vote for itself to the cluster in the first round.

During the election process, each node has several packet receiving situations:

  • If the received vote information (host ID, ZXID) is smaller than the node's own, the vote is ignored;
  • If the received vote information (host ID, ZXID) is larger than the node's own, the node changes its vote to the larger node;
  • If the node receives a heartbeat packet from a new Leader whose (host ID, ZXID) combination is greater than its own, it changes its state to Following;
  • If a node receives votes from more than half of the nodes (a quorum), it changes its state to Leading and sends heartbeat packets to the other nodes to announce that the primary node has been established and the election is over;
  • If the cluster does not produce a suitable primary node in the current round, and therefore no heartbeat from a primary node is received, a new round of election is required.

Raft protocol

Similar to ZAB and other total order broadcast protocols, the sequence number is formed from a term number + index and serves as the cluster's logical clock. The term number increases with each new term, maintaining the total order relationship between logs of different master nodes over time; the index indicates the position of a log entry within the current primary node's term.

Similarly, Raft defines node roles:

Leader: the primary node of the current cluster.

Follower: indicates the slave node of the current cluster.

Candidate: node in the current cluster that is participating in the election;

Nodes vote for the primary node in the cluster via RequestVote RPC messages;

Raft is a lot like ZAB. Here are some of the advantages of Raft:

When the cluster has just started: unlike ZAB, which initializes every node in the Looking state, a Raft node starts as a Follower, and each node's election timeout is initialized randomly, with the election timeout set much larger than the heartbeat interval. As a result, one node in the cluster becomes a Candidate first; that node sends its largest log sequence number (LastLogIndex) to the other nodes to stand for election, and after the other nodes vote for it and it becomes the primary node, it starts its term by sending heartbeat packets to the slave nodes.

Compared with ZAB, Raft's randomized initial timeouts avoid the tedious vote-changing process at cluster startup, because usually only one node becomes a Candidate at a time; a minimal sketch of the randomized timeout follows.
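The sketch below illustrates the randomized timeout; the interval values are illustrative rather than numbers mandated by Raft. Each follower picks a random election timeout much larger than the heartbeat interval, so one node usually times out first and becomes the only Candidate.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const heartbeatInterval = 50 * time.Millisecond

// electionTimeout returns a random timeout in [150ms, 300ms), much larger than
// the heartbeat interval, so that nodes rarely become Candidates at the same time.
func electionTimeout() time.Duration {
	return 150*time.Millisecond + time.Duration(rand.Intn(150))*time.Millisecond
}

func main() {
	for node := 1; node <= 3; node++ {
		fmt.Printf("node %d: heartbeat %v, election timeout %v\n",
			node, heartbeatInterval, electionTimeout())
	}
}
```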

References

Stop doubting your intelligence, Raft protocol is hard to understand

In-depth analysis of Raft distributed consistency protocol

Distributed Consistency Algorithm: Raft Algorithm