1, Overview

Kafka is ubiquitous messaging middleware in the big data space, widely used inside enterprises for real-time data pipelines and as the backbone of their own stream-processing applications. Although Kafka stores its data on disk, it still offers high performance, high throughput and low latency, with throughput often in the tens of thousands to millions of messages per second. The reasons for this are well worth exploring, so let us look at the various careful designs behind Kafka.

2, Kafka performance analysis

1. Kafka system architecture

In Kafka, the Producer produces messages and, according to a routing policy, submits them by Partition to the Leader node of each Partition in the cluster; the Consumer pulls and consumes messages from those Leader nodes on the Brokers. Producing messages involves a large amount of network transfer. If the Producer sent a request to the Broker for every single message it produced, the network overhead would be heavy and Kafka's performance would suffer badly. To solve this problem, Kafka sends messages in batches. Likewise, if Brokers persisted and read messages using traditional I/O, performance would be seriously affected; to solve this, Kafka uses sequential writes plus zero copy. The following sections describe how Kafka improves performance from the perspectives of batch sending, message persistence, and zero copy.

2. Send messages in batches

The Producer generates messages and sends them to the Broker, which involves a large amount of network transfer. If each network transfer carried only one message, the network overhead would be severe. To solve this problem, Kafka sends messages in batches. The following describes how a Producer produces messages and sends them to the Broker.

2.1 Partition

A Kafka message is a key-value pair, and the key may be null by default. The key serves two purposes: it can carry additional information about the message, or it can determine which Partition the message is written to. The data of a Topic is divided into one or more Partitions; a Partition is a collection of messages, and it is the smallest granularity at which Consumers consume. Kafka divides a Topic into multiple Partitions, and the Producer distributes messages into local queues, one per Partition; the messages in each Partition queue are then written to that Partition's Leader node. As shown in the figure above, messages are distributed by the routing policy into the local queues of the different Partitions and then sent in batches to the Leader nodes of the corresponding Partitions.
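As a minimal illustration of keyed versus keyless messages (a sketch using the standard Java producer client; the topic name and broker address are placeholders, not from the original text):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // With a key: messages carrying the same key are routed to the same Partition
      producer.send(new ProducerRecord<>("demo-topic", "user-42", "login"));
      // With a null key: the partitioner spreads messages across the Partitions
      producer.send(new ProducerRecord<>("demo-topic", "page-view"));
    }
  }
}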

2.2 Message routing

A Topic in Kafka has multiple Partitions, so the policy that decides which Partition a message is assigned to is called the routing policy. Kafka has three routing policies:

  • Round Robin: the most commonly used policy; the Producer distributes messages evenly across the local queues of all Partitions.

  • Hash: Kafka hashes the key of the message and routes the message to a particular Partition based on the hash value; messages with the same key are always routed to the same Partition.
  • Custom partitioning policy: Kafka also supports user-defined partitioning policies, which can, for example, map a particular series of messages to the same Partition (a sketch follows below).
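A custom policy can be plugged in by implementing the Partitioner interface of the Java client. A minimal sketch, with an assumed class name and an assumed rule that pins messages whose key starts with "audit-" to Partition 0 (neither comes from the original text):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class AuditFirstPartitioner implements Partitioner {
  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionsForTopic(topic).size();
    if (keyBytes == null) {
      return 0; // no key in this sketch: just use Partition 0
    }
    if (key instanceof String && ((String) key).startsWith("audit-")) {
      return 0; // pin this series of messages to the same Partition
    }
    // Otherwise hash the key bytes, similar in spirit to the Hash policy above
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  @Override
  public void configure(Map<String, ?> configs) { }

  @Override
  public void close() { }
}

The class is then registered on the producer via the partitioner.class configuration, e.g. props.put("partitioner.class", AuditFirstPartitioner.class.getName()).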

2.3 Sending Process

The figure above shows the main flow of producing messages from the Producer to the Broker. The Producer first creates, serializes and compresses messages, then appends them to the local RecordAccumulator. The Sender keeps polling the RecordAccumulator and sends the queued data to the Partition Leader node when certain conditions are met. The Sender sends data to the Broker under either of two conditions (a sketch of the corresponding producer configuration follows the list):

  • The accumulated message size reaches the threshold.
  • The time a message has been waiting to be sent reaches the threshold.
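These two thresholds correspond to the batch.size and linger.ms settings of the Java producer. A hedged example (the values are illustrative, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
    // Size threshold: a batch becomes sendable once it holds about 32 KB of records
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
    // Time threshold: a partially filled batch is sent anyway after waiting up to 10 ms
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.close();
  }
}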

The Producer creates a double-ended queue for each Partition to cache client messages; each element of the queue is a ProducerBatch record. A batch record uses createdMs to record its creation time (the time the first message was added to the batch) and topicPartition to identify the Partition it belongs to. Serialized messages produced by the Producer are first written into the batch's recordsBuilder object. Once the size of a batch record in the queue reaches the threshold, the Sender sends it to the Leader node of the corresponding Partition; if the time a batch record has waited to be sent reaches the threshold, it is likewise sent to that Leader node.

When appending a message, the Producer needs to obtain the queue that the Partition belongs to and the last batch record in that queue. If the queue has no batch record, or the last batch record cannot hold any more messages, a new batch record is created and added to the end of the queue. The batch record created first is filled first, and the batch record at the tail holds the most recent messages; messages are always appended to the batch record at the end of the queue. The record collector (RecordAccumulator) caches messages from the client, and the Sender is needed to send them to the Leader node of each Partition. The Sender reads the record collector, obtains the list of batch records destined for each Leader node, finds the Broker nodes that are ready, establishes connections to them, and then sends the batch records of each Partition to its Leader node. The core code of the Sender is as follows:

// The Sender reads the record collector, groups batches by node, creates client requests and sends them
public void run(long now) {
  Cluster cluster = metadata.fetch();
  // Get all partitions that are ready to send
  RecordAccumulator.ReadyCheckResult result = accumulator.ready(cluster, now);
  // Establish network connections to the Leader nodes and remove the nodes that are not ready
  Iterator<Node> iter = result.readyNodes.iterator();
  while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
      iter.remove();
    }
  }
  // Read the record collector; it returns a list of batch records for each Leader node,
  // where each batch record corresponds to one partition
  Map<Integer, List<RecordBatch>> batches = accumulator.drain(cluster, result.readyNodes,
                                                              this.maxRequestSize, now);
  // Node-level production requests, i.e. only one client request per node
  List<ClientRequest> requests = createProduceRequests(batches, now);
  for (ClientRequest request : requests) {
    client.send(request, now);
  }
  // This is where the real network reads and writes happen, e.g. sending the client requests above
  this.client.poll(pollTimeout, now);
}

The specific steps are as follows:

  1. Messages are collected by the record collector and appended, per Partition, to the batch record at the end of the queue.
  2. The Sender finds the ready server nodes from the record collector via ready(); a node is ready when some Partition's pending messages have reached the size threshold or the waiting-time threshold.
  3. If the client has not yet connected to a ready node, it connects to it via connect().
  4. The Sender retrieves, via drain(), the batch records of each Partition grouped by node from the record collector.
  5. Once the Sender has the batch records for each node, it creates one client request per node and sends the messages to the server.

3. Message persistence

3.1 Random and Sequential I/O

The diagram above shows a simplified model of a disk. Data on a disk is located by a cylinder number, platter number and sector number. When data needs to be read, the system sends the logical address of the data to the disk, and the disk's control circuitry translates it into a physical address, that is, it determines the track and sector where the data resides. To read the data in that sector, the head must be positioned above it. To do this:

  • The cylinder must first be found, that is, the head must be moved to the corresponding track; this process is called seeking or positioning.
  • Once the right platter surface is identified, the disk rotates until the target sector passes under the head.

Therefore, completing one read request consists of three actions:

  • Seek time: the head moves to and settles on the specified track; this costs the most, up to roughly 0.1 s.
  • Rotational latency: the time spent waiting for the specified sector to rotate under the head; it depends on the rotational speed of the disk (revolutions per minute).
  • Data transfer: the time for the data to travel from the disk to memory across the system bus.

For operations that read data from disk, i.e. I/O operations, there are two cases:

  • If the data we need is scattered randomly across different sectors of different platters, then for each piece of data the actuator arm must first seek to the right track and the platter must rotate until the right sector is under the head, and this is repeated until all the data has been found. This is called random I/O, and it reads data slowly.
  • If, after we have found the first piece of data, the rest of the data we need sits immediately after it, no further addressing is required and the data can be read in sequence. This is called sequential I/O.

Compared with random I/O, sequential I/O reduces the disk addressing process and improves the data query efficiency.
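As a rough illustration of the two access patterns (plain Java NIO, not Kafka code; the file names and sizes are arbitrary), the sketch below writes the same amount of data once as a pure append and once at random positions, the latter forcing a new seek for every block on a spinning disk:

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;

public class SeqVsRandomWrite {
  public static void main(String[] args) throws Exception {
    byte[] block = new byte[4096];
    int blocks = 10_000;
    Random rnd = new Random(42);

    // Sequential: every block lands right after the previous one (append-only, like a Kafka log segment)
    try (FileChannel seq = FileChannel.open(Path.of("seq.dat"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      for (int i = 0; i < blocks; i++) {
        seq.write(ByteBuffer.wrap(block));
      }
    }

    // Random: each block is written at an arbitrary position, forcing re-addressing for every write
    try (FileChannel rand = FileChannel.open(Path.of("rand.dat"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      for (int i = 0; i < blocks; i++) {
        long pos = (long) rnd.nextInt(blocks) * block.length;
        rand.write(ByteBuffer.wrap(block), pos);
      }
    }
  }
}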

3.2 Broker writes messages

A Broker needs to persist a large number of messages, and there are also many message-read scenarios. If traditional I/O were used, heavy disk addressing would occur, slowing down message access and limiting Kafka's performance. To solve this problem, Kafka persists messages with sequential writes. Each message in the message set that the Producer sends to the Broker is assigned an order value that marks the order in which the Producer produced it; each batch's order values start at 0. In the example shown in the following figure, the Producer writes three messages to the Partition, with order values [0, 1, 2].

The order value of each message in the message set created by the Producer is only a sequence number relative to that batch, so it cannot be stored directly in the log file. The server converts the order value of each message into an absolute offset (the Broker tracks message order at the Partition level, which controls the order in which Consumers consume messages). Kafka records the offset of the last message stored in the log in nextOffset.

Message      Absolute offset      Order value
Message0     900                  0
Message1     901                  1
Message2     902                  2

As shown in the above table, before these messages are written, nextOffset is 899. Message0, Message1 and Message2 are three messages written consecutively; after they are written, their absolute offsets are 900, 901 and 902, their order values are 0, 1 and 2, and nextOffset becomes 902. The Broker appends the messages of each Partition to the log in segments. When the size of a Segment reaches a threshold (1 GB by default), a new Segment is created to store new messages. Each Segment has a baseOffset (the absolute offset of the first message stored in that Segment), from which the absolute offset of each message within the Partition can be calculated. Each log segment consists of a data file (file name ending in .log) that stores the actual contents of the message sets, and an index file (file name ending in .index) that maps message offsets to physical positions. As shown below:
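For intuition, the files of a segment are named after the segment's baseOffset (zero-padded to 20 digits), and a message's relative offset within the segment is simply its absolute offset minus that baseOffset. A small sketch of the arithmetic, reusing the values from the table above (the helper method is illustrative, not Kafka code):

public class SegmentOffsetDemo {
  // Segment files are named after the base offset, zero-padded to 20 digits, e.g. 00000000000000000900.log
  static String segmentFileName(long baseOffset, String suffix) {
    return String.format("%020d.%s", baseOffset, suffix);
  }

  public static void main(String[] args) {
    long baseOffset = 900L;                              // first absolute offset stored in this segment
    long absoluteOffset = 902L;                          // Message2 from the table above
    long relativeOffset = absoluteOffset - baseOffset;   // 2, what the index file stores

    System.out.println(segmentFileName(baseOffset, "log"));    // 00000000000000000900.log
    System.out.println(segmentFileName(baseOffset, "index"));  // 00000000000000000900.index
    System.out.println("relative offset = " + relativeOffset);
  }
}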

nextOffsetMetaData holds the starting offset for the messages currently being written to the log. After a batch of messages is appended, nextOffsetMetaData is updated and serves as the starting offset of the next batch. The core code is as follows:

@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(),
                                                         activeSegment.baseOffset, activeSegment.size.toInt)

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean) = {
  // LogAppendInfo summarizes the batch of messages; then validate the messages
  var appendInfo = analyzeAndValidateMessageSet(messages)
  var validMessages = trimInvalidBytes(messages, appendInfo)
  // Take the latest "next offset" as the absolute offset of the first message
  appendInfo.firstOffset = nextOffsetMetadata.messageOffset
  if (assignOffsets) { // The offset of each message must be increasing
    // The starting offset comes from the latest "next offset", not from the messages' order values
    var offset = new AtomicLong(nextOffsetMetadata.messageOffset)
    // Reassign an absolute offset to every message in the valid message set, based on the starting offset
    validMessages = validMessages.validateMessagesAndAssignOffsets(offset)
    appendInfo.lastOffset = offset.get - 1 // The absolute offset of the last message
  }
  var segment = maybeRoll(validMessages.sizeInBytes) // If the Segment size threshold is reached, create a new Segment
  segment.append(appendInfo.firstOffset, validMessages) // Append the messages to the current segment
  updateLogEndOffset(appendInfo.lastOffset + 1) // Update the latest "next offset"
  if (unflushedMessages >= config.flushInterval) {
    flush() // If the number of unflushed messages exceeds the configured value, flush them to disk
  }
}

// Update the log's "next offset"; the argument passed in is usually the offset of the last message plus 1,
// so that no increment is needed when the latest "next offset" of the log is fetched
private def updateLogEndOffset(messageOffset: Long) {
  nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
}

Reads and writes of nextOffsetMetaData occur during message persistence and message reads, as follows:

  1. The Producer sends a message set to the Broker, and the Broker appends the messages to the log.
  2. Each message must be assigned an absolute offset; the Broker uses the current value of nextOffsetMetaData as the starting offset.
  3. The Broker writes the messages, now carrying offsets, to a log segment.
  4. The Broker takes the offset of the last message in the batch, increases it by 1, and updates nextOffsetMetaData.
  5. The Consumer pulls messages based on the latest value of this variable; once it changes, the Consumer can pull the newly written messages.

Because the message sets written to a log segment all take their absolute offsets starting from nextOffsetMetaData, and this starting offset always increases, the offsets within each batch of messages also increase, and across all log segments of a Partition the offsets of all messages are increasing. As shown in the following figure, the base offset of a newly created log segment is larger than that of the previous segment, and within the same log segment the offset of a newer message is larger than that of an earlier one.

The purpose of the index file is to quickly locate the physical position in the data file of a message with a given offset. The index file stores a mapping from the relative offsets of some of the messages to their physical positions; relative offsets rather than absolute offsets are used to save memory.

3.3 Query Based on Index Files

Kafka uses index files to improve the efficiency of querying messages on disk. As shown in the figure above: suppose there are 1000 messages and every 100 messages fill one log segment, so there are 10 log segments. Without an index file, to find the message with offset 999 we would have to read from the first message of the first segment's data file onward. With the index file, we go to the index file of the last log segment: first subtract the segment's base offset 900 from the absolute offset 999 to get the relative offset 99; then find the index entry with the largest relative offset not exceeding 99, which is 90; the entry for relative offset 90 maps to physical position 1365 in the data file. We then go to the data file and read forward from physical position 1365 until we find the message with offset 999. Kafka index files have the following characteristics:

  • The index file maps offsets to the physical location of the file, and it does not index every message, so it is sparse.
  • The offset of an index entry stores the relative offset with respect to the base offset, not the absolute offset of the message.
  • Offsets are ordered, so a binary search can quickly locate the position of a specified offset when querying (a sketch of this lookup follows the list).
  • If the specified offset does not exist in the index file, you can find the maximum offset less than or equal to the specified offset.
  • Sparse indexes can put the entire index file into memory through memory mapping to speed up the query of offsets.
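The lookup described above can be sketched as a binary search over in-memory (relative offset, physical position) pairs. This is an illustration of the idea only, not Kafka's actual index code, and the entry values are taken from the example above:

public class SparseIndexLookup {
  // One sparse index entry: relative offset -> physical position in the .log file
  record Entry(int relativeOffset, int position) { }

  // Return the entry with the largest relativeOffset <= target, or null if none exists
  static Entry floorEntry(Entry[] index, int target) {
    int lo = 0, hi = index.length - 1;
    Entry best = null;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      if (index[mid].relativeOffset() <= target) {
        best = index[mid];
        lo = mid + 1;
      } else {
        hi = mid - 1;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    // Sparse entries for the segment with baseOffset 900 from the example above
    Entry[] index = { new Entry(0, 0), new Entry(30, 455), new Entry(60, 910), new Entry(90, 1365) };
    long baseOffset = 900, wanted = 999;
    Entry start = floorEntry(index, (int) (wanted - baseOffset)); // finds (90, 1365)
    System.out.println("scan the .log file forward from position " + start.position());
  }
}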

Since the Broker always persists messages into the last segment of the current log, writes to the file are appends, i.e. sequential writes to a disk file. Sequential disk writes together with the index files make both persisting and querying messages on the Broker fast.

4. Zero copy

In Kafka, a large amount of data produced by Producers is persisted to Brokers over the network, and a large number of disk files are sent from Brokers to Consumers over the network. The performance of this process directly affects Kafka's overall performance. Kafka addresses it with a common technique: zero copy. Zero copy reduces the number of data copies and shared-bus operations, eliminates unnecessary intermediate copies of the data in memory, and reduces the overhead of context switches between the user application's address space and the operating system kernel's address space, thereby effectively improving data transfer efficiency. Take sending a disk file over the network as an example. The following shows how the data is copied when the file is read and sent over the network in the traditional way:

  • After read() is called, DMA copies the data from disk into kernel space;
  • the CPU copies the data from kernel space into user space;
  • send() is called, and the CPU copies the data from user space into kernel space (the socket buffer);
  • after send(), DMA performs a fourth copy, from the kernel to the protocol engine (NIC).

Linux 2.4+ kernels provide zero copy through the sendfile system call: after the data is DMA-copied into the kernel buffer, it is DMA-copied directly into the NIC buffer without a CPU copy, which is where the term zero copy comes from. Besides reducing data copies, since the whole read-file-and-send operation is completed by a single sendfile call, there are only two context switches and no CPU data copies, which greatly improves performance. The zero-copy process is shown in the following figure:

  • sendfile() copies the file contents into a kernel read buffer via DMA, and the kernel then copies the data into the kernel buffer associated with the output socket.

In terms of implementation, Kafka transfers data through a TransportLayer; its subclass PlaintextTransportLayer achieves zero copy through the transferTo() and transferFrom() methods of Java NIO's FileChannel. Note that transferTo() and transferFrom() do not themselves guarantee zero copy; whether zero copy is actually used depends on the operating system. If the OS provides a zero-copy system call such as sendfile, these methods take full advantage of it through that call; otherwise zero copy cannot be achieved through these two methods alone.
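A minimal sketch of the same idea with plain Java NIO (the file name and socket address are placeholders, not from the original text): transferTo() hands the copy to the kernel, which on Linux can use sendfile so the file bytes never enter user space.

import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
  public static void main(String[] args) throws Exception {
    try (FileChannel file = FileChannel.open(Path.of("00000000000000000900.log"),
                                             StandardOpenOption.READ);
         SocketChannel socket = SocketChannel.open(new InetSocketAddress("consumer-host", 9092))) {
      long position = 0, remaining = file.size();
      // transferTo() may send fewer bytes than requested, so loop until the whole file is sent
      while (remaining > 0) {
        long sent = file.transferTo(position, remaining, socket);
        position += sent;
        remaining -= sent;
      }
    }
  }
}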
