Introduction

Message queues are an important component of distributed systems. When the throughput of producers and consumers is unbalanced, a buffer layer should be added between them to ensure performance and reliability and to prevent them from affecting each other. This buffer layer is the message queue. Message queues provide the following capabilities:

  • Asynchrony: logic that does not need a real-time response can be handled asynchronously, improving overall interface performance
  • Decoupling: producers and consumers have no direct dependency on each other, so the failure of one does not affect the other, improving the stability and robustness of the system
  • Peak shaving: the message queue acts as a buffer layer that smooths out burst traffic and prevents short periods of high load from hurting system availability
  • Ordering: in many scenarios the order of data matters, and most message queues are ordered, ensuring that data is processed in a particular sequence
  • Reliability: message queues can provide persistence and, when necessary, re-consumption of historical messages

There are many message queue middlewares on the market, such as RabbitMQ, Kafka, and RocketMQ, each with its own application scenarios. Roughly speaking, RabbitMQ is primarily used in scenarios like financial payments that demand high reliability, high availability, and low latency (microsecond level), at the cost of relatively low single-machine throughput (tens of thousands of messages per second). Kafka was originally designed for log collection, with high throughput (hundreds of thousands or even millions of messages per second) and relatively high latency (millisecond level). RocketMQ was open-sourced by Alibaba and is based on the ideas of Kafka. It can support a larger number of topics on a single machine while maintaining Kafka-level throughput, and it is widely used within Alibaba.

This article focuses on Kafka to help you solve the following problems:

  1. How is Kafka’s architecture designed?
  2. How does Kafka achieve such high throughput? What optimizations does it make?
  3. How does Kafka guarantee ordering? Can it guarantee strict global ordering?
  4. What is the pull mode for Kafka messages, and why was it chosen?
  5. What are the message semantics of Kafka?
  6. How does Kafka ensure high availability?

1. Kafka topology

1.1 Related Concepts

  • Producer: the producer of messages

  • Consumer: the consumer of messages

  • Broker: a node (instance) of the Kafka cluster; producers write messages to a broker for storage, and consumers pull messages from the broker to process them

  • Topic: each message sent to Kafka belongs to a category, called a topic, which logically distinguishes different kinds of messages

  • Partition: the physical partition of messages. A topic can correspond to one or more partitions. Each partition is an ordered queue, and new messages are appended to its end. Each consumer tracks an offset per partition, indicating its consumption position

  • Consumer Group: a group of consumers. Each consumer belongs to a specific consumer group, and different consumers in the same group consume the partitions of a topic in parallel

1.2 Introduction to Kafka architecture

A Kafka deployment consists of producers (server logs, front-end tracking events, business data, database change data, etc.), a cluster of Kafka brokers, and consumers (downstream processing, business monitoring, log analysis, etc.). Cluster metadata and consumer offsets are stored in ZooKeeper (ZK). Producers, brokers, and consumers are all distributed with no single point of failure and can easily be scaled horizontally.

2. How does Kafka achieve high throughput

2.1 Message optimization on the Producer end

Kafka supports sending messages in asynchronous batches. When a producer produces a message, it does not immediately send it to the broker. Instead, it puts the message into a buffer and sends the accumulated messages to the broker in a batch once the buffer fills up or a time limit is reached. The producer side needs to pay attention to the following parameters:

  • **acks:** whether the producer waits for broker acknowledgment after sending a message. Three values are supported. acks=0 means the send returns immediately without waiting for the broker's confirmation; acks=1 means the broker responds once the message has been written to the leader partition, with no guarantee that replicas have it, so the message may be lost if the leader fails; acks=-1 is the strictest setting and waits until the message has been written to the leader partition and synchronized to the replica partitions.
  • **buffer.memory:** the size of the producer's message buffer, in bytes.
  • **batch.size:** the batch threshold. When Kafka sends messages asynchronously, messages for the same topic and partition are merged into one batch by default and sent once this threshold is reached.
  • **linger.ms:** the maximum delay before a batch is sent. The default is 0, meaning messages are sent without waiting.

You can use the above parameters to balance reliability, latency, and throughput. If a service can tolerate message loss and has no strict latency requirements, for example click-through statistics, you can set batch.size to a larger value, linger.ms to a longer time, and acks=0 to achieve high throughput.
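To make the interplay of batch.size and linger.ms concrete, here is a toy pure-Python sketch of a producer-side batching buffer. The class and names are illustrative, not the real client API: a batch is flushed when either the byte threshold or the linger timeout is hit.

```python
import time

class BatchBuffer:
    """Toy model of producer batching: flush when either batch_size
    bytes accumulate or linger_ms elapses since the first message."""

    def __init__(self, batch_size=16384, linger_ms=5):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.buffer = []            # messages waiting to be sent
        self.buffered_bytes = 0
        self.first_append = None    # time the current batch started
        self.flushed_batches = []   # stands in for network sends

    def append(self, msg: bytes):
        if self.first_append is None:
            self.first_append = time.monotonic()
        self.buffer.append(msg)
        self.buffered_bytes += len(msg)
        self._maybe_flush()

    def _maybe_flush(self):
        elapsed_ms = (time.monotonic() - self.first_append) * 1000
        if self.buffered_bytes >= self.batch_size or elapsed_ms >= self.linger_ms:
            self.flushed_batches.append(self.buffer)
            self.buffer, self.buffered_bytes, self.first_append = [], 0, None
```

With a large batch.size and a non-zero linger.ms, many small messages amortize into one send; this is exactly the throughput/latency trade-off those parameters control.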

2.2 Sequential Disk Read and Write operations

Kafka is essentially a message queue built on disk storage. Although our impression is that disk reads and writes are very slow, Kafka takes advantage of the operating system's I/O optimizations to approach the speed of memory access. The key optimization is the page cache, which can be regarded as a mapping of disk files into physical memory. According to the principle of locality, if a piece of data is used, the data stored around it is likely to be used soon. The operating system optimizes disk reads and writes through read-ahead (prefetch) and write-behind.

  • **Prefetch (read-ahead):** sequential disk reads are very efficient (no seek time and very little rotational delay). When a piece of data is read from disk, data at adjacent addresses is loaded into the page cache as well. Subsequent reads of continuous data are then served from the page cache, which is as fast as reading memory.
  • **Write-behind:** data is not written directly to disk; it is first written to the page cache and flushed to disk later. The flush is triggered periodically by the operating system's sync (you can also invoke sync manually). Write-behind greatly reduces the total number of disk writes and improves write efficiency.

Messages in Kafka are stored by partition, and each partition corresponds to a set of disk files occupying contiguous physical space. New messages are appended to the end of the file, and consumers likewise pull and consume data sequentially, partition by partition. Because Kafka both reads and writes sequentially, it uses the page cache efficiently, solving the disk I/O performance problem.
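The append-only layout can be sketched in a few lines. This is a simplified stand-in for a Kafka log segment, with a made-up length-prefixed record format; the real on-disk format also stores offsets, CRCs, timestamps, and more.

```python
import os
import struct
import tempfile

class LogSegment:
    """Minimal append-only log: each record is length-prefixed and
    appended to the end of the file; reads scan forward sequentially."""

    def __init__(self, path):
        self.f = open(path, "a+b")

    def append(self, payload: bytes) -> int:
        """Append one record; return its byte offset in the file."""
        self.f.seek(0, os.SEEK_END)
        offset = self.f.tell()
        self.f.write(struct.pack(">I", len(payload)) + payload)
        self.f.flush()
        return offset

    def read_from(self, offset: int):
        """Yield records sequentially starting at a byte offset."""
        self.f.seek(offset)
        while True:
            header = self.f.read(4)
            if len(header) < 4:
                return
            (size,) = struct.unpack(">I", header)
            yield self.f.read(size)

# Demo: append two records, then read back sequentially.
path = os.path.join(tempfile.mkdtemp(), "00000000.log")
seg = LogSegment(path)
first = seg.append(b"order created")
second = seg.append(b"order paid")
records = list(seg.read_from(0))
```

Both writes and reads only ever move forward through the file, which is the access pattern the page cache's read-ahead and write-behind reward.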

2.3 Zero-copy technology

Zero-copy technology further optimizes I/O performance by reducing the number of times data is copied in memory. From the broker's perspective, consuming from Kafka corresponds to reading data from a disk file and sending it to the network card. Before introducing zero copy, let's look at how this process works with traditional I/O.

  • The application process switches from user mode to kernel mode via the read() system call
  • The operating system instructs the DMA (Direct Memory Access) engine to copy the disk data into the kernel buffer (DMA copy)
  • read() returns: the kernel buffer data is copied into the user buffer (CPU copy), and the process switches from kernel mode back to user mode
  • Having obtained the data, the application process calls write(), switching from user mode to kernel mode again
  • The data is copied from the user buffer into the socket buffer (CPU copy)
  • The operating system instructs the DMA engine to write the socket buffer data to the NIC (DMA copy)
  • write() returns, the process switches from kernel mode back to user mode, and the transfer is complete

The traditional approach relies on the read/write system calls and requires four context switches, two CPU copies, and two DMA copies. The context switches and CPU copies both consume significant system resources (DMA copies need no CPU intervention and are comparatively cheap). Can this process be optimized?

Kafka uses the sendfile system call instead of the traditional read/write pair. sendfile is a classic zero-copy technique. Let's see how sendfile reads data from a disk file and sends it to the network card:

  • The application process calls sendfile(), entering kernel mode from user mode
  • The operating system instructs the DMA engine to copy the disk data into the kernel buffer (DMA copy)
  • The operating system passes the file descriptor and data length to the socket buffer and delegates the copy to DMA, which writes the data directly from the kernel buffer to the NIC (DMA copy)
  • sendfile() returns, the process switches from kernel mode back to user mode, and the transfer is complete

As you can see, with sendfile there are only two context switches and zero CPU copies, so the performance improvement is substantial.
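Python exposes the same syscall as os.sendfile, so the zero-copy path can be demonstrated directly. This is a minimal Linux-oriented sketch in which a socketpair stands in for the consumer's network connection:

```python
import os
import socket
import tempfile

# Write a sample "log segment" to disk.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"kafka message payload")
    path = f.name

# A connected socket pair stands in for the consumer connection.
server, client = socket.socketpair()

with open(path, "rb") as src:
    size = os.fstat(src.fileno()).st_size
    sent = 0
    # os.sendfile moves bytes from the file descriptor to the socket
    # inside the kernel -- no copy through a user-space buffer.
    while sent < size:
        sent += os.sendfile(server.fileno(), src.fileno(), sent, size - sent)

server.close()
received = client.recv(1024)
client.close()
os.unlink(path)
```

The application never touches the payload bytes: the kernel hands them from the page cache to the socket, which is exactly the path Kafka's broker uses when serving consumers.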

2.4 End-to-end Compression Mode

Under bandwidth constraints, throughput is inversely proportional to message size, so compressing messages can significantly improve throughput. Kafka supports several compression algorithms, such as GZIP, Snappy, LZ4, and Zstd. Kafka also supports end-to-end compression: messages are compressed on the producer side, stored on disk in compressed form, and decompressed only on the consumer side. This keeps messages compressed throughout network transmission.
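A quick gzip sketch (the other supported codecs behave similarly, with different speed/ratio trade-offs) shows why batches of similar messages compress so well; the event payloads here are made up for illustration:

```python
import gzip
import json

# A batch of similar JSON events, as a producer might accumulate.
batch = b"".join(
    json.dumps({"user_id": i, "event": "click", "page": "/home"}).encode() + b"\n"
    for i in range(1000)
)

# Compress the whole batch at once: repeated field names and values
# across messages give the codec far more to exploit than
# compressing each small message individually would.
compressed = gzip.compress(batch)
ratio = len(batch) / len(compressed)
```

Because Kafka compresses whole batches rather than single messages, the producer-side batching of Section 2.1 also directly improves the compression ratio.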

3. Sequence in Kafka

Ordering in a message queue means that the producer emits messages in a certain order and the consumer receives and processes them in the same order. A single-machine system can guarantee strict ordering, but in a distributed system strict ordering is hard to guarantee and usually requires a trade-off between performance and ordering. Let's first consider which factors affect ordering in Kafka:

  • Producer side: as mentioned in Section 2.1, Kafka's producer sends messages to the broker in asynchronous batches by default. With asynchronous batching, a message that fails to send and is then retried can arrive out of order. To guarantee strict ordering, the producer would have to send messages synchronously, which clearly hurts performance
  • Broker side: each message belongs to a topic, and each topic corresponds to one or more partitions. Messages are ordered within each partition, but order cannot be guaranteed globally. To guarantee strict ordering, a topic could have only a single partition, which sacrifices the horizontal scalability that partitions provide
  • Consumer side: a Kafka consumer can fetch data from the broker in batches and process it with multiple threads, but multi-threaded consumption can also reorder messages. Strict ordering requires a single consuming thread

In practice we usually do not sacrifice Kafka's performance to achieve strict ordering. In most cases global ordering is unnecessary; local ordering is sufficient. For example, when consuming product-order events, we don't care whether messages for product A and product B interleave, but all messages for product A must stay in order. Kafka lets you partition by a custom key, and since each partition is ordered, local ordering is easy to implement.
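Local ordering via keyed partitioning can be sketched as follows. This is a simplified partitioner: the real Kafka client hashes keys with murmur2, but any deterministic hash gives the same guarantee that all messages with one key land in one ordered partition.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stable hash of the key; the real client uses murmur2, but the
    # property we need is only determinism: same key -> same partition.
    return zlib.crc32(key) % num_partitions

# Events for two products, produced in order (illustrative data).
orders = [("productA", "created"), ("productB", "created"),
          ("productA", "paid"), ("productA", "shipped")]

# All of productA's events map to one partition, preserving their order.
placement = [(partition_for(k.encode(), 4), k, event) for k, event in orders]
```

Global order across productA and productB is not guaranteed, and does not need to be; each product's own event stream stays in order.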

4. Kafka message pull mode

Kafka uses push mode for message production and pull (poll) mode for message consumption.

  • **Push mode:** messages are pushed directly downstream. The advantage is that messages are delivered downstream in real time with no latency. The disadvantage is that the message rate is determined by the upstream: if it is too high, downstream services come under heavy pressure, and it also limits how far the upstream can scale.
  • **Pull mode:** the downstream service actively pulls data from the upstream by polling. The advantage is that the downstream can flexibly adjust its consumption rate to its own capacity and scales well. The downside is that message latency depends on the polling interval, and there may be some wasted work (polling even when there are no messages).

5. Kafka message semantics

During message transmission it is inevitable to encounter exceptions. After an exception, should the message be dropped or retried? Will a retry cause duplicate consumption? Message semantics describe what delivery guarantees hold under such failures. There are usually three modes:

  • **At most once:** a message is produced and processed at most once. Message loss is allowed, but duplicate processing is not. Typically, messages are simply dropped when exceptions occur. Setting acks=0 gives the Kafka producer at-most-once semantics.
  • **At least once:** a message is produced and processed at least once. Duplicate processing is allowed, but message loss is not. Typically, failed operations are retried. Setting acks=1 or acks=-1 on the Kafka producer, with retries on transmission exceptions, gives at-least-once semantics.
  • **Exactly once:** each message is processed exactly once, with no loss and no duplicates. Strict exactly-once is hard to achieve, so in practice idempotence plus at-least-once is usually used to get the same effect. For example, since Kafka 0.11 producer retries are de-duplicated (the idempotent producer), approximately guaranteeing exactly-once semantics.

Kafka's consumer side can also be configured for different modes. For example, if the consumer auto-commits offsets, the offset is committed even when message processing fails, which yields at-most-once semantics. Implementing at-least-once on the consumer is more involved: offsets must be committed manually, and batch commits must be handled carefully.
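The idempotence-plus-at-least-once pattern can be sketched in a few lines (the message ids and operations here are made up for illustration): duplicates delivered by retries are detected and skipped, so the observable effect is exactly-once.

```python
# At-least-once delivery: retries mean the same message id can arrive
# twice. An idempotent consumer remembers processed ids so that
# duplicates become no-ops -- an exactly-once *effect*.
delivered = [(1, "debit $10"), (2, "debit $5"),
             (2, "debit $5"),            # duplicate from a retry
             (3, "debit $7")]

processed_ids = set()
applied = []

for msg_id, op in delivered:
    if msg_id in processed_ids:
        continue  # already handled: skip the redelivery
    applied.append(op)
    processed_ids.add(msg_id)
```

In a real system the processed-id set would live in durable storage (or the operation itself would be naturally idempotent, like an upsert), so the de-duplication survives consumer restarts.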

6. How Kafka ensures high availability

High availability in distributed systems is almost always achieved through redundancy, and Kafka is no different. Kafka messages are stored by partition, and each partition has replicas on other brokers. Reads and writes are served by the leader partition; when the broker hosting the leader fails, the HA mechanism elects one of the replicas on another broker as the new leader so that service can continue.

7. Summary

Kafka is an excellent high-throughput messaging middleware. It exposes many parameters for trading off throughput against ordering, latency, and message semantics. In engineering practice, we make these trade-offs between performance and reliability based on the characteristics of the business.