The organizational structure of the three main MQ systems
1. RabbitMQ
RabbitMQ component functions
- Broker: a RabbitMQ instance is a Broker.
- Virtual Host: a virtual host, equivalent to a database in MySQL. Multiple vhosts can exist on one Broker, isolated from each other. Each vhost has its own queues, exchanges, bindings, and permission mechanism. A vhost must be specified at connection time; the default vhost is /.
- Exchange: an exchange receives messages sent by producers and routes them to queues in the server.
- Queue: a message queue, used to hold messages until they are delivered to consumers. It is the container for messages; a message can be put into one or more queues.
- Binding: the binding relationship that associates a message queue with an exchange. An exchange is associated with a message queue through a routing key.
- Channel: a two-way data channel. Whether publishing a message, subscribing to a queue, or receiving a message, all of these actions go through a channel. Because establishing and tearing down TCP connections is very expensive for the operating system, channels were introduced so that one TCP connection can be reused.
- Connection: the TCP connection between a producer/consumer and the Broker.
- Publisher: the producer of messages.
- Consumer: the consumer of messages.
- Message: a message, which consists of a header and a body. Message headers include the routing-key, priority, and so on.
RabbitMQ exchange types
When an exchange distributes messages to queues, different exchange types correspond to different distribution policies. There are three exchange types: Direct, Fanout, and Topic.
- Direct: if the Routing Key in the message exactly matches the Routing Key in the Binding, the Exchange distributes the message to the corresponding queue.
- Fanout: every message sent to a Fanout exchange is delivered to all bound queues. A Fanout exchange ignores the Routing Key; it forwards messages the fastest of the three exchange types.
- Topic: a Topic exchange distributes messages by pattern-matching the Routing Key against a pattern. It recognizes only two wildcards: "#" and "*". "#" matches zero or more words, and "*" matches exactly one word.
TTL
TTL (Time To Live): the time a message is allowed to live. RabbitMQ supports two ways to set message expiration.
- Specified when a message is sent: the expiration time of the current message can be set in the message properties.
- Specified when the queue is declared: counted from the moment the message enters the queue; if the message exceeds the queue's timeout, it is cleared automatically.
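A minimal sketch of both options with the RabbitMQ Java client; the queue name, TTL values, and connection settings are illustrative assumptions:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class TtlDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed local broker
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // Option 2: queue-level TTL - every message in this queue expires after 60s
            Map<String, Object> queueArgs = new HashMap<>();
            queueArgs.put("x-message-ttl", 60_000);
            channel.queueDeclare("ttl.queue", true, false, false, queueArgs);

            // Option 1: per-message TTL - this particular message expires after 10s
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .expiration("10000") // milliseconds, as a string
                    .build();
            channel.basicPublish("", "ttl.queue", props, "hello".getBytes());
        }
    }
}
```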
Message acknowledgement mechanism for producers
Confirm mechanism
- Acknowledgement of a message means that after a producer sends a message, if the Broker receives the message, it will send us a reply.
- The producer receives a response to confirm that a message has been sent to the Broker. This is the core guarantee for reliable delivery of messages.
How to implement Confirm message acknowledgement?
- Enable confirm mode on the channel: channel.confirmSelect().
- Add a listener on the channel with addConfirmListener, listen for the success and failure results, and perform follow-up actions based on them, such as resending the message or logging.
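A minimal sketch of this confirm flow with the RabbitMQ Java client; the queue name and connection settings are assumptions:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConfirmDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed local broker
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            channel.confirmSelect(); // 1. enable confirm mode on the channel

            // 2. listen for the broker's ack/nack and react accordingly
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) {
                    // broker received the message(s): mark them as sent
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) {
                    // broker failed to handle the message(s): resend or log
                }
            });

            channel.queueDeclare("confirm.queue", true, false, false, null);
            channel.basicPublish("", "confirm.queue", null, "hello".getBytes());
            Thread.sleep(1000); // give the asynchronous confirm a moment before closing
        }
    }
}
```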
Return message mechanism
A Return Listener is used to process messages that are not routable.
Our message producer, by specifying an Exchange and Routing, sends the message to a queue, and our consumer listens to the queue to consume the message.
However, in some cases, if the specified exchange does not exist or the routing key cannot be routed when a message is sent, we need a Return Listener to listen for such undeliverable messages.
A key configuration item in the base API is mandatory: if true, the listener receives unroutable messages and can process them; if false, the broker deletes the message automatically.
Listening is done by calling channel.addReturnListener(ReturnListener rl) and overriding the ReturnListener's handleReturn method.
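A minimal sketch with the RabbitMQ Java client: publish with mandatory=true and handle returned messages in a ReturnListener. The exchange, routing key, and connection settings are assumptions:

```java
import com.rabbitmq.client.*;

public class ReturnDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed local broker
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                                         String routingKey, AMQP.BasicProperties properties,
                                         byte[] body) {
                    // the broker returned an unroutable message: log it or resend it elsewhere
                    System.out.println("returned: " + replyText + " / " + new String(body));
                }
            });

            // mandatory = true, so an unroutable message comes back via handleReturn
            // instead of being silently dropped ("no.such.key" is a deliberately bad routing key)
            channel.basicPublish("amq.direct", "no.such.key", true, null, "hello".getBytes());
            Thread.sleep(1000); // wait briefly for the asynchronous return before closing
        }
    }
}
```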
Consumer ACK and NACK
When consuming, the consumer can log and compensate for service exceptions. However, for serious problems such as server downtime, we need to manually ACK to ensure successful consumption on the consumer end.
```java
// deliveryTag: the unique identifier of the message in MQ
// multiple: whether to nack in batch (similar to the qos setting)
// requeue: whether to put the message back in the queue; either discard it or return it to the head of the queue to be consumed again
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
```
As shown in the code above, if a message has not been processed successfully, the consumer can nack it with requeue to return it to the Broker. In practice, requeueing is usually turned off (requeue set to false) to avoid an infinite redelivery loop.
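A minimal consumer sketch with manual ACK/NACK using the RabbitMQ Java client; the queue name and connection settings are assumptions:

```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class ManualAckDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed local broker
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        channel.queueDeclare("work.queue", true, false, false, null);

        // autoAck = false: we acknowledge manually after processing
        channel.basicConsume("work.queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // ... process the message ...
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    // requeue = false in practice, to avoid an endless redelivery loop;
                    // the message is dropped (or routed to a DLX if one is configured)
                    channel.basicNack(envelope.getDeliveryTag(), false, false);
                }
            }
        });
    }
}
```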
Dead letter queue DLX
Dead Letter Exchange (DLX): when a message becomes a dead letter in one queue, it can be re-published to another exchange, the DLX, and routed to another queue; that queue is the dead letter queue.
A DLX is an ordinary Exchange, no different from any other; it can be specified on any queue, which in effect sets an attribute of that queue.
When there is a dead letter in the queue, RabbitMQ automatically re-publishes the message to the configured DLX, and it is then routed to the dead letter queue.
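A minimal sketch of binding a dead letter exchange to a business queue with the RabbitMQ Java client; all names and the TTL value are assumptions:

```java
import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

public class DlxDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed local broker
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // the dead letter exchange and the queue that receives dead letters
            channel.exchangeDeclare("dlx.exchange", "direct", true);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");

            // the business queue: expired / rejected / overflowed messages are
            // re-published to dlx.exchange with routing key dlx.key
            Map<String, Object> bizArgs = new HashMap<>();
            bizArgs.put("x-dead-letter-exchange", "dlx.exchange");
            bizArgs.put("x-dead-letter-routing-key", "dlx.key");
            bizArgs.put("x-message-ttl", 30_000); // dead-letter after 30s if unconsumed
            channel.queueDeclare("biz.queue", true, false, false, bizArgs);
        }
    }
}
```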
2. RocketMQ
RocketMQ is the official messaging product of Alibaba's Singles' Day and supports all message services of Alibaba Group. It has withstood more than ten years of rigorous tests of high availability and high reliability and is a core product of Alibaba's transaction link.
The core concepts of RocketMQ
It has the following core concepts: Broker, Topic, Tag, MessageQueue, NameServer, Group, Offset, Producer, and Consumer.
Here are the details.
- Broker: the role that stores and forwards messages.
  - A Broker is a server that provides a concrete service. A single Broker node maintains long connections and heartbeats with all NameServer nodes and regularly registers Topic information with the NameServers. Incidentally, the underlying communication and connections are implemented on top of Netty.
  - The Broker is responsible for message storage and supports lightweight queues at the Topic level. A single Broker can support tens of thousands of queues and supports both push and pull message models.
  - According to the official documentation, it can accumulate hundreds of millions of messages and strictly guarantees message order.
- Topic: the first-level type of a message. For example, an e-commerce system can be divided into transaction messages, logistics messages, and so on; every message must have a Topic. The relationship between Topics and producers/consumers is very loose: a Topic can have zero, one, or many producers sending messages to it, a producer can send messages to different Topics at the same time, and a Topic can be subscribed to by zero, one, or many consumers.
- Tag: you can think of it as a sub-topic, the second-level type of a message, providing extra flexibility. With tags, messages for different purposes within the same business module can be identified by different tags under the same Topic. For example, transaction messages can be divided into transaction-created messages, transaction-completed messages, and so on. A message may have no Tag. Tags help keep code clean and coherent, and they also help with the query system RocketMQ provides.
- MessageQueue: the physical management unit of messages. Multiple message queues can be set up under a Topic, and RocketMQ polls all queues under the Topic when sending messages. The introduction of queues allows message storage to be distributed and clustered, with horizontal scaling capability.
- NameServer: similar to Zookeeper in Kafka, but NameServer nodes do not communicate with each other, so it is lighter than ZK. It is mainly responsible for metadata management, including Topic and routing information. Each Broker registers with the NameServer when it starts; a Producer obtains routing information from the NameServer by Topic before sending messages, and Consumers also obtain routing information by Topic at regular intervals.
- Producer: supports three send modes: synchronous, asynchronous, and one-way.
  - One-way send: after the message is sent, the producer can continue sending the next message or executing business code without waiting for the server to respond; there is no callback function.
  - Asynchronous send: after the message is sent, the producer can continue sending the next message or executing business code without waiting for the server to respond; there is a callback function.
  - Synchronous send: after the message is sent, the producer waits for a success or failure response from the server before continuing.
- Consumer: supports both PUSH and PULL consumption modes, and both clustering consumption and broadcast consumption.
  - Clustering consumption: a consumer cluster jointly consumes the multiple queues of a Topic, and each queue is consumed by only one consumer. If a consumer dies, the other consumers in the group take over its queues and continue consuming.
  - Broadcast consumption: a message is delivered to every consumer in the consumer group, equivalent to RabbitMQ's publish/subscribe.
- Group: a group, which can subscribe to multiple Topics. Groups are divided into ProducerGroup and ConsumerGroup, representing producers and consumers of a certain type of message. Generally, one service uses one Group, and the same Group sends and consumes the same kind of messages.
- Offset: in RocketMQ, all message queues are persistent data structures of unbounded length. "Unbounded" here means that each storage unit in the queue has a fixed length and is addressed by its Offset, a Java long of 64 bits that in theory will not overflow for 100 years, so the queue can be regarded as unbounded. You can also think of a MessageQueue as an array of unbounded length, with the Offset as its index.
Delayed messages
RocketMQ does not support arbitrary time precision; it supports only specific delay levels, such as 5s, 10s, 1min, and so on. level=0 means no delay, level=1 is the first delay level, level=2 the second, and so on.
The delay levels are as follows:
```
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```
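A minimal producer sketch for a delayed message with the RocketMQ Java client; the group name, topic, and NameServer address are assumptions:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // assumed NameServer address
        producer.start();

        Message msg = new Message("TopicTest", "TagA",
                "delayed hello".getBytes(StandardCharsets.UTF_8));
        // level 3 corresponds to 10s in the default messageDelayLevel table above
        msg.setDelayTimeLevel(3);
        producer.send(msg);

        producer.shutdown();
    }
}
```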
Ordered messages
Message ordering means that messages are consumed in the order in which they were sent (FIFO). RocketMQ guarantees strict message ordering, either at the partition (queue) level or globally.
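A sketch of partition-level ordering with the RocketMQ Java client: messages that share an order ID are routed to the same MessageQueue via a MessageQueueSelector. The topic, group, and NameServer address are assumptions:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // assumed NameServer address
        producer.start();

        long orderId = 10001L; // all messages of the same order go to the same queue
        Message msg = new Message("OrderTopic", "create",
                ("order " + orderId).getBytes(StandardCharsets.UTF_8));

        producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                long id = (Long) arg;
                int index = (int) (id % mqs.size()); // hash/modulo routing keeps order per key
                return mqs.get(index);
            }
        }, orderId);

        producer.shutdown();
    }
}
```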
Transaction message
Message queue MQ provides distributed transaction functionality similar to X/Open XA; eventual consistency for distributed transactions can be achieved through MQ transaction messages. The general flow of a transaction message has two parts: normal sending and committing of the transaction message, and the compensation flow for the transaction message.
- Transaction message sending and committing:
- Send the half message.
- The server responds with the result of writing the message.
- Execute the local transaction based on the send result (if the write failed, the half message is invisible to the business side and the local logic is not executed).
- Perform Commit or Rollback based on the local transaction state (the Commit operation generates the message index and makes the message visible to consumers).
- Compensation flow for transaction messages:
- For transaction messages that are neither committed nor rolled back (messages in the pending state), the server initiates a "back-check".
- After receiving the back-check request, the Producer checks the status of the local transaction corresponding to the message.
- It then re-Commits or Rolls back based on the local transaction status.
The compensation phase handles the cases where the Commit or Rollback of a message times out or fails.
- Transaction message status:
Transaction messages have three states: committed, rolled back, and intermediate:
- TransactionStatus.CommitTransaction: commit the transaction; consumers are allowed to consume this message.
- TransactionStatus.RollbackTransaction: roll back the transaction; the message will be deleted and must not be consumed.
- TransactionStatus.Unknown: intermediate state; the message queue needs to check back to determine the state of the message.
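A minimal sketch with the open-source RocketMQ Java client, whose API uses LocalTransactionState (COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW) in place of the TransactionStatus names above; the group name, topic, and NameServer address are assumptions:

```java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;

public class TxProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // assumed NameServer address
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // runs after the half message is sent; execute the local transaction here.
                // Return UNKNOW to let the broker trigger the back-check later.
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // back-check: look up the local transaction result and answer accordingly
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("TxTopic", "pay",
                "order paid".getBytes(StandardCharsets.UTF_8));
        producer.sendMessageInTransaction(msg, null);

        // in a real service the producer stays alive so the back-check can reach it
        producer.shutdown();
    }
}
```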
RocketMQ’s high availability mechanism
RocketMQ is distributed by nature and can be configured with master/slave replication and horizontal scaling.
Master Brokers support reads and writes, while Slave Brokers support only reads. In other words, a Producer can connect only to a Master Broker to write messages, while a Consumer can connect to either a Master or a Slave Broker to read messages.
High availability of message consumption (master-slave)
In the Consumer configuration file, there is no need to set whether to read from Master or Slave. When the Master is unavailable or busy, the Consumer is automatically switched to read from Slave. With automatic Consumer switching, when a Master machine fails, the Consumer can still read messages from the Slave without affecting the Consumer program. This leads to high availability on the consumer side. RocketMQ does not currently support automatic conversion from Slave to Master. If you need to convert a Slave to Master due to insufficient machine resources, manually stop the Broker for the Slave role, change the configuration file, and start the Broker with the new configuration file.
High availability of message sending (configure multiple primary nodes)
When a Topic is created, its message queues are created on multiple Broker groups (machines with the same broker name but different brokerIds form one Broker group), so that when the Master of one Broker group becomes unavailable, the Masters of the other groups are still available and the Producer can still send messages.
Master-slave replication
If a Broker group has Master and Slave, messages need to be copied from the Master to the Slave, either synchronously or asynchronously.
- Synchronous replication: the client is told that the write succeeded only after the data has been written successfully to both the Master and the Slave. If the Master fails, the Slave holds a complete backup of the data, which makes recovery easy; the cost is higher write latency and lower system throughput.
- Asynchronous replication: the Master reports success to the client as soon as its own write succeeds. Latency is low and throughput is high, but if the Master fails, data that has not yet been written to the Slave may be lost.
In general, it is advisable to configure disk flushing on the Master and Slave as synchronous flush, and master-slave replication as asynchronous; that way, even if one machine fails, data will not be lost.
Load balancing
Producer Load balancing
On the Producer end, each instance polls all message queues by default when sending messages, so that messages fall evenly on different queues. Since queues can be scattered among different brokers, messages are sent to different brokers, as shown below:
Consumer load balancing
If the number of consumer instances is greater than the total number of message queues, the extra consumer instances are not assigned any queue and consume no messages, so they do not help share the load. Therefore, make sure the total number of queues is greater than or equal to the number of consumers.
- Clustering of consumers: starting multiple consumers ensures consumer load balancing (queues are distributed evenly).
- The default is the evenly-distributed-queue algorithm: queues are allocated to each instance according to the number of queues and the number of instances, so that each consumer gets an even share of the queues, as shown in the figure below with 6 queues and 3 consumers.
- Another averaging algorithm allocates queues in a circular (round-robin) fashion, where each consumer is evenly assigned message queues of different master nodes, as shown in the figure below:
Broadcast mode does not do load balancing: a message must be delivered to every consumer instance in the consumer group, so there is no notion of sharing the consumption load.
Dead-letter queue
When a message fails to be consumed, RocketMQ automatically retries it. If a message exceeds the maximum number of retries, RocketMQ considers it problematic. However, it does not immediately discard such a message; instead, it sends it to a special queue of the consumer group: the dead letter queue. The dead letter queue is named %DLQ%+ConsumerGroup.
Dead-letter queues have the following features:
- A dead letter queue corresponds to a Group ID, not to a single consumer instance.
- If a Group ID does not generate a dead-letter message, message queue RocketMQ does not create a dead-letter queue for it.
- A dead-letter queue contains all dead-letter messages generated for the corresponding Group ID, regardless of which Topic the message belongs to.
3. Kafka
Kafka is a distributed, partitioned, multi-replica, Zookeeper coordinated distributed messaging system.
Its biggest feature is that it can process large amounts of data in real time to meet a variety of demand scenarios, such as Hadoop-based batch processing systems, low-latency real-time systems, Storm/Spark stream-processing engines, web/Nginx logs, access logs, and message services. It is written in Scala and is a top-level open-source project of the Apache Foundation.
Take a look at Kafka’s architecture diagram
The core concept of Kafka
There are several core concepts in Kafka: Broker, Topic, Producer, Consumer, ConsumerGroup, Partition, Leader, Follower, and Offset.
- Broker: a message-middleware processing node. A Kafka node is a Broker, and one or more Brokers form a Kafka cluster.
- Topic: Kafka categorizes messages by Topic; every message published to a Kafka cluster must be assigned a Topic.
- Producer: a message producer, the client that sends messages to the Broker.
- Consumer: a message consumer, the client that reads messages from the Broker.
- ConsumerGroup: each Consumer belongs to a specific ConsumerGroup. A message can be consumed by multiple ConsumerGroups, but only one Consumer within a ConsumerGroup can consume it.
- Partition: a physical concept. A Topic can be divided into multiple Partitions, and messages within each Partition are ordered.
- Leader: each Partition has multiple replicas, exactly one of which acts as the Leader. The Leader handles all reads and writes for the partition.
- Follower: all write requests go through the Leader; data changes are then broadcast to all Followers, which keep their data in sync with the Leader. If the Leader fails, a new Leader is elected from the Followers. If a Follower hangs, gets stuck, or falls too far behind, the Leader removes it from the ISR list and creates a new Follower.
- Offset: the offset. Kafka storage files are named after the starting offset they contain, which makes lookups convenient: for example, to find position 2049, just find the file whose name starts at 2048.
Topic, Partition, and Broker:
A Topic represents a logical set of business data. For example, order-related operation messages are put into an order Topic, and user-related operation messages are put into a user Topic. A large website has massive back-end data, and the order information alone may be very large, for example hundreds of GB or even TB. If all of that data were placed on one machine there would be a capacity problem, so the data within a Topic can be divided into multiple partitions to store it in shards. Different partitions can sit on different machines, which amounts to distributed storage. Each machine runs one Kafka process, a Broker.
Kafka core Controller Controller
There are one or more brokers in a Kafka cluster. One broker is elected as a Kafka Controller, or broker-leader, which is responsible for managing the state of all partitions and replicas in the cluster.
- When the leader replica of a partition fails, the controller elects a new leader replica for that partition.
- When a change in a partition's ISR set is detected, the controller notifies all brokers to update their metadata.
- When the number of partitions of a topic is increased, the controller is also responsible for making the new partitions known to the other nodes.
Controller Election mechanism
When a Kafka cluster is started, the process of election is that each broker in the cluster tries to create a /controller temporary node on ZooKeeper. Zookeeper guarantees that only one broker can be created successfully. The broker becomes the cluster’s master controller.
When the broker of the controller role goes down, the Temporary ZooKeeper node will disappear. Other brokers in the cluster will always monitor the temporary node and find that the temporary node has disappeared. Then they compete to create the temporary node again, which is the election mechanism mentioned above. Zookeeper in turn ensures that a broker becomes the new controller. A broker with controller status has one more responsibility than any other broker. Details are as follows:
- Listen for broker-related changes: add a BrokerChangeListener to the /brokers/ids node in Zookeeper to handle broker additions and removals.
- Listen for topic-related changes: add a TopicChangeListener to the /brokers/topics node in Zookeeper to handle topic changes, and a TopicDeletionListener to the /admin/delete_topics node to handle topic deletion.
- Read and manage all current topic, partition, and broker information from Zookeeper. For every topic, add a PartitionModificationsListener to its node under /brokers/topics to listen for partition changes within that topic.
- Push updated cluster metadata to the other normal Broker nodes.
Partition replica leader election mechanism
When the controller senses that the broker hosting a partition's leader is down, it picks the first broker in the ISR list as the new leader (on the premise that the parameter unclean.leader.election.enable=false; the first broker in the ISR list is, roughly speaking, the replica with the most up-to-date data). If unclean.leader.election.enable=true, then when all replicas in the ISR list are down, a leader may be chosen from replicas outside the ISR list; this setting improves availability, but the newly chosen leader may have far less data. A replica must meet two conditions to enter the ISR list:
- The replica node must not be partitioned from the network: it must maintain its session with ZooKeeper and keep network connectivity with the leader replica.
- The replica must replicate all writes on the leader and must not fall too far behind. (How far a replica may lag the leader is determined by the replica.lag.time.max.ms configuration; a replica that has not caught up with the leader within this time is removed from the ISR list.)
Offset recording mechanism for consumer consumption
Each consumer periodically commits the offsets of the partitions it consumes to the internal Kafka topic __consumer_offsets. The key is consumerGroupId+topic+partition number and the value is the current offset. Kafka periodically compacts the messages in this topic, keeping only the latest data for each key.
Because __consumer_offsets may receive highly concurrent requests, Kafka allocates it 50 partitions by default (configurable via offsets.topic.num.partitions), so the load can be sustained by adding machines.
Consumer Rebalance mechanism
Rebalance means that if the number of consumers in a consumer group changes or the number of partitions they consume changes, Kafka reassigns the relationship between consumers and partitions. For example, if a consumer in the group dies, the partitions assigned to it are automatically transferred to other consumers; if it restarts, some partitions are given back to it.
Note: rebalance applies only to the subscribe() style of consumption, where no partitions are specified. If partitions are assigned explicitly via assign(), Kafka does not rebalance.
The following situations can trigger a consumer rebalance:
- The number of consumers in the consumer group increases or decreases
- Partitions are dynamically added to a topic
- The consumer group subscribes to more topics
Rebalancing can be time-consuming if the Kafka cluster has many nodes, for example several hundred, so try to avoid triggering it during peak hours.
The Rebalance process is as follows
When a consumer joins a consumer group, the consumer, consumer group, and group coordinator go through the following phases
Stage 1: Select the group coordinator
GroupCoordinator: each consumer group selects a broker as its group coordinator, which monitors the heartbeats of all consumers in the group, determines whether any have gone down, and then triggers the consumer rebalance. When each consumer in a consumer group starts up, it sends a FindCoordinatorRequest to some node in the Kafka cluster to find its group coordinator (GroupCoordinator) and establishes a network connection with it. The group coordinator is chosen with the following formula, which also determines which partition of __consumer_offsets the group's consumption offsets are committed to: hash(consumerGroupId) % number of partitions of __consumer_offsets. The broker hosting the leader of that partition is the group's coordinator.
Stage 2: join the consumer group (JOIN GROUP)
After successfully finding the GroupCoordinator of its consumer group, the consumer joins the group. In this phase the consumer sends a JoinGroupRequest to the GroupCoordinator and processes the response. The GroupCoordinator selects the first consumer to join the group as the leader consumer and sends the consumer group's information to it; this leader is then responsible for drawing up the partition assignment plan.
Stage 3: synchronize the group (SYNC GROUP)
The leader consumer sends a SyncGroupRequest to the GroupCoordinator, which then forwards the partition assignment plan to each consumer. Consumers establish network connections with the leader brokers of their assigned partitions and consume messages from them.
Consumer Rebalance partition allocation policy
There are three rebalance strategies: range, round-robin, and sticky. The default is range.
Suppose a topic has 10 partitions (0-9) and now has three consumers consuming it:
Range policy: Allocate partitions by partition number. Assume that n = number of partitions/number of consumers = 3 and m = number of partitions % Number of consumers = 1. Then the first m consumers are allocated N +1 partitions, and the next (number of consumers -m) consumers are allocated N partitions. For example, partitions 0 to 3 are for one consumer, partitions 4 to 6 are for one consumer, and partitions 7 to 9 are for one consumer.
Round-robin policy: For example, partitions 0, 3, 6, and 9 are allocated to one consumer, partitions 1, 4, and 7 are allocated to one consumer, and partitions 2, 5, and 8 are allocated to one consumer
The sticky policy: the initial allocation is similar to the round-robin policy, but when a rebalance happens it tries to ensure two things:
- Partitions are distributed as evenly as possible.
- Partitions stay, as much as possible, with the consumer they were assigned to last time.
When the two goals conflict, the first takes precedence over the second. This preserves as much of the previous allocation as possible. For example, in the range case above, if the third consumer fails, the reassignment is: consumer1 gets partition 7 in addition to its original 0-3, and consumer2 gets partitions 8 and 9 in addition to its original 4-6.
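As a sketch, the assignor can be chosen explicitly through the consumer's partition.assignment.strategy property; the broker address, group id, and topic name here are assumptions:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class AssignorConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker
        props.put("group.id", "demo-group");                 // assumed group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // the default is the range assignor; switch to round-robin (or StickyAssignor) here
        props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // ... poll and process as usual; the assignor only affects how partitions are distributed
        }
    }
}
```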
Analysis of the producer message-publishing mechanism
- Write mode
The producer publishes messages to the broker in push mode, and each message is appended to a partition, which is a sequential disk write (sequential disk writes are more efficient than random writes, which helps guarantee Kafka's throughput).
- Message routing
When the producer sends a message to the broker, it selects the partition to store it in according to a partitioning algorithm, as sketched below. The routing rules are:
- If a partition is specified, it is used directly.
- If no partition is specified but a key is, the partition is chosen by hash(key) % number of partitions.
- If neither a partition nor a key is specified, a partition is chosen by round-robin.
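A minimal Kafka producer sketch of the three routing rules; the broker address and topic are assumptions. With a key, the default partitioner hashes the key to pick the partition, so identical keys land in the same partition:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class RoutingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // explicit partition: the record goes straight to partition 0
            producer.send(new ProducerRecord<>("demo-topic", 0, "key-1", "value-1"));
            // key only: the default partitioner hashes the key to pick the partition,
            // so records with the same key always land in the same partition
            producer.send(new ProducerRecord<>("demo-topic", "order-42", "value-2"));
            // neither partition nor key: partitions are chosen round-robin / sticky
            producer.send(new ProducerRecord<>("demo-topic", null, "value-3"));
        }
    }
}
```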
- Write process
- The producer finds the leader of the partition from the "/brokers/… /state" node in Zookeeper.
- The producer sends the message to the leader.
- The leader writes the message to its local log.
- The followers pull the message from the leader, write it to their local logs, and send an ACK to the leader.
- After receiving ACKs from all replicas in the ISR, the leader advances the HW (high watermark, the final commit offset) and sends an ACK to the producer.
HW and LEO
HW is short for HighWatermark. The smallest LEO (log-end-offset) among the replicas in a partition's ISR is taken as the HW, and a consumer can consume at most up to the position of the HW. Each replica has its own HW, and the leader and followers are each responsible for updating it. A message newly written to the leader cannot be consumed immediately: the leader waits until the message has been synchronized by all replicas in the ISR before updating the HW, and only then can the message be consumed. This ensures that if the leader broker fails, the message can still be fetched from the newly elected leader. The HW restriction does not apply to read requests coming from brokers themselves (replica fetches).
Segmented log storage
Kafka partition data is stored in a folder named topic + partition number. Messages are stored in segments within a partition, and each segment is stored in a different log file. Kafka specifies a maximum size of 1 gb of logs per segment. The purpose of this restriction is to facilitate the loading of log files into memory for operation:
```
# The index file: every time 4K (configurable) of messages is written to the partition,
# Kafka records the offset of the current message in this sparse index.
# To locate a message by offset, first search this file, then read the specific message from the log file.
00000000000000000000.index
# The message store file, which mainly stores offsets and message bodies.
00000000000000000000.log
# Every time 4K (configurable) of messages is written to the partition, Kafka records the send
# timestamp of the current message together with its offset in this time index.
# To locate a message by time, look it up in this file first.
00000000000000000000.timeindex

00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex

00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex
```
A number such as 9936472 represents the starting Offset contained in the log segment file, which means that close to 10 million pieces of data have been written to the partition. Kafka Broker has a parameter, log.segment.bytes, that limits the size of each log segment file to a maximum of 1GB. When a log segment file is full, a new log segment file is automatically created to write data. This process is called log Rolling. The active log segment file being written is called active log segment.
Finally, a diagram of the ZooKeeper node data is attached.
Some of the problems with MQ and the solutions
1. How to ensure sequential consumption?
- RabbitMQ: One Queue for one Consumer.
- RocketMQ:
- Globally ordered: have only a single MessageQueue within the Topic.
- Partially ordered: use a routing algorithm such as hash(key) % number of queues to compute the routing index, so that ordered messages are routed to the same MessageQueue.
- Kafka:
- Globally ordered: have only one Partition in the Topic.
- Partially ordered: use a routing algorithm such as hash(key) % number of partitions to compute the routing index, so that ordered messages are routed to the same Partition.
2. How to realize delayed consumption?
- RabbitMQ: Two solutions
- Dead letter queue + TTL
- Introduce a delay plugin for RabbitMQ
- RocketMQ: Inherently supports delayed messages.
- Kafka: The steps are as follows
- Create a Topic specifically for messages to be deferred
- Create a new consumer to consume this Topic
- Message persistence
- Another thread is opened periodically to pull persistent messages into the actual Topic to be consumed
- Consumers who are actually consuming pull messages from the Topic they are actually consuming.
3. How to ensure the reliability of message delivery?
- RabbitMQ:
  - Broker --> Consumer: manual ACK.
  - Producer --> Broker: two options.
    - Option 1: database persistence
      1. Persist the business order data and the generated Message (usually by inserting them into a database; if the data is split across databases, distributed transactions may be involved).
      2. Send the Message to the Broker server.
      3. Using RabbitMQ's Confirm mechanism, the producer listens for the server's ACK.
      4. If an ACK is received, update the Message's status to "sent"; if not, change its status to "failed to send".
      5. A distributed scheduled task queries the database for messages that have not been sent successfully within 3 minutes (the exact window depends on timeliness requirements).
      6. Resend those messages and record the number of send attempts.
      7. If sending still fails after too many attempts, manual troubleshooting or other action is needed.
Advantages: Can ensure that messages are not lost.
Disadvantages: The first step involves distributed transaction issues.
    - Option 2: delayed delivery of messages
      In the flowchart, different colors represent different messages.
      1. Persist the business order.
      2. Send a Message to the broker (the primary Message), and send the same Message to a different queue or exchange (the confirmation Message).
      3. After the primary Message is consumed by the actual business handler, a response Message is generated; the earlier confirmation Message is processed and stored by the Message Service application.
      4~6. After the Message Service receives the response Message, it changes the status of the original Message.
      7. If a Message is never confirmed, the producer re-runs the whole process via an RPC call.
Advantages: faster response than the persistence scheme.
Disadvantages: higher system complexity; if by any chance both messages fail to be sent, the message is lost, so the Confirm mechanism is still needed as compensation.
- RocketMQ:
  - The producer loses data
    While sending a message to the Broker, the message may be lost due to network problems, or it may reach the Broker but fail to be saved. RocketMQ addresses this by giving producers three ways to send messages:
    - Synchronous send: inherently ensures reliable delivery.
    - Asynchronous send: reliability must be implemented in the callback function based on the broker's response.
    - One-way send: reliable delivery cannot be guaranteed.
The Broker lost data
The Broker receives a Message that is temporarily stored in memory and hangs before the Consumer can consume it
This can be resolved through persistent Settings:
1. Set persistence when creating a Queue to ensure that the Broker persists the metadata of the Queue but not the messages in the Queue
2. Set the deliveryMode of Message to 2 to persist messages to disk. In this way, a notification will be sent to Producer ACK only after Message is supported to disk
After these two steps, even if the Broker hangs and the Producer cannot receive an ACK, the Producer can resend the ack
- Consumers lose data
The Message has not yet been processed. The Broker assumes that the Consumer has finished processing and will only send subsequent messages. In this case, autoACK must be closed. After the message processing, manual ACK will be carried out. The messages that fail to consume many times will enter the dead-letter queue, and manual intervention is needed at this time.
- Kafka:
  - The producer loses data
    Set acks=all and writes will not be lost: a write is considered successful only after the leader has received the message and all followers have synchronized it. If this condition is not met, the producer automatically retries an unlimited number of times.
  - The Broker loses data
    A Kafka broker goes down and the partition's leader is re-elected. If some data on the other followers has not yet been synchronized when the leader dies, and one of those followers is elected as the new leader, that data is lost.
In this case, at least four parameters must be set:
- At the topic level, set replication.factor: this value must be greater than 1, requiring each partition to have at least 2 replicas.
- On the Kafka broker side, set min.insync.replicas: this value must be greater than 1, requiring the leader to perceive at least one follower that is still in sync with it, so that a follower is still available if the leader dies.
- On the producer side, set acks=all: every write must be written to all in-sync replicas before it is considered successful.
- On the producer side, set retries=MAX (a very large value, meaning essentially unlimited retries): if a write fails, it is retried indefinitely.
Our production environment is configured this way, so that at least on the Kafka broker side, data is not lost when a leader switch happens because the broker hosting the leader fails.
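A minimal producer-side sketch of this configuration; the broker address and topic name are assumptions, and the topic-level settings appear as a comment because they are applied when the topic is created:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ReliableProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // producer-side settings from the list above
        props.put("acks", "all");                    // all in-sync replicas must confirm the write
        props.put("retries", Integer.MAX_VALUE);     // retry (practically) forever on failure

        // topic-side settings (replication factor, min.insync.replicas) are applied at topic
        // creation time, e.g.:
        //   kafka-topics.sh --create --topic demo-topic --partitions 3 \
        //       --replication-factor 3 --config min.insync.replicas=2 ...
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```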
  - The consumer loses data
    You receive a message and the consumer automatically commits the offset, so Kafka thinks you have already consumed it; but you are only just about to process the message, and you crash before you do, so the message is lost.
    This is similar to RabbitMQ. Kafka commits offsets automatically by default, so simply turn off auto-commit and commit the offset manually after processing to ensure data is not lost. However, duplicate consumption may still occur: for example, if you crash after processing a message but before committing its offset, you will certainly consume it again, so you must ensure idempotency.
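A minimal sketch of manual offset committing with the Kafka Java consumer; the broker address, group id, and topic are assumptions:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");               // assumed consumer group
        props.put("enable.auto.commit", "false");          // turn off auto commit
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // ... process the record; processing must be idempotent, because a crash
                    // after processing but before the commit causes redelivery
                }
                consumer.commitSync(); // commit only after the batch has been processed
            }
        }
    }
}
```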
4. How can messages be idempotent?
Using RocketMQ as an example, the following lists the scenarios where messages are repeated:
1. Duplicate messages are sent
After a message is successfully sent to the server and persists, the server fails to respond to the client due to intermittent network disconnection or client breakdown. If at this point the producer realizes that the Message failed and tries to send the Message again, the consumer will subsequently receive two messages with the same content and the same Message ID.
2. Duplicate messages during delivery
In the message consumption scenario, the message is delivered to the consumer and the service is processed. When the client replies to the server, the network is disconnected. To ensure that the Message is consumed at least once, the RocketMQ version of the Message queue will try to deliver the previously processed Message once the network is restored. The consumer will then receive two messages with the same content and the same Message ID.
3. Repeated messages during load balancing (including but not limited to network jitter, Broker restart, and consumer application restart)
Consumers may receive repeated messages when the RocketMQ version of the message queue Broker or client restarts, expands, or shrinks, triggering Rebalance.
So what’s the solution? Go straight to the diagram above.
5. How to solve the problem of message backlog?
There are a few points to consider about this question:
1. How do we quickly consume the backlogged messages?
Temporarily write a message-dispatching consumer that evenly distributes the backlogged messages across N queues, with one consumer per queue, which multiplies the consumption speed by N.
Before the change:
After the change:
2. Some messages expire because of the long backlog. What do we do?
Re-import them in batches. During off-peak hours, for example in the early hours of the morning, write a program to find the batch of lost messages and import them back into MQ.
3. A large number of messages are backlogged and the MQ disk is full, so new messages cannot be accepted. What do we do?
There is no better option: since the message-dispatching consumer cannot keep up anyway, write an ad hoc program that pulls the data and consumes it, discarding each message immediately, so that all messages are drained quickly. Then follow plan 2 and backfill the data later at night.