Why partition
Kafka has the concept of a Topic, which is a logical container for the actual data; each topic is divided into several partitions. In other words, Kafka's message organization is a three-level structure: topic - partition - message. Each message under a topic is stored in exactly one partition, rather than being copied to multiple partitions. The diagram on the official website illustrates this three-level structure clearly.
- The purpose of partitioning is to provide load-balancing capability; in other words, the main reason for partitioning the data is to achieve high system scalability.
- Different partitions can be placed on different node machines, and reads and writes of the data are performed at partition granularity, so each node can independently handle the read and write requests for its own partitions.
- We can also increase the throughput of the overall system by adding new nodes.
Partitioning strategies
Reasons for partitioning
- It is easy to scale within a cluster. Each partition can be sized to fit the machine it lives on, and a topic can consist of multiple partitions, so the cluster as a whole can accommodate data of any size.
- Concurrency is improved, because reads and writes happen at partition granularity.
Partitioning principles
The data sent by the producer must be encapsulated in a ProducerRecord object. Its constructors are:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    this(topic, partition, timestamp, key, value, (Iterable)null);
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, K key, V value) {
    this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, V value) {
    this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
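As a rough illustration of how these constructors map to the partitioning rules below, records can be created with an explicit partition, with only a key, or with neither (the topic name and key/value strings here are made up for the example):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Partition 0 is used directly, bypassing the partitioner.
ProducerRecord<String, String> r1 = new ProducerRecord<>("my-topic", 0, "user-1", "hello");

// No partition given: the partition is derived from the hash of the key modulo the partition count.
ProducerRecord<String, String> r2 = new ProducerRecord<>("my-topic", "user-1", "hello");

// Neither partition nor key: partitions are assigned in a round-robin fashion.
ProducerRecord<String, String> r3 = new ProducerRecord<>("my-topic", "hello");
```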
- If a partition is specified, that value is used directly as the partition.
- If no partition is specified but a key is present, the partition is obtained by taking the hash of the key modulo the number of partitions of the topic.
- If neither a partition nor a key is given, an integer is generated randomly on the first call (and incremented on each subsequent call), and this value modulo the number of available partitions for the topic gives the partition. This is the round-robin algorithm.
The source of RoundRobinPartitioner:
public class RoundRobinPartitioner implements Partitioner {
    // One counter per topic, used to rotate over that topic's partitions.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public RoundRobinPartitioner() {}

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partitions are available; fall back to rotating over all partitions.
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = this.topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {}
}
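To plug a partitioner like this into a producer, it is set through the partitioner.class configuration. A minimal sketch, assuming a broker at localhost:9092, string keys/values, and a client version that ships org.apache.kafka.clients.producer.RoundRobinPartitioner (otherwise point the property at your own Partitioner implementation):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Replace the default partitioner with the round-robin one shown above.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```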
Data reliability assurance
To ensure that the data sent by the producer reliably reaches the designated topic, every partition of the topic must send an ack (acknowledgement) back to the producer after receiving the data. If the producer receives the ack, it sends the next round of messages; otherwise it resends the data.
Replica data synchronization policy
Two schemes are possible: send the ack once more than half of the followers have finished synchronizing, or send the ack only after all followers have finished synchronizing. Kafka chose the second option, for the following reasons:
- To tolerate the failure of n nodes, the first scheme requires 2n+1 replicas, while the second requires only n+1. Each partition in Kafka holds a large amount of data, so the first scheme would create heavy data redundancy.
- Although the second scheme has higher network latency, that latency has relatively little impact on Kafka.
ISR
- With the second scheme, imagine the following scenario: the leader receives the data and all the followers start synchronizing it, but one follower cannot sync with the leader because of some fault. The leader would then have to wait until that follower finishes before sending the ack. How is this solved?
- The leader maintains a dynamic in-sync replica set (ISR), the set of followers that are in sync with the leader. Once the followers in the ISR have finished synchronizing the data, the leader sends the ack to the producer.
- If a follower fails to synchronize with the leader for too long, it is removed from the ISR; the threshold is set by the replica.lag.time.max.ms parameter. When the leader fails, a new leader is elected from the ISR.
Ack response mechanism
For some unimportant data, the reliability of the data is not very high and can tolerate a small amount of data loss. Therefore, there is no need to wait for all the followers in the ISR to receive data successfully.
So Kafka provides users with three levels of reliability, and the user chooses the following configuration based on trade-offs between reliability and latency requirements.
- acks = 0: the producer does not wait for the broker's ack; the message is considered sent as soon as it has been handed off, regardless of whether the leader or followers have written it to disk. This gives the lowest latency, but data is lost if the broker fails before the leader has written to disk.
- acks = 1: the leader sends the ack as soon as it has written the data to disk, without waiting for the followers. This guarantees a copy on the leader node, but if the leader fails before the followers have synchronized, data is lost.
- acks = -1 (all): the ack is sent only after the leader and all followers in the ISR have finished synchronizing the data. However, if the leader fails after the followers finish synchronizing but before the broker sends the ack, a new leader is elected from the ISR; since the producer never received the ack, it resends the message to the new leader, causing data duplication.
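A minimal sketch of how the acks level is chosen on the producer side (the broker address is an assumption for the example):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
// "0"   -> do not wait for any ack (lowest latency, data may be lost)
// "1"   -> wait only for the leader to write the record
// "all" (equivalent to "-1") -> wait for the leader and all followers in the ISR
props.put(ProducerConfig.ACKS_CONFIG, "all");
```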
Failure handling details
- LEO (Log End Offset): the largest offset in each replica's log, i.e. the offset of the latest message written to that replica.
- HW (High Watermark): the largest offset visible to consumers, which equals the smallest LEO in the ISR.
Follower failure
If a follower fails, it is temporarily removed from the ISR. After the follower recovers, it reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and synchronizes from the HW onward with the leader. For example, if the recovered follower's HW is 8 and its LEO is 10, it discards offsets 9 and 10 and re-fetches from offset 8. Once the follower's LEO is greater than or equal to the partition's HW, i.e. once it has caught up with the leader, it can rejoin the ISR.
Leader failure
After the leader fails, a new leader is elected from the ISR. To ensure data consistency among the replicas, the remaining followers truncate the parts of their log files that are above the HW and then synchronize data from the new leader.
Note: this only guarantees consistency between replicas; it does not guarantee that data is neither lost nor duplicated.
Exactly Once semantics
There are three types of distributed messaging consistency semantics:
- At Least Once: messages are not lost, but may be duplicated.
- At Most Once: messages may be lost, but are never duplicated.
- Exactly Once: messages are neither lost nor duplicated.
Before Kafka 0.11 it was not possible to achieve Exactly Once semantics in Kafka. From version 0.11 onward, the producer can achieve Exactly Once by combining the new idempotence feature with acks = -1; in other words, At Least Once plus idempotence gives Exactly Once.
- acks = -1: as described under the ack mechanism above, this gives At Least Once semantics: no data is lost, but duplicates may occur.
Idempotence: a producer feature added in version 0.11. No matter how many times the producer sends the same data to the broker, the broker persists only one copy.
Before version 0.11, Exactly Once could only be achieved with primary-key deduplication in an external system, for example using an HBase RowKey.
How idempotence works:
To enable idempotence, set the parameter enable.idempotence = true in the producer configuration.
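A minimal sketch of that setting (the broker address is an assumption); enabling idempotence also requires acks = all:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");          // enable.idempotence
props.put(ProducerConfig.ACKS_CONFIG, "all");                         // required when idempotence is on
```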
Kafka's idempotence essentially moves the deduplication that would otherwise have to be done downstream up to the upstream side of the data flow.
- A producer with idempotence enabled is assigned a PID (Producer ID) during initialization, and the messages it sends to a given partition carry a Sequence Number. The broker caches <PID, Partition, SeqNumber> as the message's primary key; when a message with the same primary key is submitted again, the broker persists only one copy.
- However, the PID changes when the producer restarts, and different partitions have different sequence numbers, so idempotence alone cannot guarantee Exactly Once across partitions or across sessions.
Transactions: Kafka introduced transaction support in version 0.11. A transaction guarantees Exactly Once semantics for producing and consuming across partitions and sessions: either everything in it succeeds or everything fails.
Producer transactions:
- Kafka introduces a new component, the Transaction Coordinator, which manages a globally unique Transaction ID and binds the producer's PID to that Transaction ID. The PID changes when the producer restarts, but the producer can still interact with the Transaction Coordinator and retrieve its original PID via the Transaction ID, so Exactly Once is still guaranteed after a restart.
- The Transaction Coordinator also writes transaction information to an internal Kafka topic. Even if the whole Kafka service restarts, the state of an ongoing transaction is persisted in that topic, so it can be restored and the transaction can continue.
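A minimal sketch of the transactional producer API (the broker address, transactional.id, topic, and key/value are assumptions for the example):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // assumed address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");   // the Transaction ID

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();                 // registers with the Transaction Coordinator
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.commitTransaction();            // all messages in the transaction become visible
} catch (Exception e) {
    producer.abortTransaction();             // on failure none of them do; fatal errors should close the producer instead
} finally {
    producer.close();
}
```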
Basic architecture diagram of the producer client
KafkaProducer has two basic threads:
- Main thread: responsible for message creation, the interceptors, the serializer, and the partitioner, and for appending messages to the message collector RecordAccumulator (note that the interceptors do execute before serialization and partitioning).
- The RecordAccumulator maintains a double-ended queue of type Deque<ProducerBatch> for each partition. A ProducerBatch can loosely be understood as a set of ProducerRecords; sending in batches helps improve throughput and reduce network overhead.
- Before sending, the producer client stores messages in java.nio.ByteBuffer instances and maintains a BufferPool so that ByteBuffers can be reused.
- The buffer pool only manages ByteBuffers of a specific size (specified by batch.size); buffers larger than that are not cached for reuse.
Each time a ProducerRecord is appended, the producer finds (or creates) the corresponding double-ended queue, takes the ProducerBatch at its tail, and checks whether the current message fits into that batch.
If it fits, the message is written into it. If not, a new ProducerBatch is created; before creating it, the producer checks whether the message size exceeds the value of the client parameter batch.size.
If it does not, the new ProducerBatch is created with size batch.size, so its buffer can later be reused from the pool.
If it does, the ProducerBatch is created with the actual message size; the drawback is that this memory cannot be reused.
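A hedged sketch of the two parameters mentioned above; the values shown are the client's documented defaults, used here only as an illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch.size: ProducerBatch / pooled ByteBuffer size in bytes
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer.memory: total memory available to the RecordAccumulator
```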
The Sender thread:
The Sender thread takes the cached messages from the message collector and processes them as <Node, List<ProducerBatch>> entries.
After <Node, List<ProducerBatch>> has been converted into <Node, Request>, the data can be sent to the server.
Before sending, the Sender thread also saves the requests as a Map<NodeId, Deque<Request>> in InFlightRequests, which is used to determine the leastLoadedNode, the node with the lowest current load, so that messages can be sent out as quickly as possible.
Writing process
The sequence diagram of messages written by producer is as follows:
Process description:
- The producer obtains the leader of the partition from the "/brokers/…/state" node in ZooKeeper.
- The producer sends the message to the leader.
- The leader writes the message to its local log.
- The followers pull the message from the leader, write it to their local logs, and send an ack back to the leader.
- After receiving acks from all replicas in the ISR, the leader advances the HW (high watermark, i.e. the final commit offset) and sends an ack to the producer.
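From the client's point of view, this round trip is what a send with a callback observes: the callback fires once the ack for the configured acks level arrives. A minimal sketch, assuming a KafkaProducer<String, String> named producer configured as in the earlier examples and a made-up topic name:

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, (RecordMetadata metadata, Exception exception) -> {
    if (exception == null) {
        // The broker acknowledged the write according to the configured acks level.
        System.out.printf("written to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
    } else {
        // Delivery failed after retries; the record may need to be resent.
        exception.printStackTrace();
    }
});
producer.flush();
```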