Preface

Articles in this series: Kafka-1: Installation and Key Concepts; Kafka-2: Spring for Kafka (part 1); Kafka-3: Spring for Kafka (part 2).

Why is Kafka so fast

  1. Zero copy
  2. Page caching
  3. Sequential reads and writes

Kafka stores messages on disk in segment files. One of Kafka's defining features is high throughput and high performance.

Writing data

Kafka persists messages to disk so that they are not lost. To keep writes fast, it relies on two main optimizations: sequential writes and MMAP (memory-mapped files).

  1. Sequential writes: Disks are slow mainly because each write must first seek to the target position before the data can be written. Seeking is a mechanical action and the most time-consuming part, so sequential writes can, in some scenarios, be comparable to memory access. Kafka therefore writes sequentially: new data is always appended to the end of the current segment. One drawback is that individual records cannot be deleted in place, so Kafka does not delete individual messages; it deletes whole segments instead.
  2. Even with sequential writes, disk access still cannot keep up with memory, so Kafka does not write data to disk in real time; instead it makes full use of MMAP to improve I/O efficiency. MMAP (memory-mapped files) uses the operating system's page cache to map a file directly into memory, and changes made to that memory are synchronized to disk by the operating system at a time of its choosing. This saves the overhead of copying data between user space and kernel space. (For comparison, a regular read() call first loads file data into kernel-space memory and then copies it into user-space memory.) The big drawback is reliability: data written through mmap is not actually on disk until the operating system flushes it. You can control how often data is flushed to disk, by message count or by elapsed time, with settings such as log.flush.interval.messages=10000 and log.flush.interval.ms=1000 in the configuration file.
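The write path above can be illustrated with a minimal Java sketch. It is a toy built on assumptions, not Kafka's actual log code: the class MmapSegmentDemo, its length-prefixed record format and the flushEveryN parameter are all hypothetical. Records are only ever appended at the end (a sequential write), they land in the page cache through a MappedByteBuffer obtained from FileChannel.map, and they reach the disk only when force() is called, here every N messages in the spirit of log.flush.interval.messages.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical toy segment: NOT Kafka's implementation, only an illustration of
// sequential appends into a memory-mapped file with a periodic flush.
public class MmapSegmentDemo {
    private final FileChannel channel;
    private final MappedByteBuffer buffer;
    private final int flushEveryN; // similar in spirit to log.flush.interval.messages
    private int unflushed = 0;
    private int flushes = 0;

    public MmapSegmentDemo(Path file, int capacityBytes, int flushEveryN) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        // Map a fixed-size region; writes to the buffer go to the page cache, not straight to disk.
        this.buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacityBytes);
        this.flushEveryN = flushEveryN;
    }

    // Appends a length-prefixed record at the current end of the segment and returns
    // the byte position it was written at. Throws BufferOverflowException when full.
    public int append(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        int position = buffer.position();
        buffer.putInt(bytes.length);
        buffer.put(bytes);
        if (++unflushed >= flushEveryN) {
            buffer.force(); // ask the OS to write the dirty pages back to the file
            unflushed = 0;
            flushes++;
        }
        return position;
    }

    public int size() { return buffer.position(); }
    public int flushCount() { return flushes; }

    public void close() throws IOException {
        buffer.force(); // flush whatever is still only in the page cache
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("segment-", ".log");
        file.toFile().deleteOnExit();
        MmapSegmentDemo segment = new MmapSegmentDemo(file, 1024, 2);
        segment.append("a");   // written at position 0 (4-byte length + 1 byte)
        segment.append("bc");  // written at position 5; the second message triggers a flush
        segment.append("def"); // written at position 11; stays in the page cache for now
        System.out.println(segment.size() + " bytes, " + segment.flushCount() + " flush(es)");
        segment.close();
    }
}
```

Running main prints "18 bytes, 1 flush(es)": each record costs a 4-byte length prefix plus its payload, and the third record sits only in the page cache until close() forces it out. A machine crash or power failure before that point could lose it, which is the reliability gap described above.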

Reading data

Kafka uses zero copy to speed up reads.

A traditional file transfer takes these steps:

  1. read() copies the file data from disk into a kernel buffer.
  2. read() then copies the data from the kernel buffer into a user-space buffer.
  3. write() copies the data from the user buffer into the kernel buffer associated with the socket.
  4. Finally, the data is copied from the socket buffer to the protocol engine.

Sending a file over the network with traditional read/write therefore makes four copies: disk -> kernel buffer -> user buffer -> socket buffer -> protocol engine. Zero copy removes the round trip through the user buffer: the data goes from the kernel to the socket without ever being copied into user space. This also reduces the switches between kernel mode and user mode that each read() and write() call causes, and those switches are very time-consuming.
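Java exposes zero copy through FileChannel.transferTo, and Kafka's design documentation describes using this API (backed by sendfile on Linux) to ship log data to consumers. The sketch below contrasts it with the traditional read/write loop. To stay self-contained it copies between two files rather than to a socket; the class and method names are hypothetical, and whether the JVM really avoids the user-space copy depends on the operating system and the channel types involved.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyDemo {
    // Traditional path: read() into a user-space byte[] and write() it back out,
    // so every byte crosses the kernel/user boundary twice.
    static long copyTraditional(Path src, Path dst) throws IOException {
        try (InputStream in = Files.newInputStream(src);
             OutputStream out = Files.newOutputStream(dst)) {
            byte[] userBuffer = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(userBuffer)) != -1) {
                out.write(userBuffer, 0, n);
                total += n;
            }
            return total;
        }
    }

    // Zero-copy path: transferTo asks the kernel to move the bytes between channels
    // directly (e.g. sendfile on Linux when the target is a socket), with no user buffer.
    static long copyZeroCopy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until everything is sent.
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("src-", ".log");
        Path a = Files.createTempFile("traditional-", ".log");
        Path b = Files.createTempFile("zerocopy-", ".log");
        Files.write(src, "hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(copyTraditional(src, a) + " " + copyZeroCopy(src, b));
        for (Path p : new Path[] {src, a, b}) Files.delete(p);
    }
}
```

main copies the same 11-byte file both ways and prints "11 11"; the difference is not in the result but in the path the bytes take through the system.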

Batch compression

In many cases, the bottleneck is not the CPU or the disk but network I/O. Kafka reduces network I/O by compressing messages in batches rather than one at a time.
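Why compress in batches rather than message by message? Compression works by finding repetition, and the messages in one batch (same field names, similar values) repeat each other far more than any single message repeats itself. The hedged Java sketch below uses the JDK's GZIP classes and made-up JSON order messages to compare the two approaches; the class and method names are hypothetical and it only illustrates the principle. In Kafka itself, the producer settings compression.type (e.g. gzip, snappy, lz4, zstd), batch.size and linger.ms control this behavior.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class BatchCompressionDemo {
    static byte[] gzip(byte[] input) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(input);
        } // closing the stream writes the gzip trailer
        return bytes.toByteArray();
    }

    // Compress each message on its own and add up the sizes.
    static int compressIndividually(List<String> messages) throws IOException {
        int total = 0;
        for (String m : messages) {
            total += gzip(m.getBytes(StandardCharsets.UTF_8)).length;
        }
        return total;
    }

    // Concatenate the whole batch first, then compress it once.
    static int compressAsBatch(List<String> messages) throws IOException {
        return gzip(String.join("\n", messages).getBytes(StandardCharsets.UTF_8)).length;
    }

    // Hypothetical sample payloads with the typical repetitiveness of real events.
    static List<String> sampleMessages(int count) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            messages.add("{\"orderId\":" + i + ",\"status\":\"CREATED\",\"region\":\"eu-west\"}");
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        List<String> batch = sampleMessages(100);
        int raw = String.join("\n", batch).getBytes(StandardCharsets.UTF_8).length;
        System.out.println("raw=" + raw
                + " individually=" + compressIndividually(batch)
                + " batch=" + compressAsBatch(batch));
    }
}
```

With these repetitive sample messages the single batch compresses to a small fraction of both the raw size and the sum of the per-message results, partly because every separate gzip stream also pays for its own header and trailer.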

Summary:

Kafka's speed comes mainly from four things: it groups messages into batches, compresses those batches to reduce network I/O, uses sequential writes and MMAP to speed up disk I/O, and reads data with zero copy so that file data is not copied between the kernel buffer and the user buffer.

How do I keep Kafka from losing messages

Before discussing how to ensure that Kafka does not lose messages, let's look at the producer acks mechanism and how consumers commit offsets.

Producer acks mechanism

For the full list of configuration items, refer to the official Kafka documentation.

The acks parameter specifies how many replicas must receive a message before the producer considers it successfully sent. The three settings offer increasing reliability:

  1. acks=0: The producer does not wait for any acknowledgement from the server; a message counts as sent as soon as it is in the socket buffer. There is no guarantee that the server actually received it, and retries have no effect because the client never learns about failures. The offset returned for each record is always -1.
  2. acks=1: The leader replica writes the message to its local log and responds without waiting for the followers to finish replicating it. If the leader fails after acknowledging the message but before any follower has replicated it, the message is lost.
  3. acks=all: The leader waits until all ISR replicas have the message before acknowledging it. As long as at least one ISR replica stays alive, the message is not lost. This is equivalent to acks=-1.
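To see why acks=1 can lose data while acks=all does not, here is a hypothetical Java model (AcksDemo and its methods are made up for illustration and ignore retries, timeouts and ISR shrinking). It looks at the moment when the leader has written a message but neither of its two ISR followers has copied it yet.

```java
// Hypothetical toy model of the three acks levels; not the broker's real code.
public class AcksDemo {
    enum Acks { ZERO, ONE, ALL }

    // Does the producer get a "success" for this message yet?
    static boolean producerSeesSuccess(Acks acks, boolean leaderWritten,
                                       int followersWritten, int followersInIsr) {
        switch (acks) {
            case ZERO:
                return true; // counted as sent once it is in the socket buffer
            case ONE:
                return leaderWritten; // only the leader's local write is awaited
            case ALL:
                return leaderWritten && followersWritten >= followersInIsr; // every ISR replica
            default:
                throw new IllegalArgumentException("unknown acks: " + acks);
        }
    }

    // If the leader crashes now, the message survives only if some follower already has it.
    static boolean survivesLeaderCrash(int followersWritten) {
        return followersWritten > 0;
    }

    public static void main(String[] args) {
        // The leader has written the message, but neither of its 2 ISR followers has yet.
        boolean leaderWritten = true;
        int followersWritten = 0;
        int followersInIsr = 2;
        for (Acks acks : Acks.values()) {
            System.out.println("acks=" + acks
                    + " success=" + producerSeesSuccess(acks, leaderWritten, followersWritten, followersInIsr)
                    + " survivesLeaderCrash=" + survivesLeaderCrash(followersWritten));
        }
    }
}
```

At that moment acks=0 and acks=1 have already reported success even though a leader crash would lose the message, while acks=all has not reported success yet, so the producer can still retry.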

How consumer offsets are committed

For each partition, the consumer commits the offset of the last message it has read, either to ZooKeeper (older clients) or to a Kafka topic, so that its read position is not lost if it shuts down or restarts. There are two main ways to commit offsets:

  1. Automatic commit:

If enable.auto.commit is set to true, the consumer automatically commits the offsets it has processed at a fixed interval, 5 s by default (auto.commit.interval.ms). This carries a risk of repeated consumption: if the consumer crashes after processing some messages but before the next commit, those messages are delivered again after it restarts. Shortening the commit interval reduces this risk but cannot eliminate it.

  2. Manual commit:

Because automatic commits have a weakness (they happen only at a fixed frequency, so messages may be consumed more than once), Kafka also supports committing offsets manually. Set enable.auto.commit to false to disable automatic commits. The following example shows a manual commit:

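```java
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    // ... process the record first (the container factory must use a MANUAL ack mode) ...
    ack.acknowledge(); // then commit its offset
}
```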
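The difference between the two modes shows up when a consumer crashes. The simplified Java simulation below counts how many already-processed messages are delivered again after a restart. The class and method names are hypothetical, and a count of messages stands in for the time-based auto.commit.interval.ms. Committing right after each message, as the manual example above does, shrinks that window to the message being processed at the moment of the crash; it does not make delivery exactly-once.

```java
// Hypothetical simulation: how many messages are redelivered after a consumer crash?
public class OffsetCommitDemo {
    /**
     * @param processedBeforeCrash messages fully processed before the consumer dies
     * @param commitEvery          commit the position after every N processed messages
     *                             (N > 1 stands in for a periodic auto commit, N = 1 for
     *                             a manual commit right after each message is processed)
     * @return how many already-processed messages are consumed again after the restart
     */
    static int redeliveredAfterCrash(int processedBeforeCrash, int commitEvery) {
        int committedOffset = 0; // next offset to read, as last stored in Kafka
        for (int offset = 0; offset < processedBeforeCrash; offset++) {
            // ... process the message at 'offset' ...
            if ((offset + 1) % commitEvery == 0) {
                committedOffset = offset + 1;
            }
        }
        // After the restart, consumption resumes from the last committed offset.
        return processedBeforeCrash - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println("periodic commit every 50: " + redeliveredAfterCrash(137, 50) + " duplicates");
        System.out.println("commit after each message: " + redeliveredAfterCrash(137, 1) + " duplicates");
    }
}
```

Here the periodic commit last stored offset 100, so 37 messages are consumed twice; committing after every message leaves nothing to redeliver in this simplified model.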

How do I ensure that messages are not lost

Now that we know about the producer acks mechanism and how consumer offsets are committed, let's look at how to ensure that messages are not lost.

1. Set the replication factor to 3 or more

Set the broker parameter default.replication.factor >= 3

Or specify a replication factor >= 3 when creating the topic

2. Set min.insync.replicas > 1

min.insync.replicas is the minimum number of replicas in the ISR that must acknowledge a write. Kafka-1 describes the ISR in detail. This parameter only takes effect when the producer uses acks=all. With acks=all and min.insync.replicas greater than 1 (for example 2), a message is acknowledged only after at least two replicas have it; if the ISR shrinks below this value, the producer gets an error instead of a success.
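The replication factor and min.insync.replicas work together. A back-of-the-envelope Java model (hypothetical names, and ignoring details such as how quickly a slow follower drops out of the ISR) shows why a replication factor of 3 with min.insync.replicas=2 is an often-recommended combination: writes keep flowing with one broker down, and every acknowledged message exists on at least two brokers.

```java
// Hypothetical arithmetic model of replication factor and min.insync.replicas with acks=all.
public class MinIsrDemo {
    // With acks=all, the leader rejects a write (NotEnoughReplicas) if the ISR is too small.
    static boolean acceptsWrite(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    // How many brokers can fail before acks=all writes start being rejected.
    static int failuresBeforeWritesRejected(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }

    // An acknowledged message is on at least min.insync.replicas brokers,
    // so it is still available after losing at least this many of them.
    static int failuresAckedDataSurvives(int minInsyncReplicas) {
        return minInsyncReplicas - 1;
    }

    public static void main(String[] args) {
        int replicationFactor = 3;
        int minIsr = 2;
        for (int isr = replicationFactor; isr >= 1; isr--) {
            System.out.println("ISR size " + isr + " -> accepts write: " + acceptsWrite(isr, minIsr));
        }
        System.out.println("broker failures tolerated for writes: "
                + failuresBeforeWritesRejected(replicationFactor, minIsr));
        System.out.println("broker failures an acked message survives: " + failuresAckedDataSurvives(minIsr));
    }
}
```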

3. Set unclean.leader.election.enable=false

This parameter determines whether a new leader may be elected from the OSR (out-of-sync replicas). Set it to false so that a replica that is missing a lot of data cannot become the leader, which lowers the chance of message loss.

A detailed description of the OSR can also be found in Kafka-1: Installation and Key Concepts.
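The trade-off behind this flag is availability versus durability. The simplified Java sketch below (hypothetical names; the real controller logic is more involved and follows the replica assignment order, which list order approximates here) shows what the flag changes when every ISR replica is down: with it disabled the partition has no leader and stays unavailable, with it enabled an out-of-sync replica takes over and whatever it had not replicated is gone.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified leader election for one partition.
public class LeaderElectionDemo {
    static Optional<String> electLeader(List<String> liveIsr, List<String> liveOsr,
                                        boolean uncleanLeaderElectionEnable) {
        if (!liveIsr.isEmpty()) {
            return Optional.of(liveIsr.get(0)); // clean election: no acknowledged data is lost
        }
        if (uncleanLeaderElectionEnable && !liveOsr.isEmpty()) {
            return Optional.of(liveOsr.get(0)); // unclean: messages the OSR replica missed are lost
        }
        return Optional.empty(); // partition stays unavailable until an ISR replica comes back
    }

    public static void main(String[] args) {
        List<String> noIsr = Collections.emptyList();
        List<String> osr = Arrays.asList("broker-3");
        System.out.println("unclean=false -> " + electLeader(noIsr, osr, false)); // Optional.empty
        System.out.println("unclean=true  -> " + electLeader(noIsr, osr, true));  // Optional[broker-3]
    }
}
```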

4. Add an asynchronous callback on the producer to find out why a send failed

```java
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
// Log the outcome of the asynchronous send: success callback first, failure callback second
future.addCallback(
        result -> logger.info("The producer successfully sent a message to topic:{} partition:{}",
                result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
        ex -> logger.error("Producer send failed, cause: {}", ex.getMessage()));
```
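Newer versions of Spring for Apache Kafka (3.0 and later) return a CompletableFuture from KafkaTemplate.send instead of a ListenableFuture, so the callback is registered with the JDK's own methods such as whenComplete or handle. The self-contained sketch below shows that pattern with plain JDK types only; fakeSend is a hypothetical stand-in for the real send that simply fails when the "broker" is down, and System.out/System.err stand in for the logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SendCallbackDemo {
    // Hypothetical stand-in for an asynchronous send; it fails when the "broker" is down.
    static CompletableFuture<String> fakeSend(String topic, boolean brokerUp) {
        return CompletableFuture.supplyAsync(() -> {
            if (!brokerUp) {
                throw new IllegalStateException("broker unavailable");
            }
            return topic + "-0"; // pretend the record landed in partition 0
        });
    }

    // Registers a callback that handles both outcomes, like addCallback(success, failure).
    static CompletableFuture<String> sendWithCallback(String topic, boolean brokerUp) {
        return fakeSend(topic, brokerUp).handle((partition, ex) -> {
            if (ex != null) {
                // Exceptions thrown inside supplyAsync arrive wrapped in a CompletionException.
                Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex;
                String message = "Producer send failed, cause: " + cause.getMessage();
                System.err.println(message); // real code would use logger.error(...)
                return message;
            }
            String message = "The producer successfully sent a message to " + partition;
            System.out.println(message); // real code would use logger.info(...)
            return message;
        });
    }

    public static void main(String[] args) {
        // join() only makes this demo wait for the output; a producer would not block here.
        sendWithCallback("orders", true).join();
        sendWithCallback("orders", false).join();
    }
}
```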

5. Set acks=all on the producer

```yaml
spring:
  kafka:
    producer:
      acks: all
```

6. Configure message retries on the producer

```yaml
spring:
  kafka:
    producer:
      retries: 3
```

7. Switch consumer offset commits to manual

```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false
```
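For applications that use the plain Kafka Java client instead of Spring Boot, the same three settings from the YAML above are ordinary client properties named acks, retries and enable.auto.commit. The sketch below (class and method names are hypothetical) only builds the Properties objects with string keys, so it compiles without the Kafka dependency; a real client would also need at least bootstrap.servers and the serializer/deserializer settings, which are omitted here.

```java
import java.util.Properties;

// The reliability settings from this section as plain Kafka client properties.
public class ReliabilityConfigDemo {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "all");  // wait for all ISR replicas
        props.put("retries", "3"); // retry transient send failures
        return props;
    }

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false"); // commit offsets manually after processing
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```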

Note: many of the settings above that prevent message loss also cost performance, so reliability and efficiency are often at odds. Choose parameter values based on the actual application scenario.

Kafka rebalance

Rebalancing is essentially a protocol that defines how the consumers in a consumer group divide up the partitions of the topics they subscribe to.

Conditions that trigger a rebalance

  1. Group membership changes: consumers join or leave the group. Note that leaving is often involuntary, for example when a consumer crashes and drops out.
  2. The number of partitions of a subscribed topic changes. Kafka currently only allows partitions to be added, and adding them triggers a rebalance.
  3. The set of subscribed topics changes. For example, when a consumer group subscribes with a regular expression and a new topic matching it is created, a rebalance is triggered.

Why avoid rebalancing

During a rebalance, consumers cannot consume messages from Kafka, which can significantly reduce Kafka's throughput (TPS). If a Kafka cluster has many nodes, for example hundreds, a rebalance can take a long time, anywhere from minutes to hours, and Kafka is essentially unavailable during that period. In production environments you should therefore try to avoid rebalancing.

How to avoid rebalancing

The previous section listed the conditions that trigger a rebalance, so avoiding rebalancing comes down to avoiding those triggers.

Changing the number of partitions, changing the subscribed topics and deliberately adding consumers are all operations we control, so their rebalancing cost can be taken into account when we perform them. Beyond avoiding deliberate triggers, we also need to prevent rebalances that happen without our knowledge.

Leaving aside consumers that genuinely die, which we cannot prevent, the optimization is to stop Kafka from concluding that a healthy consumer has died. Three settings control this:

  1. session.timeout.ms: How long the group coordinator waits without receiving a heartbeat before it considers the consumer dead. A value of 6 s is recommended.
  2. heartbeat.interval.ms: How often the consumer sends heartbeats. A smaller interval makes false detections less likely but consumes more resources. A value of 2 s is recommended.
  3. max.poll.interval.ms: The maximum allowed time between two consecutive poll() calls. After pulling data, the consumer must process it before pulling again; if the gap between two polls exceeds this value, the consumer is removed from the consumer group. The default is 5 minutes. Set it based on the actual maximum message processing time, typically about one minute longer than that maximum.
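The three timeouts constrain each other, so it can help to compute and check them in one place. The hypothetical helper below builds them as consumer properties. The check that heartbeat.interval.ms is at most one third of session.timeout.ms follows the guidance in the Kafka consumer configuration docs; the one-minute margin on max.poll.interval.ms is the rule of thumb from the list above, and maxProcessingTimeMs is assumed to be the time needed to handle everything returned by one poll().

```java
import java.util.Properties;

// Hypothetical helper that builds the three consumer timeouts and checks them against each other.
public class ConsumerTimeoutConfig {
    static Properties build(long sessionTimeoutMs, long heartbeatIntervalMs, long maxProcessingTimeMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException("heartbeat.interval.ms must be lower than session.timeout.ms");
        }
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            throw new IllegalArgumentException("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        long maxPollIntervalMs = maxProcessingTimeMs + 60_000; // one minute of head room
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(heartbeatIntervalMs));
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 6 s session timeout, 2 s heartbeat, slowest poll batch takes about 2 minutes to process.
        System.out.println(build(6_000, 2_000, 120_000));
    }
}
```

The recommended 6 s / 2 s pair sits exactly on the one-third limit, and a 2-minute processing time yields max.poll.interval.ms=180000.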

Several strategies for rebalancing

  1. Range: assigns partitions topic by topic, dividing each topic's partitions among the consumers separately.
  2. RoundRobin: assigns the partitions of all subscribed topics together, in turn.
  3. Sticky: during a rebalance, keeps as much of the existing partition-to-consumer assignment as possible and only moves what it has to.

The difference between 1 and 2: suppose there are 5 topics, each with 3 partitions, subscribed to by a consumer group of 15 consumers.

With the first strategy, the assignor looks at each topic separately. The first topic has 3 partitions, which go to the first three consumers, one each; topics 2 to 5 are handled the same way. As a result, each of the first three consumers gets five partitions and the remaining twelve consumers sit idle.

With the second strategy, the assignor sees all 5 topics as 15 partitions for 15 consumers and simply gives each consumer one partition.
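The comparison can be reproduced with a small Java simulation. Both methods are simplified, hypothetical versions of Kafka's RangeAssignor and RoundRobinAssignor (the real ones also sort members and partitions and handle consumers with different subscriptions); this sketch assumes every consumer subscribes to every topic and that the consumer list is already in sorted order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignorDemo {
    // Simplified Range: each topic is split independently over the (sorted) consumers.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / consumers.size();
            int extra = partitions % consumers.size(); // the first 'extra' consumers get one more
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    result.get(consumers.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Simplified RoundRobin: all partitions of all topics are dealt out in one sequence.
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int i = 0;
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                result.get(consumers.get(i++ % consumers.size())).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        return result;
    }

    // Number of partitions per consumer, in consumer order.
    static List<Integer> sizes(Map<String, List<String>> assignment) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> partitions : assignment.values()) sizes.add(partitions.size());
        return sizes;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 15; i++) consumers.add(String.format("consumer-%02d", i));
        Map<String, Integer> topics = new LinkedHashMap<>();
        for (int t = 1; t <= 5; t++) topics.put("topic" + t, 3);

        System.out.println("Range:      " + sizes(range(consumers, topics)));
        System.out.println("RoundRobin: " + sizes(roundRobin(consumers, topics)));
    }
}
```

For 5 topics with 3 partitions each and 15 consumers, range gives the first three consumers five partitions each and leaves the other twelve with none, while roundRobin gives every consumer exactly one partition.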