Kafka data reliability assurance

To ensure that the data sent by the producer is reliably delivered to the designated topic, each partition of the topic must send an ACK (acknowledgement) back to the producer after receiving the data. If the producer receives the ACK, it sends the next batch; otherwise it resends the data.

1 Replica data synchronization strategy

Plan | Advantages | Disadvantages
Send an ACK once more than half of the replicas have completed synchronization | Low latency | Electing a new leader while tolerating the failure of n nodes requires 2n+1 replicas
Send an ACK only after all replicas have completed synchronization | Electing a new leader while tolerating the failure of n nodes requires only n+1 replicas | High latency

Comments:

  • Send an ACK once more than half have synchronized: to tolerate the failure of n nodes, at least one replica with complete data must survive even in the worst case where the n failed machines are exactly the ones that had synchronized. Because the ACK is sent as soon as "more than half" of the replicas have synchronized, 2n+1 replicas are needed so that this "more than half" amounts to n+1 replicas; then, no matter which n nodes fail, at least one machine with complete data remains.
  • Send an ACK only after all replicas have synchronized: every replica holds the complete data, so to tolerate the failure of n nodes only n+1 replicas are needed (n fail, 1 remains healthy), and a new leader can be elected from the surviving node.
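
For example, to tolerate the failure of n = 2 nodes, the first scheme needs 2 × 2 + 1 = 5 replicas (the ACK is sent once 3 of them have synchronized), while the second scheme needs only 2 + 1 = 3 replicas.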

Kafka chose the second option for the following reasons:

(1) To tolerate the failure of n nodes, the first scheme requires 2n+1 replicas while the second requires only n+1. Each partition in Kafka holds a large amount of data, so the first scheme would cause heavy data redundancy.

(2) Although the second scheme has higher network latency, this latency has little impact on Kafka.

2 ISR

After adopting the second scheme, imagine the following scenario: the leader receives the data and all followers begin synchronizing it, but one follower cannot finish synchronizing because of a fault. The leader would then have to wait for that follower before sending the ACK. How is this problem solved?

The leader maintains a dynamic in-sync replica set (ISR), the set of followers that are in sync with the leader. Once the followers in the ISR have completed synchronization, the leader sends the ACK to the producer. If a follower fails to synchronize with the leader for too long, it is removed from the ISR; this time threshold is set by the replica.lag.time.max.ms parameter. When the leader fails, a new leader is elected from the ISR.

3 ACK response mechanism

For some less important data, the reliability requirements are not very high and a small amount of data loss can be tolerated, so there is no need to wait until all followers in the ISR have received the data successfully.

Kafka therefore provides users with three reliability levels; users choose among the following configurations according to their trade-off between reliability and latency (a minimal producer configuration sketch follows the list).

Acks parameter configuration:

  • 0: The producer does not wait for the broker’s ACK. This gives the lowest latency: the broker returns as soon as it receives the message, before the message has been written to disk, so data may be lost if the broker fails.

  • 1: The producer waits for the broker’s ACK; the partition’s leader returns the ACK after the message has been written to its disk, without waiting for the followers. If the leader fails before the followers finish synchronizing, data will be lost.

    In the following figure, the leader writes Hello successfully and sends an ACK to the producer, but the follower has not yet persisted the message. The leader then fails, a new leader is elected, and the Hello data is lost.

  • -1 (all): The broker returns the ACK only after the partition’s leader and all of its followers in the ISR have successfully written the message to disk. However, if the leader fails after follower synchronization completes but before the broker sends the ACK, data duplication can occur.

    After the leader and followers have synchronized successfully, the leader fails before the ACK is sent. Once a new leader is elected, the producer resends Hello, leaving duplicate data on the nodes.
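
A minimal Java producer sketch showing where the acks level is set; the broker address localhost:9092 and the topic my-topic are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // "0" = do not wait, "1" = leader only, "all" (or "-1") = leader plus ISR followers
        props.put("acks", "all");
        props.put("retries", 3); // resend when no ACK arrives

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "Hello"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // delivery failed even after the client's own retries
                            exception.printStackTrace();
                        }
                    });
        }
    }
}
```

Switching acks to "1" or "0" trades the durability guarantees described above for lower latency.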

4 Fault handling

(1) Follower failure

If a follower fails, it is temporarily removed from the ISR. After the follower recovers, it reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and starts synchronizing from the HW with the leader. Once the follower’s LEO is greater than or equal to the partition’s HW, that is, once the follower has caught up with the leader, it can rejoin the ISR.
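
For example, suppose a follower fails when its locally recorded HW is 4 and its LEO is 6: after recovery it truncates its log back to offset 4, fetches from the leader starting at offset 4, and rejoins the ISR once its LEO has caught up with the partition’s HW.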

(2) The leader fails

After the leader fails, a new leader is elected from the ISR. To ensure data consistency across replicas, the remaining followers first truncate the parts of their log files above the HW and then synchronize from the new leader.

Note: this only guarantees data consistency between replicas; it does not guarantee that data is neither lost nor duplicated.
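
For example, suppose the old leader had appended messages at offsets 4 and 5 that no follower had replicated yet (partition HW = 4). After a new leader is elected, the surviving replicas all truncate to offset 4, so those two messages disappear from the partition; whether they are lost for good or resent (possibly as duplicates) depends on the producer’s acks setting described in section 3.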

5 Exactly Once semantics

  1. Setting the server-side ACK level to -1 ensures that no data is lost between the producer and the server, i.e. At Least Once semantics.
  2. By contrast, setting the server-side ACK level to 0 ensures that each message from the producer is sent only once, i.e. At Most Once semantics.

At Least Once guarantees that data is not lost but cannot guarantee that it is not duplicated; At Most Once guarantees that data is not duplicated but cannot guarantee that it is not lost. However, for some very important information, such as transaction data, downstream consumers require that data be neither duplicated nor lost, i.e. Exactly Once semantics. Prior to version 0.11, Kafka could do nothing about this beyond guaranteeing that data was not lost and leaving downstream consumers to de-duplicate globally. With multiple downstream applications, each has to perform its own global de-duplication, which significantly hurts performance.

Kafka 0.11 introduced a major feature: idempotence. Idempotence means that no matter how many times the producer sends the same data to the server, the server persists only one copy. Idempotence combined with At Least Once semantics forms Kafka’s Exactly Once semantics. That is:

At Least Once + idempotent = Exactly Once


f(f(x)) = f(x)

To enable idempotence, simply set enable.idempotence to true in the producer configuration. Kafka’s implementation of idempotence essentially moves the de-duplication that downstream consumers previously had to perform upstream into the data pipeline itself. A producer with idempotence enabled is assigned a PID (producer ID) during initialization, and every message sent to a given partition carries a sequence number. The broker caches <PID, Partition, SeqNumber>; when a message with the same primary key is submitted more than once, only one copy is persisted.
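
A minimal conceptual sketch of that de-duplication idea, not Kafka’s actual broker code (the class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: the broker keeps a cache keyed by <PID, partition, sequence number>
 * and persists a message with a given key only once, so producer retries do
 * not create duplicates.
 */
public class IdempotentDedupSketch {
    private final Map<String, String> persisted = new HashMap<>();

    /** Returns true if the message was newly persisted, false if it was a duplicate retry. */
    public boolean append(long producerId, int partition, long sequenceNumber, String message) {
        String key = producerId + "-" + partition + "-" + sequenceNumber;
        if (persisted.containsKey(key)) {
            return false; // same primary key seen before: ignore the duplicate
        }
        persisted.put(key, message);
        return true;
    }

    public static void main(String[] args) {
        IdempotentDedupSketch broker = new IdempotentDedupSketch();
        System.out.println(broker.append(42L, 0, 7L, "Hello")); // true: persisted
        System.out.println(broker.append(42L, 0, 7L, "Hello")); // false: duplicate retry ignored
    }
}
```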

However, the PID changes after a producer restart, and different partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions or across sessions.

6 Extensions

ISR and AR

In simple terms, all the replicas in a partition are collectively called the AR (Assigned Replicas). All replicas (including the leader replica) that keep up with the leader within a certain tolerance form the ISR; the ISR set is a subset of the AR set. Messages are first sent to the leader replica, and the follower replicas then pull them from the leader for synchronization, so during synchronization the follower replicas lag behind the leader replica to some extent. The “certain tolerance” mentioned above refers to the permissible lag range, which can be configured with parameters. Replicas (excluding the leader replica) that lag too far behind the leader form the OSR (Out-of-Sync Replicas), so AR = ISR + OSR. Normally, all follower replicas should keep up with the leader replica within the tolerance, that is, AR = ISR and the OSR set is empty.

How the ISR shrinks and expands

The leader replica is responsible for maintaining and tracking the lag of all follower replicas in the ISR set. When a follower replica falls too far behind or fails, the leader removes it from the ISR. If a follower replica in the OSR set “catches up” with the leader replica, the leader moves it from the OSR back into the ISR. By default, when the leader replica fails, only follower replicas in the ISR set are eligible to be elected as the new leader; follower replicas in the OSR set have no chance at all (though this can be changed by configuration).

HW, LEO, LSO

High Watermark (HW) identifies a specific message offset, and consumers can only pull messages before that offset.

The following figure shows a log file containing nine messages. The first message has an offset (LogStartOffset) of 0 and the last has an offset of 8. The HW of the log file is 6, which means consumers can only pull messages with offsets 0 through 5; messages at offset 6 and beyond are invisible to the consumer.

LEO (Log End Offset) identifies the offset of the next message to be written to the current log file. In the figure above, the position at offset 9 is the LEO of the current log file; the LEO equals the offset of the last message in the current log partition plus 1. Each replica in the partition’s ISR set maintains its own LEO, and the smallest LEO in the ISR set is the partition’s HW; consumers can only consume messages before the HW.
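
A minimal sketch of that last rule, using a hypothetical helper rather than Kafka’s own code: the partition HW can be thought of as the minimum LEO over the ISR.

```java
import java.util.List;

public class HighWatermarkSketch {
    // The partition HW is the smallest LEO among the replicas in the ISR.
    static long highWatermark(List<Long> isrLeos) {
        return isrLeos.stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    public static void main(String[] args) {
        // Values from the walkthrough below: leader LEO = 5, follower1 LEO = 5, follower2 LEO = 4
        System.out.println(highWatermark(List.of(5L, 5L, 4L))); // prints 4
    }
}
```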

The following is a detailed analysis of the relationship between ISR set, HW and LEO.

Assume that the ISR set of a partition contains three replicas: one leader replica and two follower replicas. At this point both the LEO and the HW of the partition are 3. Messages 3 and 4 arrive from the producer and are first stored in the leader replica.

After the messages are written to the leader replica, the follower replicas send fetch requests to pull messages 3 and 4 for synchronization.

During synchronization, different replicas synchronize at different speeds. At a certain moment follower1 has completely caught up with the leader replica while follower2 has only synchronized message 3, as shown in the following figure: the LEO of the leader replica is 5, that of follower1 is 5, and that of follower2 is 4. The HW of the partition therefore takes the minimum value 4, and at this point consumers can consume messages with offsets 0 through 3.

Once all replicas have successfully written messages 3 and 4, the HW and LEO of the whole partition become 5, and consumers can then also consume messages 3 and 4.

This shows that Kafka’s replication mechanism is neither fully synchronous nor purely asynchronous. Fully synchronous replication requires all working follower replicas to have replicated a message before it is considered committed, which greatly hurts performance. With asynchronous replication, follower replicas copy data from the leader asynchronously and a message is considered committed as soon as it is written to the leader; in that case, if the follower replicas still lag behind the leader when the leader fails, data is lost. Kafka’s ISR approach strikes an effective balance between data reliability and performance.