Background
My team is currently building a process orchestration engine, several features of which require an MQ with delayed-message and consumer-retry support. After weighing the options, we decided on a compute-storage separated architecture and built a custom MQ on top of distributed KV storage. It now supports the main features of an MQ. This article describes the core ideas behind building an MQ on distributed KV.
Terminology
- Message: a message, the basic unit of data delivered through the MQ
- Topic: Logical classification of messages
- Partition: A Topic contains multiple partitions, and each message is ultimately sent to one of the partitions within the Topic
- Partition Offset: Each time a message is written to a Partition, the Partition's Offset is incremented by 1
- Producer: Sends messages to a partition under a Topic
- Consumer Group: A Topic can be consumed by multiple different Consumer Groups, and each Consumer Group receives the full set of messages in the Topic.
- Consumer: Consumes messages from a Topic. If there is only one Consumer in a Consumer Group, it consumes all messages; if there are multiple Consumers, each one consumes only a subset of the messages.
- Consumer Group Offset: Records a Consumer Group's current consumption position for each Partition under a Topic. The value cannot exceed the maximum Offset of the corresponding Partition.
- Broker: A message Broker. Producer sends messages to the Broker, which then sends messages to a topic partition. The Consumer connects to the Broker and consumes messages from it.
- Broker Cluster: A cluster of Brokers; each cluster contains multiple Broker instances for high availability.
- Rebalance: In traditional implementations, the number of Consumers in a Consumer Group cannot exceed the number of Partitions in a Topic; a Partition can be allocated to at most one Consumer, and any Consumers beyond the number of Partitions consume no messages. In this implementation there is no concept of Rebalance, and the number of Consumers is not limited by the number of Partitions.
- Delayed Message: Typically, a message posted to a Topic can be consumed immediately. A delayed message is held for a specified time before it can be consumed.
- Retry Message: When a message fails to be consumed, it needs to be retried, that is, redelivered to the Consumer according to a retry strategy.
Overall architecture
KV storage
This article uses Redis Cluster as the example KV storage. All Broker instances in a Broker Cluster share this Redis Cluster; any other KV storage that supports scanning by key can be used instead.
From a persistence point of view, purely in-memory Redis is not appropriate, but the intent here is to illustrate the core principles of an MQ built on distributed KV. In fact, within the company we use a distributed KV built on RocksDB that is compatible with the Redis protocol at the network layer.
For simplicity, this article does not discuss Redis Cluster scaling or slot migration, but what it covers is enough to grasp the core principles of building message middleware on distributed KV.
Network communication
Producers and Consumers communicate with the Broker. In the author's project, gRPC is used. gRPC is arguably the most popular of the open source communication frameworks, and Apache RocketMQ 5.x also uses it. gRPC itself is not covered in this article.
Detailed design
Broker cluster metadata
When each Broker starts, it registers its own information with Redis so that Producers and Consumers can discover it. For example, using a hash structure:
Key:

```
[cluster]$cluster_name
```

Value:

| field | value |
| --- | --- |
| broker1 | ip:port |
| broker2 | ip:port |
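A minimal sketch of this registration with the redis-py client (the function names and client setup are illustrative assumptions, not the author's actual code):

```python
import redis

def register_broker(r: redis.Redis, cluster_name: str, broker_id: str,
                    ip: str, port: int) -> None:
    """Register this broker in the cluster metadata hash for service discovery."""
    r.hset(f"[cluster]{cluster_name}", broker_id, f"{ip}:{port}")

def discover_brokers(r: redis.Redis, cluster_name: str) -> dict:
    """Return {broker_id: "ip:port"} for every broker registered in the cluster."""
    return r.hgetall(f"[cluster]{cluster_name}")

r = redis.Redis(decode_responses=True)
register_broker(r, "mq-cluster", "broker1", "10.0.0.1", 9090)
print(discover_brokers(r, "mq-cluster"))  # {'broker1': '10.0.0.1:9090'}
```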
Topic metadata
Topic metadata mainly records how many partitions a Topic has and how those partitions are distributed across the Redis Cluster. When a user creates a Topic, they specify the number of partitions.
A Redis Cluster has 16,384 slots, and each Redis shard is responsible for a subset of them. When creating a Topic with, say, 10 partitions, the 10 partitions can be mapped to different slots according to some strategy, which indirectly distributes the partitions across different Redis shards.
Once a Topic has been created, its partitions are allocated to different Brokers. With 10 partitions and 10 Brokers, each Broker is responsible for one partition; with only 5 partitions, only 5 of the Brokers are assigned one.
For example, the mapping can be maintained with a hash structure:

Key:

```
[topic_metadata]$topic_name
```

Value:

| field | value |
| --- | --- |
| partition1 | broker1 |
| partition2 | broker2 |
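Under the same assumptions as above, topic creation might look like this sketch (round-robin assignment is one possible strategy; the article does not prescribe one):

```python
import redis

def create_topic(r: redis.Redis, topic: str, num_partitions: int,
                 brokers: list) -> None:
    """Create topic metadata, assigning each partition to a broker round-robin."""
    mapping = {
        f"partition{i}": brokers[(i - 1) % len(brokers)]
        for i in range(1, num_partitions + 1)
    }
    r.hset(f"[topic_metadata]{topic}", mapping=mapping)

r = redis.Redis(decode_responses=True)
create_topic(r, "order-events", 10, ["broker1", "broker2", "broker3"])
```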
Messages
Messages are defined using protobuf:
```protobuf
message Message {
  google.protobuf.Struct metadata = 1;       // metadata of the message
  string partition = 2;                      // partition
  int64 offset = 3;                          // offset
  string msgId = 4;                          // unique id of the message
  string topic = 5;                          // topic
  string key = 6;                            // key, used for partition routing
  bytes body = 7;                            // message body
  google.protobuf.Timestamp born_time = 8;   // time the message was produced
  google.protobuf.Timestamp expireTime = 9;  // due time, used for delayed messages
}
```
When a producer sends a message, in the simplest case it only needs to specify the topic and body. For special requirements, the following fields can also be set:
- Key: messages with the same key are written to the same partition using a hash algorithm.
- Partition: specifies the partition directly rather than deriving it from the key.
- ExpireTime: used for delayed messages; the message is not consumed immediately, but only after the specified time.
Message sending
From the Producer's perspective:
- Retry: Sending a message to the Broker may fail, so retries are needed. Set a retry count and a timeout, and retry within the timeout period.
- Partition selection: The partition should be chosen on the Producer side. Once the partition is determined, messages are sent to the Broker that owns that partition (see the sketch after this list).
- Aggregation: To reduce network IO, messages should be aggregated and sent in batches; note that aggregation is done per partition.
- Broker selection: For unordered messages, Broker selection can follow certain policies; for example, Brokers with a high failure rate or high latency should be passed over in favor of others.
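A minimal sketch of producer-side partition selection (the CRC32 hash and the random fallback are assumptions; the article only says a hash algorithm is used for keyed messages):

```python
import random
import zlib
from typing import List, Optional

def select_partition(partitions: List[str], key: Optional[str] = None) -> str:
    """Pick a partition: hash the key if present, otherwise pick randomly."""
    if key is not None:
        # The same key always maps to the same partition.
        return partitions[zlib.crc32(key.encode()) % len(partitions)]
    return random.choice(partitions)

partitions = [f"partition{i}" for i in range(1, 11)]
assert select_partition(partitions, "order-42") == select_partition(partitions, "order-42")
```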
From the broker’s point of view:
When a message is received, offset information must be maintained. Each time the broker determines which partition a message goes to, the offset of that partition is incremented by 1. In the author's project, a hash structure stores the maximum offset of each partition:
Key:

```
[topic_offset]{$topic}
```

Value:

| field | value |
| --- | --- |
| partition1 | offset1 |
| partition2 | offset2 |
To make offset maintenance efficient, HINCRBY need not be called for every message. Instead, when the broker starts, it loads the partition offsets it maintains into memory, increments them in memory as messages are sent, and periodically persists them back to KV.
In addition, there needs to be logic to correct the offset in case the broker fails unexpectedly before the latest offset is saved to Redis. On startup, the broker can scan onwards from the persisted maximum offset; if newer messages are found, the offset is corrected (see the message storage section below).
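A sketch of this in-memory counter with periodic flush and crash recovery (class and method names are assumptions; the message key format used in `recover` is defined in the next section, and the client is assumed to use decode_responses=True):

```python
import threading
import redis

class PartitionOffsets:
    """In-memory offset counters for one topic, periodically flushed to KV."""

    def __init__(self, r: redis.Redis, topic: str, partitions: list):
        self.r = r
        self.topic = topic
        self.key = f"[topic_offset]{{{topic}}}"
        saved = r.hgetall(self.key)  # resume from the last persisted offsets
        self.offsets = {p: int(saved.get(p, 0)) for p in partitions}
        self.lock = threading.Lock()

    def next_offset(self, partition: str) -> int:
        """Reserve the next offset for a message sent to this partition."""
        with self.lock:
            self.offsets[partition] += 1
            return self.offsets[partition]

    def flush(self) -> None:
        """Persist current offsets; invoke periodically from a timer thread."""
        with self.lock:
            snapshot = {p: str(o) for p, o in self.offsets.items()}
        self.r.hset(self.key, mapping=snapshot)

    def recover(self, partition: str) -> None:
        """After a crash, advance past messages written but never flushed."""
        off = self.offsets[partition]
        while self.r.exists(f"[topic]{{{self.topic}_{partition}}}{off + 1}"):
            off += 1
        self.offsets[partition] = off
```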
Message storage
When a message is written to Redis, the key has the following format:

```
[topic]{$topic_$partition}$offset
```

Where:
- [topic]: a fixed prefix
- {$topic_$partition}: $topic is the topic name and $partition is the partition name; wrapping them in braces uses the Redis Hash Tag capability, so all messages of one partition map to the same slot
- $offset: the offset of the message within the partition
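Writing a message then reduces to a single KV set under this key (a sketch; note that a KV store relying on ordered key scans might want a fixed-width, zero-padded offset in the key, which the article does not specify):

```python
import redis

def store_message(r: redis.Redis, topic: str, partition: str,
                  offset: int, payload: bytes) -> None:
    """Write a message under its partition's hash-tagged key."""
    # Hash tag {topic_partition} keeps all keys of one partition in one slot.
    r.set(f"[topic]{{{topic}_{partition}}}{offset}", payload)
```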
Message pull
The pull is performed through the Redis Scan operation, and the scanned messages are sent to the consumer for processing.
Pulling relies on a consumer offset, which records a consumer group's consumption progress for each partition of a topic; a pull starts from this position. A hash structure can be used here:
Key:

```
[consumer_offset]$group_$topic
```

Value:

| field | value |
| --- | --- |
| partition1 | offset1 |
| partition2 | offset2 |
When a Consumer connects to a Broker, the Broker queries the consumer offset of the partitions it is responsible for and starts pulling messages from that position.
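A sketch of one pull iteration. Because Redis SCAN does not return keys in order, this version does direct lookups by offset instead; an ordered KV (such as the RocksDB-based store mentioned earlier) could use a true range scan. Committing the offset immediately after the pull is a simplification; a real broker would commit only after the consumer acknowledges:

```python
import redis

def pull_messages(r: redis.Redis, group: str, topic: str,
                  partition: str, batch: int = 100) -> list:
    """Pull up to `batch` messages for a consumer group from one partition."""
    offset_key = f"[consumer_offset]{group}_{topic}"
    start = int(r.hget(offset_key, partition) or 0)
    messages = []
    for off in range(start + 1, start + 1 + batch):
        msg = r.get(f"[topic]{{{topic}_{partition}}}{off}")
        if msg is None:
            break  # caught up with the partition's max offset
        messages.append(msg)
    if messages:
        r.hset(offset_key, partition, start + len(messages))
    return messages
```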
Delayed messages
All delayed messages are first sent to a special delay Topic, which holds them temporarily. When a message is due, it is delivered to its target Topic.
- Before a message is sent to the delay Topic, its original topic and partition are recorded in the metadata fields `origin_topic` and `origin_partition`. The message is then sent to the delay Topic. Its key format differs from normal messages in that keys are ordered by timestamp:

```
[delay]{$broker_id}$expireTime
```

- A delayed-message forwarder continuously scans the KV store. When it finds a due message, it restores the target topic from the `origin_topic` and `origin_partition` fields and posts the message to the target topic through the normal send logic.
- At the same time, the forwarder records the position it has scanned up to.
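A sketch of the forwarder loop (the delivery function is a placeholder for the send path above; note that `[` is a glob metacharacter in Redis MATCH patterns and must be escaped, and that an unordered SCAN examines every delayed key, whereas an ordered KV could range-scan only keys up to the current timestamp):

```python
import time
import redis

def deliver_to_target_topic(raw_msg) -> None:
    """Placeholder: re-send via the normal path, using origin_topic/origin_partition."""
    ...

def forward_due_messages(r: redis.Redis, broker_id: str) -> None:
    """One pass of the forwarder: deliver and delete all due delayed messages."""
    now_ms = int(time.time() * 1000)  # assumes $expireTime is a millisecond timestamp
    prefix = f"[delay]{{{broker_id}}}"
    pattern = f"\\[delay\\]{{{broker_id}}}*"  # escape '[' for the glob matcher
    for key in r.scan_iter(match=pattern):
        expire_ms = int(key[len(prefix):])  # the key suffix is the due timestamp
        if expire_ms <= now_ms:
            deliver_to_target_topic(r.get(key))
            r.delete(key)
```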
Consumption retry
When a consumer fails to consume a message, the message by default goes through the delayed-message logic: once the delay expires, it is redelivered to the target consumer for another attempt. Retry messages are therefore built on top of delayed messages.
The logical difference is:
- Delayed messages are delivered directly to the target topic.
- A retry message cannot be delivered back to the original target topic, because a topic can have multiple consumer groups: if only one consumer group fails and needs a retry, the other consumer groups should not be affected. Therefore each consumer group has its own retry topic, for example:

```
[topic]retry_$consumer_group
```
This retry topic should also contain partitions, maintained with the same strategy as the Topic metadata described earlier.
Dead-letter queue
When a message still fails after the maximum number of retries, it can be put into a dead-letter queue, for example:

```
[topic]dead_$consumer_group
```
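A sketch of how a failed consumption could be routed between the retry topic and the dead-letter queue (the backoff schedule and the retry_count metadata field are assumptions; the article does not specify them):

```python
RETRY_BACKOFF_SECONDS = [10, 30, 60, 300]  # assumed schedule

def route_failed_message(metadata: dict, consumer_group: str):
    """Return (topic, delay_seconds) for the next delivery of a failed message."""
    retries = int(metadata.get("retry_count", 0))
    if retries >= len(RETRY_BACKOFF_SECONDS):
        return f"[topic]dead_{consumer_group}", 0   # retries exhausted
    return f"[topic]retry_{consumer_group}", RETRY_BACKOFF_SECONDS[retries]

print(route_failed_message({"retry_count": 1}, "groupA"))  # ('[topic]retry_groupA', 30)
```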
Message TTL
Messages must be cleaned up, otherwise they would accumulate and take up a large amount of storage space. Our strategy is:
- For ordinary messages: a message is automatically removed after 3 days; that is, a message that has not been consumed within 3 days is deleted.
- For delayed messages: the TTL is the message's due time plus 3 days.
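With Redis as the backing store, this policy maps naturally onto key TTLs (a sketch; the 3-day figure is from the article, the rest is assumed):

```python
import redis

THREE_DAYS = 3 * 24 * 3600

def store_with_ttl(r: redis.Redis, key: str, payload: bytes,
                   delay_seconds: int = 0) -> None:
    """Write a message and let Redis expire it per the cleanup policy.

    Ordinary messages live 3 days; delayed messages live until their
    due time plus 3 days.
    """
    r.set(key, payload, ex=THREE_DAYS + delay_seconds)
```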
Conclusion
The purpose of this article is to introduce the core ideas of building an MQ on distributed KV. Many topics such as disaster recovery, high availability, and performance optimization are not discussed. What is presented here is only the core idea, and it would need considerable refinement and optimization before use in a production environment.