preface
This article first introduces what Kafka is: its advantages, everyday use scenarios, and the basic concepts that beginners need to understand and master. It then looks at Kafka's overall architecture, including its workflow and storage mechanism, and finally examines what Kafka does from the producer's perspective and from the consumer's perspective.
Introduction to Kafka
Kafka was originally developed at LinkedIn and is written in Scala. It is a distributed, partitioned, multi-replica message queue that relies on ZooKeeper for coordination and can process large amounts of data in real time to meet a wide range of requirements.
features
- 1. High throughput, low latency: Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds. Each Topic can be divided into multiple Partitions that are read and written in parallel, and a Consumer Group consumes the Partitions concurrently.
- 2. Scalability: Kafka clusters support hot scaling; brokers can be added without taking the cluster offline.
- 3. Persistence and reliability: producers use the ACK + ISR mechanism to ensure that messages are not lost, and Exactly Once semantics can ensure that messages are consumed only once.
- 4. Fault tolerance: the multi-replica election mechanism for Partitions ensures that one machine going down does not affect the overall service, because the replicas of a Partition are generally distributed across different broker nodes.
- 5. High concurrency: thousands of clients can read and write at the same time.
Publish and subscribe model
Kafka is based on a publish/subscribe model. Producers publish messages to a Topic, multiple consumers subscribe to the Topic, and a message published to the Topic is delivered to all subscribers. Consumed data is not immediately purged from the Topic. A minimal sketch of the model follows.
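As a minimal sketch of the model (the broker address localhost:9092 and the topic name demo-topic are assumptions for illustration, not from this article), a producer publishes to a Topic, and consumers in two different groups each receive every message:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Publish side: send one message to the Topic.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("demo-topic", "hello"));
producer.close();

// Subscribe side: each consumer group is logically one subscriber, so a consumer
// with group.id "group-a" and another with "group-b" would both receive "hello".
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "group-a");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("demo-topic"));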
Basic concepts
- Producer: A message Producer, a client that sends messages to Kafka Broker
- Consumer: Message Consumer, the client that fetches messages from Kafka Broker.
- Consumer Group: a group of consumers in which each Consumer is responsible for consuming data from different partitions, improving overall consumption capacity. A partition can only be consumed by one consumer within a group, and consumer groups do not affect each other. Every consumer belongs to a consumer group; in other words, a consumer group is logically a single subscriber.
- Broker: A Kafka machine is a Broker. A cluster consists of multiple brokers, and one Broker can hold multiple topics.
- Topic: a Topic can be understood as a queue that categorizes messages; producers and consumers are both oriented toward a Topic.
- Partition: A Topic can be divided into multiple partitions, which are distributed among different brokers, and each Partition is an ordered queue.
- Replica: to back up data, Kafka provides replicas. Each Partition of a Topic has several replicas: one Leader and several Followers.
- Leader: the "master" replica among the replicas of each Partition; the object that producers send data to, and the object that consumers consume data from.
- Follower: Synchronizes data with the Leader in real time. If the Leader fails, a Follower is elected as the new Leader.
- Offset: the position information of a consumer's consumption. It records how far data has been consumed, so that after a consumer fails and recovers, it can continue from the last recorded position.
- Zookeeper: Helps Kafka store and manage cluster information.
architecture
Kafka stores messages from multiple Producer processes. The data is stored in different Partitions under different Topics. Within one Partition the messages are indexed and timestamped, and other processes, called Consumers, can pull messages from the Partitions.
Storage mechanism
Since messages produced by producers are continually appended to the end of log files, Kafka uses sharding and indexing to prevent inefficient lookups in oversized log files. Each Partition is divided into multiple segments, and each Segment corresponds to two main files: a .index index file and a .log data file (in practice there are also .timeindex and .snapshot files). These files live in a folder named after the topic and partition number, <topic>-<partition>. For example, if the topic first has three partitions, the corresponding folders are first-0, first-1, and first-2.
ls /root/data/kafka/first-0
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
00000000000000009014.snapshot
leader-epoch-checkpoint
As shown above, the index and log files are named after the Offset of the first message in the current Segment. The following figure shows the structure of the index and log files.
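To make the naming scheme concrete, here is a hedged sketch (the variable names are hypothetical; this is not Kafka source code) of how a broker can locate a message by offset: take the base offsets from the file names, do a floor lookup to find the right Segment, then use its .index file to jump close to the target position in the .log file.

import java.util.TreeMap;

// Base offsets parsed from file names such as 00000000000000009014.log.
TreeMap<Long, String> segments = new TreeMap<>();
segments.put(0L, "00000000000000000000.log");
segments.put(9014L, "00000000000000009014.log");

long target = 9020L;
// floorEntry returns the Segment whose base offset is <= target, here 9014.
String segmentFile = segments.floorEntry(target).getValue();
// Inside that Segment, the .index file maps the relative offset
// (9020 - 9014 = 6) to a byte position in the .log file, so the broker
// only scans a small region instead of the whole log.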
producers
We’ll look at how and why Producer partitions are performed, and how producers ensure that messages sent to the specified partition are not lost.
Partitioning strategies
Reasons for partitioning:
- It facilitates scaling within a cluster: a topic is a logical concept, while a partition is a physical message queue, so capacity can be adjusted by adding or removing partitions, and reading and writing a topic amounts to reading and writing its partitions.
- It improves concurrency: reading and writing a topic becomes parallel reads and writes across multiple partitions.
The principle of partitioning: the data to be sent is encapsulated in a ProducerRecord object, whose parameters include a mandatory topic and optional partition, timestamp, key, value, and headers. So how does the producer decide which Partition to send to? (A short sketch follows the list.)
- If a Partition is specified, the specified value is used directly as the Partition value.
- If no Partition is specified but a key is present, the hash of the key modulo the number of partitions gives the Partition value.
- If neither a Partition nor a key is given, a random integer is generated on the first call (and incremented on each subsequent call); that number modulo the number of available partitions gives the Partition value, i.e., round robin.
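A hedged sketch of the three cases (the topic name, key, and values are hypothetical):

import org.apache.kafka.clients.producer.ProducerRecord;

// Case 1: partition specified explicitly; the value 0 is used directly.
ProducerRecord<String, String> explicit =
        new ProducerRecord<>("demo-topic", 0, "user-42", "v1");
// Case 2: no partition, but a key: hash("user-42") modulo the partition count.
ProducerRecord<String, String> byKey =
        new ProducerRecord<>("demo-topic", "user-42", "v2");
// Case 3: neither partition nor key: round robin over the available partitions
// (newer client versions use a "sticky" variant of this strategy).
ProducerRecord<String, String> noKey =
        new ProducerRecord<>("demo-topic", "v3");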
Data reliability assurance
To ensure that data sent by the Producer reliably reaches the specified Partition, the Partition sends an ACK to the Producer after receiving the data. If the Producer receives the ACK, it proceeds to the next round of sending; otherwise, it resends the data.
ISR mechanism
An ISR (in-sync replica) list is automatically maintained for each Partition. This list contains the Leader and the Followers that keep their data synchronized with the Leader; as long as a Follower stays in sync with the Leader, it remains in the ISR. If a Follower runs into problems and cannot synchronize data from the Leader, it is considered out of sync and removed from the ISR list.
ACK response mechanism
Kafka provides three reliability levels for different scenarios. Users can choose a proper configuration based on reliability and latency requirements.
The ACK parameter has three settings (a configuration sketch follows the list):
- 0: the Producer does not wait for the Broker's ACK. This gives the lowest latency: the Broker returns as soon as it receives the data, before it is written to disk, so data may be lost if the Broker fails.
- 1: the Producer waits for the Broker's ACK, and the Partition's Leader returns the ACK once its own write succeeds. If the Leader fails before the Followers finish synchronizing, data is lost.
- -1 (all): the Producer waits for the Broker's ACK, which is returned only after both the Leader and the Followers of the Partition have written successfully. However, if the Leader fails after the Followers synchronize but before the ACK is sent, the Producer retries and the data is duplicated.
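A hedged configuration sketch for the strongest setting (acks and retries are the real producer config keys; the broker address and retry count are arbitrary illustrations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
props.put("acks", "all");  // equivalent to -1: Leader and ISR Followers must all persist the write
props.put("retries", 3);   // resend when no ACK arrives; with acks=all this can duplicate messages
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);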
Replica data synchronization policy
Replica data synchronization ensures that data is not lost on its way from the Producer to the Broker: the Leader sends the ACK reply only after the Followers have synchronized successfully, so that if the Leader fails, a new Leader can be elected from the Followers without losing data. There are two possible synchronization schemes, each with advantages and disadvantages:
- Send the ACK once more than half of the Followers have completed synchronization. The advantage is low latency; the disadvantage is that tolerating n node failures during Leader election requires at least 2n+1 replicas.
- Send the ACK only after all Followers have completed synchronization. The advantage is that tolerating n node failures during Leader election requires only n+1 replicas; the disadvantage is higher latency. (This is the scheme Kafka uses.)
Fault handling details
First, two offsets need to be explained:
- LEO (Log End Offset): the largest Offset in each replica.
- HW (High Watermark): the largest Offset visible to consumers, i.e., the smallest LEO in the ISR.
If a Follower fails, it is temporarily removed from the ISR. After the Follower recovers, it reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and synchronizes with the Leader starting from the HW. For example, if the recorded HW is 10 and the Follower's LEO is 12, it discards messages 10 and 11 and re-fetches them from the Leader. If the Leader fails, a new Leader is elected from the ISR; to keep the replicas consistent, the remaining Followers first truncate the parts of their log files above the HW and then synchronize from the new Leader.
These measures ensure data consistency across replicas; they do not by themselves guarantee that data is neither lost nor duplicated (that is governed by the ACK setting).
Exactly Once semantics
What’s the difference between “At least Once”, “At most Once” and “Exactly Once”
- At Least Once: setting the server ACK level to -1 ensures that the Producer's data is delivered successfully at least once, because a failed send is retried until it succeeds. Messages are guaranteed not to be lost, but they may be duplicated.
- At Most Once: setting the server ACK level to 0 ensures that the Producer sends each message only once, with no retry regardless of success. Messages are guaranteed not to duplicate, but they can be lost.
- Exactly Once: Exactly Once = At Least Once + idempotence, meaning that no matter how many duplicates of a message the Producer sends to the Broker, the Broker persists only one.
To enable idempotence, simply set the Producer parameter enable.idempotence to true. When an idempotent Producer is initialized, it is assigned a PID, and messages sent to the same Partition carry a Sequence Number. The Broker caches <PID, Partition, SeqNumber> and persists only one message per primary key. However, the PID changes after a restart and different Partitions have different primary keys, so idempotence alone cannot guarantee Exactly Once across partitions or across sessions. A configuration sketch follows.
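A hedged sketch of turning idempotence on (enable.idempotence is the real config key; the broker address is hypothetical):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true"); // implies acks=all with retries enabled
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Retries of the same record now carry the same <PID, Partition, SeqNumber>,
// so the Broker deduplicates instead of persisting the message twice.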
consumers
Consumption patterns
There are generally two consumption modes of message queues: Pull and Push.
- Pull mode: the Consumer actively pulls data from the Broker. Kafka's consumers work this way.
- Push mode: the Broker pushes messages to consumers, so the delivery rate is decided by the Broker, which makes it hard to adapt to consumers with different consumption rates.
Push delivers messages as fast as possible, which can easily overwhelm a consumer that cannot process them in time, typically resulting in denial of service or network congestion. Pull mode lets the Consumer fetch messages at a rate suited to its own consumption capacity. Its drawback is that if Kafka has no data, the Consumer may spin in an empty polling loop; to avoid this, Kafka consumers pass a timeout to the poll call and block for up to that long waiting for data. A minimal pull loop sketch follows.
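A hedged sketch of the pull loop (the topic and group names are hypothetical; poll with a timeout is the real consumer API):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));
while (true) {
    // Blocks for at most one second when no data is available,
    // avoiding the tight empty loop described above.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}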
consumption
A Topic can be subscribed to by multiple consumer groups. A consumer group can contain multiple consumers, but a Partition can only be consumed by one consumer within a group; a Partition can, however, be consumed by several consumer groups at the same time. Therefore, if a group has more consumers than Partitions, some consumers will be idle with no Partition assigned; if it has fewer consumers than Partitions, some consumers will be assigned multiple Partitions, which raises the question of how the allocation is done.
Partition allocation policy
A Consumer Group has multiple consumers and a Topic has multiple Partitions, so Partition allocation is inevitably involved: determining which Partition is consumed by which Consumer. Kafka has two allocation policies, RoundRobin and Range, with Range as the default. Partition reallocation is triggered whenever the set of consumers in a group changes.
RoundRobin
RoundRobin sorts all partitions as a whole by hash and deals them out one by one, so within a consumer group the number of partitions assigned to any two consumers differs by at most 1. This solves the problem of unbalanced consumption across multiple consumers. However, when consumers within one group subscribe to different topics, it can cause misdelivery. As shown in the figure below, Consumer0 subscribes to Topic A and Consumer1 subscribes to Topic B: after the partitions of Topic A and Topic B are sorted together and assigned across the group, partitions of Topic B may be assigned to Consumer0.
Range
Range mode divides partitions per topic, which avoids the misdelivery that the polling mode can cause. However, the allocation can be uneven: in the figure below, Consumer0 and Consumer1 both subscribe to Topic A and Topic B, and Consumer0 is assigned 4 partitions while Consumer1 gets only 2.
The preservation of the Offset
Each consumer group needs to record the Offset it has consumed up to for every Partition. These records used to be saved in ZooKeeper, but ZooKeeper's write performance is poor, so the earlier solution had consumers report their offsets only once per minute; ZooKeeper's performance seriously limited consumption speed and made repeated consumption easy. Since version 0.10, Kafka has saved offsets in an internal topic named __consumer_offsets. The Key of each message written to it consists of GroupId, Topic, and Partition, and the Value is the Offset. The topic's cleanup policy is Compact: the latest value for each Key is always kept and the rest are deleted. The Offset for each Key is generally cached in memory, so a query does not traverse the Partition; if there is no cache, the Partition is traversed once to build the cache and the query then returns from it. The Partition of __consumer_offsets that a consumer group's offset information is written to is determined by the following formula:
__consumer_offsets partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
// groupMetadataTopicPartitionCount is specified by offsets.topic.num.partitions (default: 50)
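For completeness, a hedged sketch of committing offsets manually (enable.auto.commit and commitSync are the real config key and consumer API; handle() is a hypothetical processing method). It extends the pull loop sketched earlier:

// In the consumer configuration, take over offset management from the client:
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        handle(record); // hypothetical business logic
    }
    if (!records.isEmpty()) {
        // Synchronously writes the new offsets to __consumer_offsets,
        // only after the records above were processed successfully.
        consumer.commitSync();
    }
}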
Rebalance
Partitions are assigned to Brokers on the production side and to consumers on the consumption side. Much like electing a Controller from among the Brokers, a consumer group needs a Coordinator, selected from the Brokers, to allocate its partitions.
Choose the Coordinator
The choice depends on which Partition of __consumer_offsets the group's Offsets reside in: the Broker hosting that Partition's Leader is chosen as the Coordinator. It follows that a consumer group's Coordinator is on the same machine as the Leader of the Partition that stores the group's Offsets.
Allocation process
Once the Coordinator is chosen, allocation proceeds as follows:
- When a Consumer starts, or when the Coordinator goes down, the Consumer sends a ConsumerMetadataRequest to any Broker, and the Broker returns the address of the group's Coordinator as described above.
- The Consumer then sends heartbeat requests to the Coordinator. If the Coordinator returns IllegalGeneration, the Consumer's information is stale and it must rejoin the group to trigger a Rebalance; if the Coordinator returns success, the Consumer continues consuming from the Partitions it was last assigned.
Rebalance process
A Rebalance happens when the number of partitions or the number of consumers changes. The process works as follows (a client-side listener sketch follows the list):
- The Consumer sends a JoinGroupRequest to the Coordinator.
- When the other Consumers send their heartbeat requests, the Coordinator tells them a Rebalance is needed, and they send JoinGroupRequests as well.
- Once all registered Consumers have sent JoinGroupRequests, the Coordinator picks one of them as the group Leader and replies with JoinGroupResponses, telling each Consumer whether it is a Follower or the Leader; the Leader also receives the member information and is asked to compute the partition allocation from it.
- Each Consumer then sends a SyncGroupRequest to the Coordinator; the Leader's SyncGroupRequest contains the allocation it computed.
- The Coordinator replies to all Consumers, including the Leader, with their assignments.
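On the client side the Rebalance is observable through a callback; a hedged sketch reusing the consumer from the earlier pull loop (ConsumerRebalanceListener is the real client interface):

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a Rebalance takes partitions away: commit progress here.
        consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called once the Coordinator's reply delivers the new assignment.
        System.out.println("assigned: " + partitions);
    }
});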
Kafka’s high performance
Kakfa’s high throughput and low latency and what we call Kafka’s fast performance are mainly reflected in reading and writing. Specifically, it depends on the following aspects:
- Use Partition to achieve parallel processing
- Sequential disk write
- Make full use of PageCache
- Zero copy technology
- Batch processing and data compression
Use Partition to achieve parallel processing
We know that Kafka is a publish/subscribe messaging system; whether publishing or subscribing, you specify a Topic, and each Topic contains one or more Partitions, which may be located on different nodes. On one hand, because different Partitions may live on different machines, the cluster can be fully exploited for parallel processing across machines. On the other hand, a Partition physically corresponds to a folder, so even when multiple Partitions sit on the same node, they can be configured on different disk drives to achieve parallel processing across disks.
The key factor affecting disk performance is disk service time, the time a disk takes to complete an I/O request; it consists of seek time, rotational latency, and data transfer time. Mechanical hard disks have good sequential read/write performance but poor random read/write performance, mainly because moving the head to the right track takes time: random access keeps the head moving, and the time wasted on head seeks dominates.
Sequential disk write
In Kafka, each Partition is an ordered, immutable sequence of messages, and new messages are continually appended to the end of the Partition; this is sequential writing. Because disk capacity is limited, it is impossible to keep all data forever, and in fact Kafka as a messaging system does not need to: old data must be deleted. Since a Partition is divided into multiple Segments, each corresponding to a physical file, Kafka removes old data by deleting entire Segment files, which also avoids random writes.
Make full use of Page Cache
To improve disk access performance, Linux introduces a Cache layer that caches some disk data in memory. When a data request arrives, if the data exists in the Cache and is up to date, it is returned directly to the application, skipping the underlying disk operation and improving performance. The Cache layer is one of the main reasons disk IOPS can exceed 200. In Linux the file cache has two layers, Page Cache and Buffer Cache, and each Page Cache contains several Buffer Caches. The Page Cache mainly caches file data on the file system, while the Buffer Cache caches block data as the system reads from or writes to block devices.
Page Cache has the following benefits:
- IO Scheduler improves performance by assembling contiguous small blocks of writes into large physical writes.
- IO Scheduler tries to reorder some writes to reduce disk head movement time.
- Make full use of all free memory (non-JVM memory)
- Read operations can be performed directly in the Page Cache. If the consumption and production rates are comparable, data does not even need to be exchanged through physical disks (directly through the Page Cache).
- If the process is restarted, the Cache in the JVM is invalidated, but the Page Cache is still available.
After the Broker receives data, it only writes it into the Page Cache; there is no guarantee that the data is flushed to disk. Data may therefore be lost if the machine goes down, but such loss only happens when the machine loses power before the operating system flushes, and that scenario is handled by the Replica mechanism.
Zero copy technology
In Kafka, a large amount of network data is persisted from the Producer to the Broker, and disk files are sent over the network from the Broker to the Consumer; the performance of these two paths directly affects Kafka's overall throughput. The core of the operating system is the kernel, which is independent of ordinary applications and can access the protected memory space as well as the underlying hardware devices. To prevent user processes from operating the kernel directly and to keep the kernel safe, the operating system divides virtual memory into two parts: kernel space and user space.
In traditional Linux, standard I/O interfaces are based on data copy operations, that is, I/O operations cause data to be copied between the buffer of the kernel address space and the buffer of the user address space. This has the advantage of reducing the actual IO operations if the requested data is already in the kernel’s cache, but has the disadvantage of incurring CPU overhead during the data copying process.
Network data is persisted to disk
In traditional mode, transferring data from the network to a file requires four data copies, four context switches, and two system calls.
data = socket.read()   // read network data
file = new File()
file.write(data)       // persist to disk
file.flush()
This process actually involves four data copies:
- The network data is first copied into the kernel-space Socket Buffer via a DMA copy.
- The application then reads the kernel-space Buffer into user space (CPU copy).
- The user program then writes the user-space Buffer back into kernel space (CPU copy).
- Finally, the data is copied to the disk file via a DMA copy.
(DMA, Direct Memory Access, is a hardware mechanism that allows bidirectional data transfer between peripherals and system memory without CPU involvement. Using DMA frees the system CPU from the actual I/O data transfer, greatly improving system throughput.)
Flushing to disk is usually not real-time; Page Cache technology is used to improve I/O efficiency. For Kafka, the data produced by the Producer has to be stored by the Broker, and since the application does no intermediate processing on it, the network data read into the socket buffer can be written from kernel space directly, without first being copied into the application process's buffer. In this scenario Kafka can use mmap (memory-mapped files, also called MMFile). Mmap maps the addresses of the kernel read buffer into a user-space buffer, so the kernel buffer and the application memory are shared, and the copy from the kernel read buffer to the user buffer is eliminated. It works by using the operating system's pages to map the file directly to physical memory; once the mapping is established, operations on that memory are synchronized to disk. This removes the overhead of copying between user space and kernel space and yields a large I/O improvement. Mmap also has an obvious drawback: it is unreliable. Data written to an mmap region is not actually on disk; the operating system only writes it to disk when the program issues a flush call. A sketch follows.
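A hedged sketch of memory-mapped writing in Java (the file name is hypothetical; FileChannel.map and MappedByteBuffer.force are the real NIO APIs, with force playing the role of the flush call mentioned above):

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

FileChannel channel = FileChannel.open(Path.of("00000000000000009014.log"),
        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);

// Writes land in the shared page cache; no CPU copy into a separate kernel buffer.
mapped.put("message-bytes".getBytes());
// Like flush: only now is the data guaranteed to be on disk.
mapped.force();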
Zero-copy means that the CPU does not need to first copy data from one memory region to another when performing an operation, which reduces context switches and CPU copy time. Its role in the transfer of data between network devices and user program space is to reduce the number of data copies and system calls, achieving zero CPU participation and eliminating the CPU load on this path. Zero-copy techniques fall into three main types:
- Direct IO: Data is passed across the kernel between the user’s address space and the IO device, and the kernel does the necessary auxiliary tasks such as configuring virtual storage.
- Avoid data copying between kernel space and user space: when the application does not need to process the data, mmap, sendfile, splice && tee, and sockmap can avoid this copy.
- Copy on Write: Copy on write technology. Data is partially copied when it needs to be modified rather than copied in advance.
Disk files are sent over the network (Broker to Consumer)
Traditional way: read from the disk first, then send through the socket; this actually involves four copies as well.
buffer = File.read
Socket.send(buffer)
This process is analogous to the message production path above:
- A system call first reads the file data into a kernel-space Buffer (DMA copy).
- The application then reads the kernel-space Buffer into a user-space Buffer (CPU copy).
- When the application sends the data over the Socket, it copies the user-space Buffer into the kernel-space Socket Buffer (CPU copy).
- Finally, the data is copied to the NIC Buffer via DMA copy.
Linux 2.4+ kernels provide zero copy via the sendfile system call: after the data is DMA-copied into the kernel Buffer, it goes directly to the NIC Buffer by another DMA copy, with no CPU copy in between; this is where the term zero copy comes from. Besides reducing data copies, the entire file read and network send is done by a single sendfile call with only two context switches, greatly improving performance.
Kafka’s solution here is to use NIO’s transferTo/transferFrom to call the operating system’s sendfile for zero copy. A total of two kernel data copies, two context switches, and one system call occurred, eliminating CPU data copies.
Batch processing and data compression
In many cases the bottleneck is not CPU or disk but network I/O. So, in addition to the low-level batching provided by the operating system, Kafka clients and brokers accumulate multiple records into a batch (for both reads and writes) before sending data over the network. Batching amortizes the cost of network round trips by using larger packets and improving bandwidth utilization. The Producer can also compress data before sending it to the Broker to reduce network transfer cost; the supported compression algorithms currently include Snappy, Gzip, and LZ4. A configuration sketch follows.
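A hedged sketch of the relevant producer settings (batch.size, linger.ms, and compression.type are the real config keys; the numeric values are arbitrary illustrations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("batch.size", 32768);        // accumulate up to 32 KB per partition before sending
props.put("linger.ms", 10);            // wait up to 10 ms for more records to fill a batch
props.put("compression.type", "lz4");  // whole batches are compressed: snappy, gzip, or lz4
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);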
conclusion
At this point, we have a picture of the core of Kafka's design. How does it achieve high reliability, high performance, and high throughput? On the reliability side, the ACK + ISR mechanism and the Partition / Segment (log + index) storage layout; on the consumption side, Partition allocation and Rebalance. Kafka's high performance and high throughput rest on five designs: multi-partition parallel processing, sequential writes to disk, full use of the Page Cache, zero-copy technology, and batching with data compression. Did you learn it?