1. Background
Kafka is an open source stream processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all of the user activity stream data of a website.
To ensure high availability of a Kafka cluster, Kafka places replicas of the same partition on several servers in the cluster. The cluster elects one partition leader and treats the rest as partition followers. The leader receives the data written by the producer, and the followers then synchronize that data from the leader. This article describes that synchronization process, which is also a typical watermark-based data synchronization method.
2. Basic concepts
2.1 Basic Terms
- Topic: Topic. A logical concept used to separate different services
- Server: Broker. Receives and persists messages, and handles topic management, permission management, consumer rebalancing, and more
- Partition: Partition. An ordered sequence of messages; one topic can have multiple partitions
- Message: Record. Self-explanatory
- Message position: Offset. The position of each message within its partition
- Producer: Producer. The client that writes messages
- Consumer: Consumer. The client that reads messages
Note that this diagram shows the logical architecture of Kafka.
2.2 Replicas and ISR
These are two conceptual terms within a broker.
A replica is a backup of the same partition's data on a different broker, kept to improve availability. Kafka elects one leader replica per partition to serve external clients. The followers proactively send requests to the leader replica to synchronize the Kafka log, which keeps the leader and follower replicas consistent.
The ISR (in-sync replicas) is the set of replicas, the leader included, that are currently caught up with the leader; only replicas in the ISR are eligible to be elected leader. Kafka considers a message committed only when the leader and all other replicas in the ISR have it.
2.3 Concepts in Replicas
Base offset: the offset of the first message in the replica.
High watermark (HW): the offset just past the last committed message in this replica. If a message's offset is less than HW, all in-sync replicas have synchronized that message. If its offset is greater than or equal to HW, some replicas have not yet synchronized it. Put another way, all messages with offsets below HW are visible to consumers, and everything else is invisible. This value matters because it determines both where leader and follower replicas synchronize and where consumers can read. Note that HW exists not only on the leader but also on the followers, so that if the leader crashes a follower can immediately take over and keep working normally (eventual consistency).
Log end offset (LEO): the offset just past the last message in the replica, that is, the offset the next message will be written at. In the walkthrough in section 3, a replica holding one message at offset 0 has LEO = 1.
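The three offsets can be pictured with a small toy model. This is only a sketch: `ReplicaLog` and its methods are hypothetical names invented for illustration, not Kafka classes, and it assumes the convention used in section 3 (LEO is the offset the next message will receive, and only offsets strictly below HW are visible).

```python
from dataclasses import dataclass, field


@dataclass
class ReplicaLog:
    """Toy model of one replica's log (hypothetical, not Kafka's real class)."""
    base_offset: int = 0
    messages: list = field(default_factory=list)
    high_watermark: int = 0

    @property
    def log_end_offset(self) -> int:
        # LEO: the offset that the next appended message will receive.
        return self.base_offset + len(self.messages)

    def append(self, message) -> int:
        # Append at the current LEO and return the offset that was assigned.
        offset = self.log_end_offset
        self.messages.append(message)
        return offset

    def visible_to_consumer(self, offset: int) -> bool:
        # Only offsets strictly below HW are committed and consumable.
        return self.base_offset <= offset < self.high_watermark


log = ReplicaLog()
log.append("m0")
log.append("m1")
log.high_watermark = 1  # assume only m0 has been replicated everywhere
```

Here the replica holds two messages (LEO = 2), but with HW = 1 a consumer can read only offset 0.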
3. Message synchronization process
Note that the main flow here is a synchronization flow prior to Kafka version 0.11. After version 0.11, there are slight differences, which will be covered later.
3.1 Overall Process
① The leader replica on Broker1 receives the message and updates its LEO to 1.
② The follower replicas on Broker2 and Broker3 each send a fetch request to Broker1. (Followers send fetch requests to the leader periodically, much like a heartbeat.)
③ Broker1 receives the fetch requests and returns the message to each follower replica.
④ After receiving the message, the follower replicas update their LEO to 1 and return a response.
⑤ After the leader replica hears back from the follower replicas, it updates HW to 1. The message at offset 0 can now be consumed.
3.2 Process Details
Principle: HW is the minimum of all the LEOs currently known. Why? Each broker writes its partition sequentially, so the minimum LEO means that every replica has synchronized all data before that LEO, which is exactly the requirement that all messages before HW have been synchronized.
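The principle fits in one line of code. The sketch below is illustrative only: `compute_high_watermark` and the replica names are made up for this example, and the LEO values are arbitrary.

```python
from typing import Dict


def compute_high_watermark(replica_leos: Dict[str, int]) -> int:
    """HW is the smallest LEO among the replicas the leader knows about."""
    return min(replica_leos.values())


# Hypothetical LEOs as cached by the leader.
leos = {"leader": 5, "follower-1": 3, "follower-2": 4}
hw = compute_high_watermark(leos)

# Logs are written sequentially, so offsets 0..hw-1 exist on every replica.
fully_replicated = list(range(hw))
```

With these values the slowest replica has LEO 3, so HW is 3 and only offsets 0 to 2 are guaranteed to be on every replica.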
To keep the description simple, assume that only two replicas are synchronizing data: one leader and one follower.
- The first round of fetch
At some point the leader receives a message and writes it to its log, and the data synchronization process begins.
1. The leader's LEO is incremented by 1. This is easy to see: one more message, so the end of the log moves forward by one.
2. The leader tries to update HW by taking the minimum LEO of all replicas, which is 0 in this case. Where does it get each replica's LEO? The leader replica keeps a local cache of them, and the followers report their LEO in each fetch request.
At this point the follower sends a FETCH request (carrying its current LEO, which is 0 for now) and the leader receives it.
3. The leader receives the FETCH and tries to update HW again, taking the minimum LEO of all replicas. Its own LEO is 1 and the LEO in the fetch is 0, so HW stays 0.
4. The leader reads the data at offsets >= the follower's LEO and puts it in the response, together with its own HW (the global LEO minimum), which in this case is 0.
5. The follower receives the response, writes the data, and updates its LEO (+1 in this case).
6. The follower tries to update its HW, again as the global LEO minimum: it compares the HW in the response (from step 4) with its own LEO and takes the smaller value. In this case that is 0.
At this point the first round of fetch ends. Note that after the first round the data has been synchronized but is still not visible, because the leader does not yet know whether the follower synchronized it successfully.
- The second round of fetch
1. The follower sends a FETCH request carrying its own LEO=1.
2. The leader tries to update HW to the global LEO minimum, which is now 1.
3. The leader reads the data at offsets >= the follower's LEO and puts it in the response. There is no new data this time, so the response carries only the leader's HW=1.
4. The follower receives the response, writes no data, and tries to update its own HW to the global minimum, which in this case is HW=1.
At this point the second round of fetch ends. Only now is data synchronization really finished and the new message visible to consumers.
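The two rounds can be replayed with a small simulation. This is a minimal sketch of the steps described above for one leader and one follower; the `Leader` and `Follower` classes are hypothetical and ignore networking, the ISR, and persistence.

```python
class Leader:
    def __init__(self):
        self.log = []
        self.hw = 0
        self.follower_leo = 0  # cached LEO of the single follower

    @property
    def leo(self):
        return len(self.log)

    def _update_hw(self):
        # HW is the minimum of all LEOs the leader currently knows.
        self.hw = min(self.leo, self.follower_leo)

    def produce(self, message):
        self.log.append(message)
        self._update_hw()

    def handle_fetch(self, follower_leo):
        # The fetch request carries the follower's LEO.
        self.follower_leo = follower_leo
        self._update_hw()
        # Respond with the data from the follower's LEO onward, plus HW.
        return self.log[follower_leo:], self.hw


class Follower:
    def __init__(self):
        self.log = []
        self.hw = 0

    @property
    def leo(self):
        return len(self.log)

    def fetch_from(self, leader):
        data, leader_hw = leader.handle_fetch(self.leo)
        self.log.extend(data)
        # The follower's HW is min(HW in the response, own LEO).
        self.hw = min(leader_hw, self.leo)


leader, follower = Leader(), Follower()
leader.produce("m0")

follower.fetch_from(leader)  # first round
round1 = (leader.leo, leader.hw, follower.leo, follower.hw)

follower.fetch_from(leader)  # second round
round2 = (leader.leo, leader.hw, follower.leo, follower.hw)
```

After the first round both replicas hold the message (LEO = 1) but both HWs are still 0; only the second round moves both HWs to 1.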
3.3 Inconsistent data
As discussed above, a message becomes visible only after two rounds of fetch. A crash inside this time window can easily lead to data loss or inconsistency.
Scenario 1: The follower and then the leader crash between the two rounds of fetch. The premise is that the leader already considers the write committed.
As shown in the figure, suppose the second fetch is under way: A (the leader) has updated its HW, but B crashes before A's response reaches it. When B restarts, it resets its LEO to the HW it had before the crash and deletes the data after that point (note that this is already a data inconsistency). B then needs to fetch from A again. If A happens to crash at this moment and B is elected leader, then after A restarts it takes the minimum of the leader's HW and its own LEO and ends up with HW=1, so the data that was covered by the original HW=2 is permanently lost.
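The loss in Scenario 1 can be traced with plain lists. This is a simplified sketch under stated assumptions: two messages `m0` and `m1`, logs modeled as Python lists, and a hypothetical helper `restart_truncate_to_hw` standing in for the pre-0.11 restart behaviour described above.

```python
def restart_truncate_to_hw(log, hw):
    """Restart behaviour as described above: drop everything at or after HW."""
    return log[:hw]


# State right after A (leader) handled the second fetch,
# but before the response reached B.
a_log, a_hw = ["m0", "m1"], 2
b_log, b_hw = ["m0", "m1"], 1

# B crashes and restarts: it truncates its log to its own stale HW.
b_log = restart_truncate_to_hw(b_log, b_hw)

# A crashes before B can fetch m1 again; B becomes leader with what it has.
leader_log, leader_hw = b_log, b_hw

# A comes back as a follower: it takes min(leader HW, own LEO)
# and drops everything past that point.
a_hw = min(leader_hw, len(a_log))
a_log = a_log[:a_hw]
```

At the end both replicas hold only `m0`, even though `m1` had been committed under A's HW = 2.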
Scenario 2: The follower and the leader crash at the same time between the two rounds of fetch.
The situation is the same as above: the second fetch is under way, and A has updated its HW but has not yet returned the response to B. This time the leader and the follower crash at the same time, and B restarts first and becomes the leader. The producer then sends a message to B, and because there are no followers, B's HW is advanced directly to 2. Then A comes back as a follower. A finds that its HW is equal to B's, so it changes nothing. But the message that A's HW refers to is not the same as the message that B's HW refers to, which is obviously not what we want.
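The divergence in Scenario 2 can be traced the same way. Again this is only a sketch with assumed message names (`m0`, `m1`, and a new message `m2`) and logs modeled as lists, following the steps in the paragraph above.

```python
# State right after A (leader) updated its HW, before B got the response.
a_log, a_hw = ["m0", "m1"], 2
b_log, b_hw = ["m0", "m1"], 1

# Both crash. B restarts first, truncates to its stale HW, becomes leader.
b_log = b_log[:b_hw]

# The producer writes a new message; with no followers, HW follows LEO.
b_log.append("m2")
b_hw = len(b_log)

# A returns as a follower. Its HW equals the leader's, so it truncates nothing.
if a_hw != b_hw:
    a_log = a_log[:min(a_hw, b_hw)]

# Offsets at which the two replicas now disagree.
diverged = [
    offset
    for offset in range(min(len(a_log), len(b_log)))
    if a_log[offset] != b_log[offset]
]
```

Both replicas report HW = 2, yet offset 1 holds `m1` on A and `m2` on B.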
3.4 Solving the problem of inconsistent data
Starting with Kafka 0.11, the leader epoch was introduced to replace the HW as the reference for log truncation, and a restarting follower sends an additional request to the leader, which solves this problem. (This is why most commercial deployments use 0.11 or later.) A leader epoch entry is a pair of values (epoch, offset): epoch is the version number of the leader, and offset is the position at which that leader started writing. So (0, 0) means the first leader started writing at position 0, and (1, 120) means the second leader started writing at position 120.
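To make the (epoch, offset) pairs concrete, the sketch below looks up where a given epoch ends on a replica. It is a simplified illustration, not Kafka's implementation: `EpochEntry`, `end_offset_for_epoch`, and the LEO of 150 are assumptions chosen for this example.

```python
from typing import List, Tuple

EpochEntry = Tuple[int, int]  # (leader epoch, start offset of that epoch)


def end_offset_for_epoch(cache: List[EpochEntry], epoch: int, leo: int) -> int:
    """Return the offset at which `epoch` ends on this replica.

    That is the start offset of the first later epoch, or the LEO
    if `epoch` is still the latest one. `cache` is sorted by epoch.
    """
    for entry_epoch, start_offset in cache:
        if entry_epoch > epoch:
            return start_offset
    return leo


# The example from the text: epoch 0 starts at offset 0, epoch 1 at offset 120.
cache = [(0, 0), (1, 120)]
end_of_epoch_0 = end_offset_for_epoch(cache, 0, leo=150)
end_of_epoch_1 = end_offset_for_epoch(cache, 1, leo=150)
```

A follower whose last known epoch is 0 would learn from this lookup that anything it holds at offset 120 or later was not written by the leader of epoch 0.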
Solution to Scenario 1:
Both A and B store the epoch value. After B restarts, and before A crashes, B sends a request to A and gets a reply; the epoch in the response is (0, 2). Since B holds no data at offsets >= 2, it performs no truncation. After A crashes, B is elected leader and the epoch is incremented by 1. When A comes back and synchronizes data with B again, no data is lost.
Solution to Scenario 2:
As shown in the figure, B restarts first, becomes the leader, and receives a new message, so B's epoch is (1, 1). At this point both A and B hold data at offset=1, but the data differ. Then A restarts and sends B a request, and the response returns the epoch (1, 1). A therefore discards its own data at offset = 1 and stores B's data instead. Note that although consistency between the replicas is guaranteed, data loss still occurs. Therefore, in practice, when Kafka is used as a message queue, it is generally not used in scenarios that require strong reliability, such as payments.
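Both solutions follow the same pattern: the returning follower asks the leader where its last known epoch ends, truncates to that offset, and then copies the rest of the leader's log. The sketch below is a simplified model of that idea; `resync_follower`, the list-based logs, and the message names are assumptions for illustration and do not mirror Kafka's actual request handling.

```python
from typing import List, Tuple

EpochEntry = Tuple[int, int]  # (leader epoch, start offset of that epoch)


def end_offset_for_epoch(cache: List[EpochEntry], epoch: int, leo: int) -> int:
    # Start offset of the first later epoch, or the LEO if there is none.
    for entry_epoch, start_offset in cache:
        if entry_epoch > epoch:
            return start_offset
    return leo


def resync_follower(follower_log, follower_epoch, leader_log, leader_cache):
    """Truncate at the leader's end offset for the follower's last epoch,
    then append whatever the leader has beyond that point."""
    end_offset = end_offset_for_epoch(leader_cache, follower_epoch, len(leader_log))
    kept = follower_log[:min(end_offset, len(follower_log))]
    return kept + leader_log[len(kept):]


# Scenario 1: B restarts while A still leads epoch 0; nothing is truncated.
b_after_restart = resync_follower(["m0", "m1"], 0, ["m0", "m1"], [(0, 0)])
# Later A crashes, B leads epoch 1 (starting at offset 2), and A comes back.
a_scenario1 = resync_follower(["m0", "m1"], 0, b_after_restart, [(0, 0), (1, 2)])

# Scenario 2: B leads epoch 1 starting at offset 1 and holds a different message.
a_scenario2 = resync_follower(["m0", "m1"], 0, ["m0", "m2"], [(0, 0), (1, 1)])
```

In Scenario 1 the message `m1` survives on both replicas. In Scenario 2 A drops its own `m1` and adopts `m2`, so the replicas agree again, at the cost of losing `m1`.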