Producer sending process
The producer works with two coordinating threads: the main thread and the sender thread. On the main thread, a message passes through the following steps:
1. Interceptor: interceptors can be used to modify or enrich messages before sending.
2. Serializer: keys and values are serialized with the configured serializers.
3. Partitioner: determines which partition the message is sent to.
  - If a partition is specified, the message is sent directly to that partition.
  - If no partition is specified but a custom partitioner is configured, the partitioner decides the target partition.
  - If no partition is specified, no custom partitioner is configured, and the key is not null: the default partitioner hashes the key modulo the number of available partitions of the topic to obtain the partition.
  - If no partition is specified, no custom partitioner is configured, and the key is null: a random int is generated the first time (and incremented afterwards), and that value modulo the number of available partitions gives the partition, i.e. round-robin.
4. Message accumulator: after partition selection, messages are not sent immediately but stored in the accumulator (a ConcurrentMap), one batch per partition. When a batch is full, the sender thread is woken up to send it.
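A minimal sketch of this flow with the Java client; the broker address and topic name are assumptions for illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFlowDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Step 2: serializers for keys and values
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Step 1 (optional): an interceptor chain could be configured via
        // "interceptor.classes" (the class would be your own implementation)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Step 3: no partition given here, so the default partitioner
            // hashes the key "user-1" to choose one
            ProducerRecord<String, String> record =
                new ProducerRecord<>("demo-topic", "user-1", "hello");
            // Step 4: send() only appends the record to the accumulator;
            // the sender thread transmits the batch asynchronously
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // handle the failed send
                } else {
                    System.out.printf("partition=%d offset=%d%n",
                        metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```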
Data reliability assurance: ACK
Server response policy
There are two classic options:
- More than half of the follower nodes synchronize successfully before responding: the client waits less and latency is lower. (This is why such systems deploy an odd number of nodes; otherwise split-brain can occur.)
- All follower nodes must synchronize successfully before responding: the client waits longer.
Kafka uses the second option combined with the ISR mechanism (so synchronization does not have to cover every replica).
ISR
Kafka's leader maintains a dynamic set of followers that keep up with it; synchronization only needs to succeed within this set.
Culling mechanism: a replica is removed from the set when it has not synchronized with the leader for a certain period. This is controlled by replica.lag.time.max.ms, default 30 seconds.
ACK response mechanism
- acks=0: the producer does not wait for the broker's ACK. Lowest latency, but a broker failure risks losing messages.
- acks=1 (default): the producer waits for the broker's ACK; the partition leader acknowledges once the message is written to its own log. If the leader fails before followers synchronize, data is lost.
- acks=-1 (all): the producer waits for the broker's ACK; the partition leader and its followers acknowledge only after all of them have persisted the message. If the leader fails after replication but before the ACK reaches the producer, the producer resends the message, causing duplicates; setting retries to 0 avoids the duplication.
Conclusion: across the three levels, efficiency decreases (producer throughput drops) while data durability increases. This is analogous to MySQL binlog replication modes: asynchronous, semi-synchronous, synchronous.
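Building on the producer sketch above, the durability trade-off maps to two client settings (values here are illustrative):

```java
// added to the producer Properties from the earlier sketch
props.put("acks", "all"); // "0", "1", or "all" (-1)
props.put("retries", 3);  // resend on failure; with acks=all this can duplicate messages
```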
Kafka broker storage principles
Structure of file storage
Configuration file: config/server.properties. The log.dirs setting controls where data is stored (default: /tmp/kafka-logs).
Partition
To scale horizontally and reduce the access pressure on any single server, a topic's data is split into multiple partitions stored on different brokers. Data within a partition is ordered, but there is no global order across partitions. The number suffix after the topic name identifies the partition.
Replica
To improve partition reliability, specify replication-factor when creating a topic to set the number of replicas. The replication factor must be less than or equal to the number of broker nodes, otherwise an error is reported; this guarantees that replicas of the same partition sit on different nodes, without which the replica mechanism would be pointless. Replicas play two roles: leader and follower. The leader serves reads and writes, while followers only pull data from the leader asynchronously.
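A hedged sketch with the Java AdminClient (address and topic name assumed): asking for a replication factor larger than the broker count is what triggers the error mentioned above.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2: needs at least 2 brokers
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 2);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```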
Q: Why not separate reads and writes like MySQL, writing on the leader and reading on followers? A: The design philosophy is different. With both reads and writes on the leader there is no read/write inconsistency, which gives monotonic read consistency.
Replica distribution rules
The first replica of the first partition (the partition numbered 0) is placed on a randomly chosen broker; the first replicas of the other partitions are placed by shifting backwards from that broker.
Purpose: this improves disaster tolerance. The first replica of each partition is basically the leader, so the placement of the remaining replicas matters less.
Segment
A partition is further divided into multiple segments (MySQL has segments too: leaf nodes are data segments, non-leaf nodes are index segments) to prevent log files from growing without bound as messages are appended, which would make message retrieval slow. On disk, each segment consists of one log file and two index files.
Log: within a segment, messages are only appended. The log file is rolled and a new segment is created when any of the following conditions is met:
- The segment is full (default 1 GB, controlled by log.segment.bytes); the new segment is named after the latest offset.
- The difference between the maximum message timestamp and the current system timestamp exceeds a threshold, default 168 hours (one week), log.roll.hours=168. In other words, a new file is rolled once the newest data in the segment is a week old.
- The offset index file (.index) or the timestamp index file (.timeindex) reaches a certain size (default 10 MB); the segment is rolled along with them so the files stay paired.
Index
A segment can hold a large amount of data; indexes are the mechanism for retrieving messages quickly.
Offset index
A sparse index: by default one index entry is written for every 4 KB of log data, controlled by log.index.interval.bytes.
Timestamp index
By default it indexes the message creation timestamp: log.message.timestamp.type=CreateTime.
Q: How does Kafka quickly retrieve a message by offset? A: First locate the segment (segment files are named by their base offset). Then search the .index file for the largest indexed offset less than or equal to the target, which gives a physical position in the .log file. Finally, scan the .log from that position, comparing each record's offset with the target until the message is found.
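A conceptual sketch of the lookup (this models the idea in plain Java; it is not Kafka's actual code):

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseIndexDemo {
    public static void main(String[] args) {
        // Model of one segment's .index file: a sparse map from
        // message offset to physical position in the .log file
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 0L);
        index.put(50L, 4096L);  // roughly one entry per 4 KB of log data
        index.put(110L, 8192L);

        long target = 73L;
        // Largest indexed offset <= target (Kafka binary-searches here)
        Map.Entry<Long, Long> entry = index.floorEntry(target);
        System.out.printf("scan .log from position %d, comparing offsets until %d is found%n",
            entry.getValue(), target);
    }
}
```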
Conclusion
The overall architecture
Kafka message cleaning mechanism
Cleanup is enabled by default and comes in two forms: direct deletion and compaction.
Direct deletion
1. Time-based deletion: enabled by default. 2. Size-based deletion: disabled by default; the default threshold is 1 GB.
Compaction strategy
For messages with the same key, only the latest value is retained.
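Reusing the AdminClient sketch above, compaction is a topic-level setting (topic name assumed):

```java
// keep only the latest value per key instead of deleting by time or size
NewTopic compacted = new NewTopic("user-profiles", 1, (short) 1)
    .configs(Map.of("cleanup.policy", "compact")); // java.util.Map
admin.createTopics(List.of(compacted)).all().get();
```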
Highly available architecture
Leader election
The election is implemented with ZooKeeper, relying on node uniqueness, the watch mechanism, and ephemeral nodes.
Steps:
- A Controller is elected first, using the uniqueness of ZooKeeper nodes; when the Controller goes down, a new election is held.
- The Controller picks candidate replicas from the ISR set. (The ISR may be empty; to prevent this from blocking the election you can allow non-ISR replicas to become eligible, but that can lose data and is not recommended.)
- By default the first replica in the ISR is chosen as the leader; if it is unavailable, the next ones are tried in order.
Master-slave synchronization
- LEO (Log End Offset): the offset of the next message to be written on a replica.
- HW (High Watermark): the smallest LEO in the ISR. A consumer can only consume messages before the HW, i.e., messages that have not been fully synchronized are invisible. If such messages could be consumed, the consumer group's committed offset could run ahead of what survives a leader crash, and those messages would be lost.
How do slave nodes synchronize with master nodes?
- The follower sends a FETCH request to the leader, and the leader returns data to it.
- The follower stores the received data and updates its LEO.
- The leader updates the HW (the smallest LEO in the ISR).
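A worked example with assumed values: if the leader's LEO is 10 and the two ISR followers' LEOs are 8 and 9, then HW = min(10, 8, 9) = 8, and consumers can read only offsets 0 through 7.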
Follower failure
- The follower is kicked out of the ISR.
- After recovering, it truncates everything above the HW and synchronizes from there again.
- Once it has caught up with the leader (within the 30-second replica.lag.time.max.ms window), it rejoins the ISR.
Leader failure
- A new leader is elected (by default, the first replica in the ISR).
- The remaining replicas truncate everything above the HW and resynchronize from the new leader.
This mechanism only guarantees consistency among replicas; by itself it prevents neither data loss nor duplication.
Kafka consumer principle
For a partition, a consumer group keeps consuming from its last committed offset. A new group with no committed offset starts from the latest message.
Offset storage
Offsets are stored in a special topic on the broker (__consumer_offsets).
Offset updates
By default offsets are committed every 5 seconds. Commits can be automatic or manual.
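A minimal consumer sketch (topic, group, and address are assumptions) that replaces the 5-second auto-commit with manual commits:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // disable the 5s auto-commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // commit only after processing succeeded
            }
        }
    }
}
```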
Kafka consumer consumption strategies
A consumer group covers all partitions of a topic. If the group has more consumers than partitions, the extra consumers receive nothing to consume.
When the number of consumers is less than the number of partitions, one of these assignment strategies applies (a configuration sketch follows the list):
- Range: each consumer is assigned a contiguous range of partitions.
- Round Robin: partitions are dealt out one at a time across consumers.
- Sticky: sticky assignment, which follows two principles:
  - distribute partitions as evenly as possible;
  - keep the result as close as possible to the previous assignment.
Assignments can also be specified manually.
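As a sketch, the strategy is selected with the consumer's partition.assignment.strategy setting (the range assignor is the default):

```java
// added to the consumer Properties from the sketch above
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
```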
Rebalance (partition reassignment) is triggered when:
- The number of consumers has changed
- The number of partitions has changed
How to increase consumer consumption speed?
- Increase the number of partitions, keeping the consumer-to-partition ratio as close to 1:1 as possible.
- Speed up processing on the consumer side in code.
- Increase the number of messages a consumer pulls at a time, and enable message compression (make sure the broker and the producer use the same algorithm); see the sketch after this list.
- If message consistency is not a concern, override the partitioner to spread messages as evenly as possible.
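A hedged sketch of the knobs mentioned above; the values are illustrative, not recommendations:

```java
// consumer side: pull more per poll
props.put("max.poll.records", 1000);       // default 500
props.put("fetch.min.bytes", 64 * 1024);   // wait until batches are larger

// producer side: compress batches (keep the broker's compression.type at
// its default "producer" so it does not re-compress with another codec)
producerProps.put("compression.type", "lz4");
```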
Why is Kafka so fast?
- Sequential writes + PageCache: Kafka writes to memory first and the operating system flushes pages to disk files (a window in which messages can be lost). Messages are appended to the end of the file rather than written randomly.
- Indexes: offset and timestamp indexes.
- Batched reads/writes and compression: the message accumulator batches records and reduces network I/O.
- Zero copy: data is moved by DMA (direct memory access) without passing through user space.
Configuration for not losing Kafka messages
- The producer uses the callback variant of send and handles failed messages in the callback.
- acks=all: ensures the message is persisted by all ISR followers.
- Set retries to a large value for repeated retries.
- unclean.leader.election.enable=false: off by default; replicas outside the ISR are not allowed to participate in leader election.
- Increase the number of replicas: >= 3.
- Set min.insync.replicas > 1: a broker-side parameter controlling the minimum number of ISR replicas that must have written a message before it counts as committed. Recommended: replication.factor = min.insync.replicas + 1.
- Make sure a message is fully processed before committing: set the consumer's enable.auto.commit to false and commit offset updates manually.
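Pulling the producer-side settings together, a hedged sketch (addresses and values are illustrative; idempotence is the standard way to keep large retry counts from duplicating messages, though it is not listed above):

```java
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092"); // assumed address
producerProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");                // wait for the full ISR
producerProps.put("retries", Integer.MAX_VALUE); // retry aggressively
producerProps.put("enable.idempotence", "true"); // retries without duplicates

// broker/topic side, set in server.properties or topic config:
// unclean.leader.election.enable=false
// min.insync.replicas=2   (paired with replication.factor=3)
```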