Benefits of message queues

(1) Decoupling: Producers and consumers are completely decoupled. Producers are only responsible for producing messages, and consumers consume them when ready, without either side affecting the other.

(2) Asynchrony: After producing a message, the producer does not need to block or poll while waiting for it to be processed.

(3) Peak shaving: When request volume spikes beyond the downstream system's processing speed, a message queue lets the upstream side produce messages at the full request rate while the downstream side controls its own consumption speed, so peak traffic is spread out over time.

Kafka

The roles of Kafka's components

What is Kafka?

  • Producer: responsible for producing messages
  • Consumer: responsible for consuming messages
  • Broker: a Kafka node
  • Topic: a topic. Consumers subscribe to a topic, and producers produce data to a topic
  • Partition: each topic can have multiple partitions, and partitions of the same topic may reside on different brokers. Each partition persists its data to disk
  • Ordering: messages within the same partition are ordered, but messages across different partitions of the same topic are not
  • Replica: Kafka stores data by partition, and each partition's data is replicated to other brokers, so if partition 1 is on broker 1, its replicas are probably on broker 2 or broker 3
  • Consumer group: a group of consumers. Topic subscription happens at the consumer-group level

How does a Kafka producer choose which partition to deliver a message to, and how does a consumer choose which partition to consume from?

When a producer produces a message, the message is assigned to a particular partition, and different producers may route messages to different partitions. If the producer does not specify a partition, messages are placed into the partitions in turn.
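A minimal sketch of both cases using the Java client (the broker address and topic name are made-up placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoiceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Explicit partition: this record always goes to partition 0 of "my-topic".
            producer.send(new ProducerRecord<>("my-topic", 0, "key", "value"));

            // No partition and no key: records are spread across the partitions in turn.
            producer.send(new ProducerRecord<>("my-topic", "value-only"));
        }
    }
}
```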

When consuming, a consumer reads messages from its assigned partitions. A consumer can consume multiple partitions, but within a consumer group the same partition cannot be consumed by more than one consumer. Each consumer maintains its own offset pointer into each partition.

A partition belongs to exactly one topic, but a topic can be subscribed to by multiple consumer groups, each of which consumes the partition's messages independently of the others. Messages are not deleted by consumption; they are simply read.
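To make the partition-exclusivity rule concrete, a hedged sketch using the Java client's assign(): one consumer reads two fixed partitions, and within the same group no other consumer may read them (names and addresses are placeholders):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One consumer may read several partitions, but within a group each
            // partition is read by at most one consumer.
            consumer.assign(Arrays.asList(
                    new TopicPartition("my-topic", 0),
                    new TopicPartition("my-topic", 1)));
            consumer.poll(Duration.ofMillis(1000)); // fetch from the assigned partitions
        }
    }
}
```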

How does the Kafka cluster do service registration and discovery?

This is done with ZooKeeper. Live brokers are discovered by having each broker create an ephemeral node and having others watch those nodes.

The Kafka cluster is managed by ZooKeeper.

How do Kafka consumers consume messages?

How are messages consumed?

When consuming, the consumer itself implements a while loop and continuously polls the corresponding partitions for messages.
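A minimal sketch of that loop with the Java client (group id, topic, and address are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // The client drives consumption itself: an endless poll loop.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```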

How many messages can be retrieved in one poll?

Consumers obtain messages by pulling: the client continuously polls the corresponding partitions on the broker, and when messages are available it pulls several at a time (how many depends on the configured message count and size limits). Each poll blocks for a period of time waiting for messages to arrive in the queue, which avoids overly frequent polling.

fetch.min.bytes

This parameter lets the consumer specify the minimum amount of data for a read from the broker. When a consumer requests messages and less data than this threshold is available, the broker waits until there is enough data before returning it to the consumer.

fetch.max.wait.ms

While fetch.min.bytes above specifies the minimum amount of data for a read, this parameter specifies the maximum time the consumer will wait for it, which avoids long blocking. It defaults to 500 ms.

max.partition.fetch.bytes

This parameter specifies the maximum number of bytes returned per partition. The default is 1 MB.
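To show how the three parameters fit together, a configuration sketch (1 KB for fetch.min.bytes is an illustrative choice; the other two values are the documented defaults):

```java
import java.util.Properties;

public class FetchTuning {
    static Properties fetchTuning() {
        Properties props = new Properties();
        // The broker holds the response until at least 1 KB is available...
        props.put("fetch.min.bytes", "1024");
        // ...but never waits longer than 500 ms (the default).
        props.put("fetch.max.wait.ms", "500");
        // Each partition contributes at most 1 MB per fetch (the default).
        props.put("max.partition.fetch.bytes", "1048576");
        return props;
    }
}
```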

Consumer heartbeat

session.timeout.ms: the session timeout. If no heartbeat arrives within this window, the consumer is considered to have timed out.

heartbeat.interval.ms: the heartbeat interval, i.e., how often a heartbeat is sent.

Kafka enforces these timeouts through a heartbeat mechanism that is invisible to the consumer application: an asynchronous thread that starts working as soon as a consumer instance is started.

Each consumer has a ConsumerCoordinator, and each ConsumerCoordinator starts a HeartbeatThread thread to maintain the heartbeat.

The heartbeat thread is governed by the heartbeat.interval.ms parameter mentioned above: every heartbeat.interval.ms it sends a heartbeat packet to the coordinator on the broker to prove the consumer is still alive.

As long as the heartbeat thread has sent a heartbeat packet to the coordinator within session.timeout.ms, the group coordinator considers the consumer alive.

Coordinator

Why the coordinator exists

Before Kafka 0.9, consumer rebalance was done by registering watches on ZooKeeper. When a consumer was created, it registered its ID under its consumer group at the ZooKeeper path /consumers/[Consumer Group]/ids/[Consumer ID], then registered watches on /consumers/[Consumer Group]/ids and /brokers/ids, and finally forced itself to start a rebalance within the consumer group.

This easily leads to a herd effect on ZooKeeper: any change to, or loss of, a broker or consumer triggers a rebalance of all consumers, causing a storm of activity in the cluster. And because ZooKeeper leaves each consumer to decide on its own whether a broker or consumer is down, different consumers may see different states at the same moment, which causes many incorrect rebalance attempts. In addition, because the consumers are independent of one another, none of them knows whether the others have rebalanced successfully, which can leave the consumer group consuming incorrectly.

The herd effect and split-brain problems inherent in ZK-based rebalancing can be solved by taking failure detection and rebalance logic out of ZooKeeper and putting it in a highly available coordination point. The Kafka 0.9.x releases redesigned the consumer side and introduced such a component, the coordinator, which significantly reduced the ZooKeeper load.

The coordinator's role

Coordinators come in two kinds: the group coordinator (GroupCoordinator) and the consumer coordinator (ConsumerCoordinator).

A GroupCoordinator resides on a broker; each broker runs one instance. For each consumer group assigned to it, the GroupCoordinator is responsible for that group's offset management and consumer rebalancing.

The ConsumerCoordinator resides in each consumer client, and its only role is to submit offset information to the GroupCoordinator.

How does a consumer group select its unique GroupCoordinator?

Each consumer group corresponds to exactly one GroupCoordinator, ensuring a single coordination point. On what basis is the coordinator selected?

  1. Determine which partition of the topic __consumer_offsets the consumer group's offset information is written to.

Specific calculation formula:

```
__consumer_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
```

Note: groupMetadataTopicPartitionCount is specified by offsets.topic.num.partitions and defaults to 50 partitions.

  2. The broker hosting the leader of that partition is the selected coordinator.
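A worked example of the calculation, assuming the default of 50 partitions and a made-up group id:

```java
public class CoordinatorLookupDemo {
    public static void main(String[] args) {
        String groupId = "my-group";                // hypothetical group id
        int groupMetadataTopicPartitionCount = 50;  // default offsets.topic.num.partitions
        int partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount);
        // The broker hosting the leader of this __consumer_offsets partition
        // is the group's coordinator.
        System.out.println("__consumer_offsets partition = " + partition);
    }
}
```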

Offset management

The GroupCoordinator works with a built-in topic (__consumer_offsets). Every consumer in the consumer groups managed by a GroupCoordinator commits its offsets to this topic, where the GroupCoordinator manages them centrally.

The coordinator component that manages offsets is the OffsetManager.

It is responsible for storing, fetching, and maintaining consumer offsets. Each broker has an OffsetManager instance. There are two implementations:

ZookeeperOffsetManager: calls ZooKeeper to store and retrieve offsets (the old style of offset management).

DefaultOffsetManager: provides built-in management of consumer offsets.

Which one is used is selected by offset.storage in config/server.properties.

Rebalancing

When a consumer loses its heartbeat, rebalancing is triggered: all existing partitions are redistributed among the consumers that still have a heartbeat. The allocation tries to balance the number of partitions consumed by each consumer.

For example, if a topic has 100 partitions and a consumer group has 20 consumers, then under the coordinator's control each consumer in the group is assigned 5 partitions; this process of redistribution is rebalancing.

Conditions that trigger rebalancing

  1. Condition 1: A new consumer joins
  2. Condition 2: An existing consumer dies
  3. Condition 3: The coordinator dies and the cluster elects a new coordinator (0.10-specific)
  4. Condition 4: A new partition is added to the topic
  5. Condition 5: A consumer calls unsubscribe() and cancels its topic subscription

Disadvantages of rebalancing

Because consumers cannot consume messages from Kafka while a rebalance is in progress, rebalancing has a significant impact on Kafka's TPS. If the cluster is large, say hundreds of nodes, a rebalance can take anywhere from minutes to hours, during which Kafka is essentially unavailable. So in practice, rebalancing should be avoided as much as possible.

How to avoid rebalancing

In a nutshell, the situations that cause it are:

The consumer fails to complete a heartbeat within the timeout period, causing a rebalance.

The consumer takes too long to process messages, causing a rebalance.

Consumer heartbeat timeout

We know that the consumer communicates with the coordinator through heartbeats. If the coordinator does not receive a heartbeat in time, it assumes the consumer is dead and performs a rebalance.

The relevant Kafka consumer settings are as follows:

session.timeout.ms: sets the heartbeat timeout period

heartbeat.interval.ms: the heartbeat interval

session.timeout.ms and heartbeat.interval.ms need to be tuned so that the consumer and coordinator can maintain the heartbeat. In general, the timeout should be three times the heartbeat interval: if session.timeout.ms is set to 180 seconds, heartbeat.interval.ms should be set to 60 seconds.

Why is the timeout set to be three times the heartbeat interval?

This way, several heartbeats can be sent within one timeout window, so an occasional network glitch does not cause a spurious failure.

The consumer processing time is too long

If the consumer takes longer than max.poll.interval.ms to process a batch, the coordinator will likewise believe the consumer is dead and initiate a rebalance to redistribute its partitions to other consumers.

Kafka’s consumer parameters are as follows:

max.poll.interval.ms: the maximum processing time allowed for each batch of consumption

max.poll.records: the number of messages consumed in each batch

In this case, the usual fix is to increase the allowed processing time (increase max.poll.interval.ms) and reduce the number of messages handled per batch (decrease max.poll.records).

So pay attention to both the heartbeat timeout and the processing timeout.

For the heartbeat timeout problem, generally increase the timeout period (session.timeout.ms) and adjust the ratio between the timeout (session.timeout.ms) and the heartbeat interval (heartbeat.interval.ms).

Aliyun's official documentation suggests setting the timeout (session.timeout.ms) to 25 s, at most 30 s, and the heartbeat interval (heartbeat.interval.ms) to no more than 10 s.

For processing timeouts, generally increase the allowed processing time (max.poll.interval.ms) and decrease the number of messages handled per batch (max.poll.records).

Aliyun's official documentation suggests that max.poll.records be much smaller than the consumer group's consumption capacity (records < messages consumed per second per thread × number of consumer threads × session.timeout in seconds).
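Putting the two adjustments together, a consumer configuration sketch (the heartbeat values follow the Aliyun suggestions above; the processing values are illustrative and must be tuned to the actual workload):

```java
import java.util.Properties;

public class RebalanceTuning {
    static Properties tuned() {
        Properties props = new Properties();
        // Heartbeat side: timeout 25 s (at most 30 s), interval under 10 s,
        // keeping roughly a 3:1 timeout-to-interval ratio.
        props.put("session.timeout.ms", "25000");
        props.put("heartbeat.interval.ms", "8000");
        // Processing side: allow more time per batch, pull fewer records per poll.
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "100");
        return props;
    }
}
```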

What is a commit? Why commit?

As consumers pull messages, each consumer maintains a message offset, and each pull fetches messages from the broker's partition starting at that offset. So whether a message counts as consumed is determined by the offset's value.

When the broker returns messages to the consumer, it does not track whether the consumer received them. Kafka lets the consumer manage its own consumption offset and provides an interface for updating it; this update is called a commit.

Kafka requires each client to commit an offset recording its progress through the partition's messages. If the offset were recorded only inside the consumer, then when that consumer died nobody would know how far the partition had been consumed, and the consumer taking over after a rebalance would inevitably consume duplicates.

So the offset is shared between the client and the broker.

(1) The consumer pulled three messages and failed to commit the offset to the broker

After rebalancing, these three messages will be consumed again by a new consumer, resulting in duplicate consumption.

(2) The consumer pulled three messages and committed the offset before finishing processing them, then died

After rebalancing, these three messages are never consumed again, so they are lost.

Automatic commit

To avoid consuming messages twice, Kafka enables auto-commit by default.

enable.auto.commit: if set to true, automatic commit is enabled

auto.commit.interval.ms: the interval between automatic commits. The default is 5 s

With auto-commit, after a poll() call pulls a batch of messages (say 100), the previous offset + 100 is committed automatically, regardless of whether those 100 messages have actually been processed.
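Spelled out as configuration, a sketch of these defaults:

```java
import java.util.Properties;

public class AutoCommitConfig {
    static Properties autoCommit() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");      // auto-commit is on by default
        props.put("auto.commit.interval.ms", "5000"); // commit every 5 s by default
        return props;
    }
}
```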

Manual commit

In most cases the system can be made idempotent but cannot accept message loss, so automatic commit is turned off and commitSync() is called to commit the offset explicitly after all messages have been processed.

This way, if the consumer dies before committing, that batch of messages will be consumed again, but consumption is never lost.
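A minimal sketch of this pattern with the Java client: auto-commit is switched off and commitSync() runs only after the whole batch is processed (the process() handler, names, and addresses are hypothetical):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group
        props.put("enable.auto.commit", "false");         // turn off auto-commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical idempotent handler
                }
                // Commit only after the whole batch is processed: a crash before this
                // line means re-consumption (duplicates), never message loss.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```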