This paper analyzes how Kafka achieves high performance from the macro architecture level and micro implementation level.
It covers how Kafka uses partitions for parallel processing and horizontal scaling, dynamic balancing of availability and data consistency through ISR, zero copy using NIO and SendFile for Linux, and efficient disk utilization through sequential reads and writes and data compression.
Kafka is a pub-sub messaging system, either publish or subscribe, must specify Topic. Topic is just a logical concept, as explained in Kafka Design Analysis (1) – Kafka Background and Architecture introduction.
Each Topic contains one or more partitions, which can be located on different nodes. A Partition corresponds to a local folder physically. Each Partition contains one or more segments. Each Segment contains a data file and an index file. Logically, a Partition can be thought of as a very long array whose data can be accessed through its offset index.
On the one hand, since different partitions can be located on different machines, clustering can be fully utilized to achieve parallel processing between machines. On the other hand, Partition corresponds to a folder physically. Even if multiple partitions are located on the same node, you can configure different partitions on the same node on different disk drives to implement parallel processing among disks and make full use of the advantages of multiple disks.
To take advantage of multiple disks, mount different disks to different directories and, in server.properties, set log.dirs to multiple directories (separated by commas). Kafka automatically allocates all partitions as evenly as possible to different directories (i.e., disks).
Note: Although the smallest physical unit is a Segment, Kafka does not provide parallel processing between different segments within the same Partition. For writes, only one Segment within the Partition is written at a time. For reads, only different segments within the same Partition are read sequentially.
As described in Kafka Consumer Design (4), when multiple consumers consume the same Topic, the same message is consumed by only one Consumer in the same Consumer Group. Data is not allocated on a message basis, but on a Partition basis, meaning that data on the same Partition will only be consumed by one Consumer (without considering Rebalance).
If the number of consumers exceeds the number of partitions, some consumers cannot consume any data of this Topic. In other words, when the number of consumers exceeds the number of partitions, increasing the number of consumers does not increase the parallelism.
In short, the number of partitions determines the maximum parallelism possible. As shown in the figure below, since Topic 2 contains only three partitions, Consumer 3, Consumer 4, and Consumer 5 in group2 can consume data of one Partition respectively, while Consumer 6 does not consume any data of Topic 2.
Kafka high performance Architecture, Kafka how to achieve high performance?
Take The Kafka data consumed by Spark as an example. If the number of partitions consumed by Spark is N, the maximum parallelism of Spark is also N. Even if the number of Spark executors is set to N+M, a maximum of N executors can process data for the Topic at the same time.
CAP theory states that in distributed systems, consistency, availability, and partition tolerance can only be satisfied at most two at the same time.
consistency
availability
Zonal tolerance
In general, partition tolerance is required. So under CAP theory, there is more of a trade-off between availability and consistency.
Master-Slave
WNR
Paxos and its variants
Isr-based data replication Solutions As described in Kafka High Availability (Part 1), data replication in Kafka is based on partitions. The followers pull data from the Leader to replicate data between multiple backups. In this respect, Kafka’s data replication scheme is similar to the master-slave scheme described above. The difference is that Kafka is neither synchronous nor asynchronous replication, but isR-based dynamic replication.
ISR, or in-sync Replica. The Leader of each Partition maintains a list of all replicas synchronized with it (including the Leader himself). Every time data is written, only when all replicas in the ISR complete, the Leader sets it as a Commit, and it can be consumed by consumers.
This scheme is very close to synchronous replication. But the difference is that this ISR is dynamically maintained by the Leader. If a Follower fails to “keep up” with the Leader, it is removed from the ISR by the Leader. After it “keeps up” with the Leader again, the Leader adds it to the ISR again. After each ISR change, the Leader persists the latest ISR to Zookeeper.
Different versions of Kafka have slightly different strategies for determining whether a Follower is “keeping up” with the Leader.
For the replica.lag.max. Messages parameter of 0.8.*, many readers have left messages asking why there is a large gap between followers and leaders since only the messages copied by all replicas in ISR are considered Commit. The reason is that the Leader does not need to wait for the previous message to be committed before receiving the later message. In fact, the Leader can receive a large number of messages in order, and the Offset of the latest message is denoted as LEO (Log end Offset). Only messages copied by all followers in the ISR are committed. A Consumer can consume only the message that has been committed. The latest Commit Offset is recorded as a High watermark. In other words, LEO marks the offset of the latest message saved by the Leader, while the High watermark marks the latest Follower messages that can be consumed (synchronized to the ISR). However, the Leader receives data asynchronously and the followers copies data asynchronously, so there is a certain gap between the Hight watermark and LEO. In 0.8.* version, replica.lag.max.messages limits the maximum value of the gap allowed by the Leader.
The following figure shows the principles of Kafka isR-based data replication.
Kafka high performance Architecture, Kafka how to achieve high performance?
As shown in the figure above, in the first step, Leader A received 3 messages in total, so its high watermark was 3. However, since followers only synchronized the first message (M1) in ISR, only M1 was committed, that is, only M1 could be consumed by consumers. At this time, the gap between Follower B and Leader A is 1, while the gap between Follower C and Leader A is 2, both of which are less than the default replica.lag.max.messages, so they are retained in the ISR. In the second step, as the old Leader A breaks down and the new Leader B does not receive A Fetch request from A within replica.lag.time.max.ms, A is removed from the ISR, and then ISR={B, C}. At the same time, because there are only two messages in the new Leader B at this time, m3 is not included (M3 has never been committed by any Leader), so M3 cannot be consumed by consumers. In step 4, Follower A returns to normal. It deletes all uncommitted messages before the outage and starts chasing the new Leader FROM the last committed message until it “catches up” with the new Leader and is re-added to the new ISR.
According to “Sequential Disk Writing Is Faster than Random Memory Writing in Some Scenarios”, the sequential disk writing process can greatly improve disk utilization.
In Kafka’s design, a Partition is a very long array into which all messages received by the Broker are sequentially written. At the same time, the Consumer uses Offset to consume the data sequentially without deleting the data already consumed, thus avoiding the process of random disk writes.
Due to the limited number of disks, it is impossible to store all data, and in fact Kafka as a messaging system does not need to store all data. Old data needs to be deleted. Instead of modifying a file in read-write mode, the Partition is divided into multiple segments, each corresponding to a physical file, and the data in the Partition is deleted by deleting the entire file. This eliminates old data and avoids random writes to files.
Kafka deletes a Segment by deleting the entire log file and index file. Kafka deletes a Segment by deleting the entire log file and index file.
Kafka high performance Architecture, Kafka how to achieve high performance?
The benefits of using Page Cache are as follows
When the Broker receives data, it writes the data to the Page Cache. There is no guarantee that the data will be written to disk. From this point of view, it is possible that the machine will go down and the data in the Page Cache will not be written to disk, resulting in data loss. However, this loss only occurs when the operating system does not work due to machine power outages, which can be resolved by the Replication mechanism at the Kafka level. Forcing data from the Page Cache to disk to ensure that data is not lost in this case can degrade performance. Because of this, Kafka does not recommend using flush. Messages and flush. Ms parameters to forcibly flush data from the Page Cache to disk.
If the data consumption rate is comparable to the production rate, the data does not even need to be exchanged through the physical disk, but directly through the Page Cache. Meanwhile, the followers Fetch data from the Leader through the Page Cache. The following figure shows the network/disk read and write information of the Leader node of a Partition.
Kafka high performance Architecture, Kafka how to achieve high performance?
As you can see from the figure above, the Broker receives about 35MB of data from Producer over the network every second. Although followers Fetch data from the Broker, the Broker basically does not read disks. This is because the Broker fetches data directly from the Page Cache and returns it to the followers.
Broker log.dirs configuration item that allows multiple folders to be configured. If you have multiple Disk drives on your machine, you can mount different disks to different directories, and then configure these directories in log.dirs. Kafka takes advantage of multiple disks by allocating partitions to as many directories, or disks, as possible.
In Kafka, there are a lot of network data persisting from Producer to Broker and disk files being sent from Broker to Consumer over the network. The performance of this process directly affects Kafka’s overall throughput.
Take sending disk files over the network as an example. In traditional mode, the file data is read into the memory and then sent out through the Socket, as shown in the following pseudocode.
This process actually takes place four times. The file data is first read into the kernel-state Buffer (DMA copy) through a system call. The application then reads the memory-state Buffer data into the user-state Buffer (CPU copy). The user program then copies the user-mode Buffer data to the kernel-mode Buffer (CPU copy) when sending data through the Socket, and finally copies the data to the NIC Buffer through DMA copy. This is accompanied by four context switches, as shown in the figure below.
Kafka high performance Architecture, Kafka how to achieve high performance?
The Linux 2.4+ kernel provides zero copy via the SendFile system call. After the data is DMA copied to the kernel Buffer, the data is directly DMA copied to the NIC Buffer without CPU copying. This is where the term zero copy comes from. In addition to reducing data copying, performance is greatly improved because the entire read-file-network send is done by a single sendFile call with only two context switches. The zero-copy process is shown in the following figure.
Kafka high performance Architecture, Kafka how to achieve high performance?
In terms of implementation, Kafka transfers data through TransportLayer, and its subclass PlaintextTransportLayer is implemented through the FileChannel transferTo and transferFrom methods of Java NIO
Note: transferTo and transferFrom do not guarantee the use of zero copy. Whether or not you can actually use zero-copy depends on the operating system. If the operating system provides a zero-copy system call like SendFile, then the two methods will take full advantage of zero-copy through such a system call, otherwise they cannot implement zero-copy by themselves.
Batch processing is a common way to improve I/O performance. For Kafka, batch processing reduces Overhead over the network and improves disk write efficiency.
Kafka 0.8.1 and previous producers distinguish between synchronous and asynchronous producers. Synchronous Producer send methods can be divided into two forms. One is to take a KeyedMessage as an argument and send a message one at a time. The other is to take a batch of KeyedMessages as arguments and send multiple messages at once. For asynchronous sending, no matter which send method is used, the message is not immediately sent to the Broker. Instead, the message is stored in an internal queue until the number of messages reaches the threshold or reaches the specified Timeout. In this way, the message is sent in batches.
Kafka 0.8.2 began supporting a new Producer API that combines synchronous Producer with asynchronous Producer. Although the SEND interface can only send one ProducerRecord at a time and cannot accept a list of messages like the previous version of SEND, send does not send messages immediately, but controls the actual sending frequency through batch.size and Linger.ms. So as to realize batch sending.
There is a lot of Overhead (called Overhead) that needs to be transmitted during each network transmission, so multiple messages are combined to reduce Overhead and improve transmission efficiency.
As you can see from the figure in the zero-copy section, although the Broker continues to receive data from the network, disk writes do not occur every second, but at intervals of very high volumes (up to 718MB/S).
Kafka starts with 0.7, which allows data to be compressed and transmitted to brokers. In addition to being able to compress and transmit each message individually, Kafka also supports the ability to compress and transmit the entire Batch of messages together when sending in batches. One of the basic principles of data compression is that the more repeated the data, the better the compression. Therefore, the compression of the whole Batch of data can greatly reduce the amount of data and improve network transmission efficiency to a greater extent.
After receiving a message, the Broker does not directly decompress it. Instead, the message is persisted to disk in a compressed form. The Consumer Fetch retrieves the data and decompresses it. Therefore, Kafka compression not only reduces the network transmission load from Producer to Broker, but also reduces the disk operation load of Broker, and reduces the network transmission volume between Consumer and Broker, thus greatly improving transmission efficiency and throughput.
The types of the Key and Payload (or Value) of a Kafka message can be customized by providing both serializers and deserializers. Therefore, users can increase throughput by using fast and compact serialization-deserialization methods (e.g. Avro, Protocal Buffer) to reduce the size of actual network transfers and disk storage data. It is important to note that if the serialization method used is too slow, even a very high compression ratio may not end up being as efficient.