Rapid-fire message queue interview questions
- How is MQ used in the project?
- Why use message queues?
- What are the advantages and disadvantages of message queues?
- What are the pros and cons of Kafka, ActiveMQ, RabbitMQ, and RocketMQ?
- How can message queues be highly available?
- How do you ensure that messages are not consumed more than once?
- How to ensure the reliable transmission of messages?
- How do I ensure that messages are sequential?
- How would you design a message queue architecture yourself?
Message queue technology selection
Problems solved:
- Decoupling
- Asynchrony
- Peak shaving
Coupling scenario without MQ
- System A produces a key piece of data that systems B, C, D, and E all need, creating strong coupling: the downstream systems may want different parameters, may need the data at some times and not at others, and system A has to keep modifying code to accommodate them all
- System A also has to worry about whether B, C, D, or E is down, whether calls time out, and whether to retry
Decoupled scenario using MQ
- System A only maintains the code that publishes to MQ; it no longer has to care whether downstream calls succeed, fail, or time out
- If a new system needs the data, it simply consumes it from MQ; if a system no longer needs the data, it just cancels its subscription. Summary: through MQ's publish/subscribe (Pub/Sub) messaging model, system A is completely decoupled from the other systems.
Synchronous high-latency request scenario without MQ
In a typical Internet company, user-facing operations are expected to complete within about 200 ms per request, fast enough that users barely notice any delay.
Interface performance after going asynchronous with MQ
The slow, high-latency steps are taken off the request path, so the user-facing interface responds quickly.
Peak-traffic overload scenario without MQ
At peak, 5000 requests per second turn into 5000 SQL queries per second against MySQL, which normally handles only about 2000 requests per second. If MySQL is overwhelmed, the whole system crashes and users cannot use it, even though after the peak traffic may drop to 50 requests per second, which puts almost no pressure on the system.
Peak shaving with MQ
The 5000 requests per second are written to MQ instead. System A can process at most 2000 requests per second (since MySQL tops out around 2000 per second), so it slowly pulls from MQ at that rate. With 5000 requests per second entering MQ and only 2000 leaving, hundreds of thousands or even millions of requests back up in MQ during peak hours, which is normal: after the peak only about 50 requests per second arrive, while system A keeps processing 2000 per second, so it quickly clears the backlog. (During the peak the backlog grows by about 3000 messages per second, i.e. 180,000 per minute and roughly 10 million per hour; once the peak ends, draining a 10-million-message backlog takes a bit over an hour.)
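The backlog arithmetic above can be sanity-checked with a tiny model (numbers taken straight from the scenario; everything else is ignored):

```java
// Back-of-the-envelope model of the peak-shaving numbers above.
public class PeakShaving {

    // Messages accumulating in MQ per second while inflow exceeds outflow.
    public static long backlogGrowthPerSec(long inPerSec, long outPerSec) {
        return Math.max(0, inPerSec - outPerSec);
    }

    // Seconds needed to drain a backlog once inflow drops below outflow.
    public static long drainSeconds(long backlog, long outPerSec, long inPerSec) {
        return backlog / (outPerSec - inPerSec);
    }

    public static void main(String[] args) {
        long growth = backlogGrowthPerSec(5000, 2000);          // 3000 msgs/sec during the peak
        long perHour = growth * 3600;                           // ~10.8 million per peak hour
        long drainMin = drainSeconds(10_000_000L, 2000, 50) / 60; // ~85 minutes to clear
        System.out.println(growth + " msgs/sec backlog growth, "
                + perHour + " per hour, drains in ~" + drainMin + " min");
    }
}
```

At 2000 out and 50 in after the peak, a 10-million-message backlog drains in roughly 85 minutes, matching the "a bit over an hour" estimate.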
Problems introduced by adding MQ to the architecture
- Reduced system availability: if MQ goes down, the whole system goes down with it
- Increased system complexity: messages may be delivered more than once, causing duplicate data to be inserted; messages may be lost; messages may arrive out of order; if systems B, C, and D go down, messages pile up in MQ and fill the disk
- Consistency problems: the caller is told everything succeeded because A, B, and C executed successfully, but D actually failed, leaving the systems inconsistent
What are the advantages and disadvantages of Kafka, ActiveMQ, RabbitMQ, and RocketMQ
features | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
Single-machine throughput | 10,000-level (1W~2W requests per second) | 10,000-level | 100,000-level | 100,000-level |
Timeliness | ms level | Microsecond level; the lowest latency, a signature feature of RabbitMQ | ms level | Within ms level |
Availability | High, based on a master-slave architecture | High, based on a master-slave architecture | Very high, distributed architecture | Very high: Kafka is distributed, each piece of data has multiple replicas, so a few machines going down causes neither data loss nor unavailability |
Message reliability | Low probability of losing data | After tuned parameter configuration, zero loss is achievable | After tuned parameter configuration, messages can achieve zero loss | Same as RocketMQ: after tuned parameter configuration, messages can achieve zero loss |
Summary of advantages and disadvantages | Advantages: very mature, feature-rich, historically used by many companies and projects. Disadvantages: occasional low-probability message loss; community activity and adoption are shrinking, and official maintenance of ActiveMQ 5.x is declining; mainly used for decoupling and asynchrony, rarely in large-scale throughput scenarios | Advantages: written in Erlang, excellent performance, low latency, a great management UI, and an active community. Disadvantages: relatively low throughput (tens of thousands per machine) due to its heavyweight implementation; few teams have the strength to do source-level research and customization in Erlang, so you depend on the open-source community to maintain it and fix bugs; dynamic cluster scaling is cumbersome, largely because the Erlang codebase is hard to read, customize, and control | Advantages: a simple, easy-to-use interface; backed by Alibaba, which uses it to process tens of billions of messages a day; large-scale throughput with very good performance; convenient distributed scaling; decent community maintenance; solid reliability and availability; supports a large number of topics and complex MQ business scenarios; written in Java, so customization and control are convenient. Disadvantages: only moderately active community; the interface does not follow the standard JMS specification, so migrating some systems requires many code changes; as an Alibaba-introduced technology, there is some risk of it being abandoned | Advantages: provides fewer core features but delivers extremely high throughput, ms-level latency, extremely high availability and reliability, and arbitrary distributed scaling; works best with a small number of topics sustaining extremely high throughput. Disadvantages: messages may be consumed repeatedly, which can affect data accuracy; in big data and log collection this impact is negligible, which makes Kafka a natural fit for real-time big-data computation and log collection |
Suggestions: small and medium-sized companies: RabbitMQ. Large companies: RocketMQ. Big-data real-time computing and log collection: Kafka.
How to make message queues highly available
RabbitMQ high availability
RabbitMQ has three modes: single-machine mode, common cluster mode, and mirrored cluster mode
- Single-machine deployment mode Demo
- Common cluster mode (not highly available)
Start multiple RabbitMQ instances on multiple machines, one per machine. Queue metadata exists on every instance, but each queue's messages live on only one instance.
Advantages: multiple machines can serve consumers, increasing consumption throughput.
Disadvantages: it may cause a lot of data transfer inside the RabbitMQ cluster, and availability is not guaranteed: if the machine hosting a queue goes down, the queue cannot be consumed. So there is no high availability.
- Mirrored cluster mode (highly available, but not distributed)
Every message written to a queue is automatically synchronized to that queue's mirrors on multiple instances; each node holds a complete mirror of the queue (all of its data). If any node goes down, the other nodes still contain the complete queue, and consumers can consume from any surviving node. Disadvantage: it is not distributed; if a queue is too large, it cannot fit on a single machine.
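Mirroring is enabled by a policy; besides the admin console, the same thing can be done from the command line (policy names and the catch-all pattern here are examples):

```shell
# Mirror every queue to all nodes in the cluster (example policy name "ha-all").
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

# Or mirror each queue to exactly 2 nodes instead of all of them.
rabbitmqctl set_policy ha-two "^" '{"ha-mode":"exactly","ha-params":2}'
```

Queues matching the pattern then pick up the policy and replicate their data to the other nodes automatically.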
To enable mirrored cluster mode: in the admin console, add a mirrored-queue policy that requires data to be synchronized to all nodes, or to a specified number of nodes. When you create a queue afterwards, apply this policy, and its data will automatically be synchronized to the other nodes.
- Kafka high availability architecture
The broker process is kafka’s own process that starts on each machine. Each machine + broker process on the machine can be considered a node in a Kafka cluster.
You create a topic that can be divided into multiple partitions. Each partition can reside on a different broker, and each partition holds a portion of the data.
This is a natural distributed message queue, meaning that the data for a topic is distributed across multiple machines, with each machine hosting a portion of the data.
The real meaning of distributed is that each node holds only part of the data, not the entire dataset; that is what distinguishes a distributed system from a mere HA cluster.
Prior to Kafka 0.8 there was no HA mechanism: if any broker went down, the partitions on it were unavailable and their data could be lost.
From Kafka 0.8 onward there is an HA mechanism, the replica mechanism. Each partition's data is synchronized to other machines, forming multiple replicas; the replicas elect a leader, and producers and consumers deal only with the leader, while the other replicas act as followers. On writes the leader is responsible for syncing the data to all followers; reads go directly to the leader. If a broker goes down and it happens to host the leader of some partition, a new leader is elected from the followers, and everyone continues reading and writing through the new leader. This is what makes it highly available.
Synchronization mechanism between leader and follower:
On writes, the producer writes to the leader; the leader writes the data to its local disk, and the followers actively pull the data from the leader. Once all the followers have synchronized the data, they send an ack to the leader, and after the leader has received acks from all followers it returns a write-success response to the producer. (This describes the strictest setting; the exact behavior depends on the producer's acks configuration.)
On reads, consumers read only from the leader, and a message becomes readable only after it has been successfully synchronized and acked by all followers.
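A toy model of this leader/follower handshake (purely illustrative, not the Kafka implementation): a message becomes readable only after every follower has pulled it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy partition leader: a message becomes readable by consumers only
// after ALL followers have acknowledged replicating it.
public class ToyPartition {
    private final List<String> log = new ArrayList<>();
    private final int[] acked;        // acked[i] = how many offsets follower i has pulled
    private int readableUpTo = 0;     // consumers may read offsets below this mark

    public ToyPartition(int followers) {
        this.acked = new int[followers];
    }

    // Leader appends to its local log and returns the new message's offset.
    public int write(String msg) {
        log.add(msg);
        return log.size() - 1;
    }

    // Follower i has fully caught up with the leader's log.
    public void followerPulled(int follower) {
        acked[follower] = log.size();
        int min = log.size();
        for (int a : acked) min = Math.min(min, a);
        readableUpTo = min;           // advance only once ALL followers acked
    }

    // Consumers read only messages replicated to every follower.
    public boolean readable(int offset) {
        return offset < readableUpTo;
    }

    public static void main(String[] args) {
        ToyPartition p = new ToyPartition(2);
        int off = p.write("order-created");   // hypothetical message
        System.out.println(p.readable(off));  // false: followers have not pulled yet
        p.followerPulled(0);
        p.followerPulled(1);
        System.out.println(p.readable(off));  // true: all followers acked
    }
}
```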
Duplicate message queue data
MQ can only ensure that messages are not lost; it cannot guarantee that a message is not delivered or consumed more than once.
Repeat consumption problems on the Kafka consumer side
Every message has an offset representing its sequence number: Kafka assigns an offset to each piece of data in the order it arrives, and consumers consume in that order. The consumer periodically commits its offset, telling Kafka "I have consumed up to offset 153" (older consumers recorded this offset in ZooKeeper). If the consumer restarts, it asks Kafka to resume sending from the position after the last committed offset. The cause of duplicates (mainly after a consumer restart): the consumer does not commit the offset immediately after each message but on a periodic schedule. If it has processed up through offset 153 but restarts before that commit goes out, Kafka does not know offset 153 was consumed and will send offsets 152, 153, 154 again.
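The redelivery window is easy to see in a toy simulation (illustrative only): consumption resumes from the last committed offset, not the last processed one.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: offsets are committed periodically, so a crash after
// processing but before committing causes redelivery of those offsets.
public class OffsetReplay {

    // Offsets the consumer will receive again after a restart: everything
    // after the last COMMITTED offset, even if some were already processed.
    public static List<Integer> redeliveredAfterRestart(int lastCommitted, int logEnd) {
        List<Integer> offsets = new ArrayList<>();
        for (int off = lastCommitted + 1; off <= logEnd; off++) {
            offsets.add(off);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Committed up to 151, processed 152-153, crashed, log ends at 154:
        // offsets 152 and 153 are delivered (and processed) a second time.
        System.out.println(redeliveredAfterRestart(151, 154)); // [152, 153, 154]
    }
}
```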
Ensuring idempotency despite repeated MQ consumption
Idempotent: no matter how many times the same piece of data or the same request is replayed, the resulting state must not change. Approaches:
- When writing to a database, look up by primary key first; if the row already exists, update it instead of inserting
- Writes to Redis are naturally idempotent: SET simply overwrites the same key each time
- Have the producer attach a globally unique ID to each message; the consumer first checks Redis for that ID, skips the message if it was already consumed, and otherwise processes it and writes the ID into Redis
- Rely on a database unique key to reject duplicate inserts
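A minimal sketch of the unique-ID approach (an in-memory HashSet stands in for Redis here; the message IDs and handler are made up for illustration):

```java
import java.util.HashSet;
import java.util.Set;

// Idempotent consumer sketch: skip any message whose globally unique ID
// has already been processed. A real system would check and record the
// ID in Redis (e.g. with SETNX) instead of an in-memory set.
public class IdempotentConsumer {
    private final Set<String> seen = new HashSet<>();
    private int processedCount = 0;

    // Returns true if processed, false if skipped as a duplicate.
    public boolean handle(String messageId, String payload) {
        if (!seen.add(messageId)) {
            return false;             // already consumed: do nothing
        }
        processedCount++;             // real work (e.g. DB insert) goes here
        return true;
    }

    public int processedCount() { return processedCount; }

    public static void main(String[] args) {
        IdempotentConsumer c = new IdempotentConsumer();
        c.handle("order-123", "charge $1");
        c.handle("order-123", "charge $1");   // duplicate delivery, skipped
        System.out.println(c.processedCount()); // 1
    }
}
```

The state stays the same however many times the duplicate is delivered, which is exactly the idempotency property described above.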
Ensuring that MQ messages are not lost
MQ often carries very core business messages, such as an ad billing system: a user clicks an ad and the advertiser is charged one dollar. If such messages keep getting lost, the company keeps losing money.
Possible data loss problems for RabbitMQ
- The producer's message never reaches RabbitMQ: it is lost on the network in transit, or RabbitMQ receives it but fails to store it because of an internal error
- RabbitMQ receives the message and holds it only in the host's memory; RabbitMQ crashes before any consumer gets it, and the data is lost
- The consumer receives the message and RabbitMQ considers it processed, but the consumer crashes before it actually finishes processing it
Solution to problem 1, option 1: the transaction mechanism (generally not used: it is synchronous, so the producer blocks on every send waiting for success or failure, which lowers producer throughput):

```java
channel.txSelect();
try {
    // send the message here
    channel.txCommit();
} catch (Exception e) {
    channel.txRollback();
    // try sending the message again
}
```
Option 2: the confirm mechanism (the one generally used: it is asynchronous, does not block, and has high throughput):
- Set the channel to confirm mode
- Send a message to RabbitMQ
- After sending, move on; there is no need to wait
- If RabbitMQ receives the message, it calls back your producer's local ack interface to say it got it
- If RabbitMQ fails to receive or process the message, it calls back your nack interface, telling you the send failed so that you can send it again
```java
public void ack(String messageId) {
    // RabbitMQ received and accepted the message
}

public void nack(String messageId) {
    // the send failed; resend this message
}
```
Solution to problem 2: persist to disk
- When creating the queue, declare it durable; RabbitMQ will then persist the queue's metadata, though not the messages in it
- Set deliveryMode to 2 to mark each message persistent; RabbitMQ will then persist the message to disk. Both persistence settings must be enabled together
- Persistence can be combined with the producer confirm mechanism: the ack is sent to the producer only after the message has been persisted to disk, so if RabbitMQ dies before persisting, the producer receives no ack and can resend. Disadvantage: there is still a small chance of loss, since RabbitMQ may crash after receiving a message but before flushing it to disk, losing the small amount of data held only in memory
Solution to problem 3: the cause is that the consumer enables autoAck, which tells RabbitMQ a message is consumed as soon as it is delivered. If the consumer's process then goes down, the message is lost without ever being processed, yet RabbitMQ believes it was handled. Turn autoAck off and send an explicit ack to RabbitMQ only after processing each message; if the consumer goes down first, RabbitMQ receives no ack and redelivers the message to another consumer.
Possible data loss problems with Kafka
- Why the consumer can lose data: the consumer automatically commits the offset after receiving a message, so Kafka assumes it has been consumed; if the consumer then dies, the message is lost. Example: the consumer writes incoming data into an in-memory queue as a cache, the offset is auto-committed, and the system restarts: the unprocessed data in the memory queue is gone. The fix is to disable automatic offset commits and commit manually after processing; then data is not lost, although duplicate consumption becomes possible: if the consumer dies right after processing but before committing the offset, the message will definitely be consumed again on restart
- Why Kafka itself can drop messages: when a broker goes down and a partition leader is re-elected while some followers have not finished synchronizing, the new leader is missing that unsynchronized data, which is effectively discarded. To avoid losing data on broker failure or leader switchover, set four parameters:
- On the topic: set replication.factor greater than 1, so every partition has at least 2 replicas
- On the Kafka broker: set min.insync.replicas greater than 1, so the leader must perceive at least one follower still keeping up with it in normal data synchronization
- On the producer: set acks=all, so a piece of data counts as written only after it reaches all in-sync replicas; also set retries=MAX (a very large retry count), so a failed write keeps retrying instead of being dropped
- With acks=all, a write succeeds only after the leader receives the message and all in-sync followers have synchronized it; if that condition is not met, the producer retries indefinitely
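Put together, the four settings above look roughly like this (example values; replication.factor is set when creating the topic, min.insync.replicas on the broker or topic, and acks/retries in the producer config):

```properties
# topic (set at creation time): at least 2 replicas per partition
replication.factor=3
# broker/topic: leader must see at least this many in-sync replicas
min.insync.replicas=2
# producer: wait until all in-sync replicas have the write
acks=all
# producer: retry failed writes indefinitely instead of dropping them
retries=2147483647
```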
Message queue orderliness
Example: a MySQL binlog synchronization system. Inserting, updating, and deleting a row in MySQL produces three binlog entries, which are sent to MQ and must be replayed in that order; otherwise the sequence could become delete, update, insert, which is wrong. Daily synchronization volume reaches hundreds of millions of rows, e.g. mysql->mysql pipelines where a big-data team syncs a copy of a business system's MySQL database to run complex operations on the data. Scenarios where order breaks:
- RabbitMQ: one queue with multiple consumers is an obvious way to lose ordering
- Kafka: one topic, one partition, one consumer, but multiple threads inside the consumer
RabbitMQ messages are out of order
How does RabbitMQ ensure message ordering
You need to make sure that data which must stay in order goes into the same queue, with a single consumer on that queue; for parallelism, split the data across multiple queues, one consumer per queue.
Kafka messages are out of order
Data written to a single partition is strictly ordered. When a producer writes, it can specify a key, such as an order ID; all data for that order then lands in the same partition, in order. A partition in Kafka can only be consumed by one consumer, so the consumer also retrieves it in order.
Kafka guarantees message ordering
If the consumer processes messages with a single thread and each message takes tens of ms, it handles only tens of messages per second: far too low a throughput. You have to process concurrently; a pressure test of a 4-core 8 GB consumer with 32 threads can process several thousand messages per second. To keep ordering while going multithreaded, hash each message by its key into one of N in-memory queues, with each queue drained by exactly one worker thread, so messages with the same key are still processed in order.
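A sketch of that hash-dispatch idea (illustrative; the worker threads are elided and each list stands in for one worker's in-memory queue): messages with the same key always land in the same queue, so per-key ordering survives concurrency.

```java
import java.util.ArrayList;
import java.util.List;

// Keyed dispatch: route each message to one of N in-memory queues by
// hashing its key (e.g. an order ID). One worker thread drains each
// queue, so messages for the same key are processed in order.
public class KeyedDispatcher {
    private final List<List<String>> queues = new ArrayList<>();

    public KeyedDispatcher(int workers) {
        for (int i = 0; i < workers; i++) queues.add(new ArrayList<>());
    }

    // Returns the index of the worker queue the message was routed to.
    public int dispatch(String key, String message) {
        int worker = Math.floorMod(key.hashCode(), queues.size());
        queues.get(worker).add(message);   // same key -> same queue, in order
        return worker;
    }

    public List<String> queueOf(String key) {
        return queues.get(Math.floorMod(key.hashCode(), queues.size()));
    }

    public static void main(String[] args) {
        KeyedDispatcher d = new KeyedDispatcher(32); // e.g. 32 worker threads
        d.dispatch("order-1", "insert");
        d.dispatch("order-1", "update");
        d.dispatch("order-1", "delete");
        System.out.println(d.queueOf("order-1")); // insert, update, delete in order
    }
}
```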
Message queue delay, expiration, and backlog
The consumer side has a problem: it is not consuming at all, or is consuming very slowly, so the message queue cluster's disks are nearly full while nobody consumes. After a backlog of a few hours, a RabbitMQ message TTL may expire the messages and they simply disappear. Typical causes: the consumer writes to MySQL after each message and MySQL has gone down, or something the consumer depends on locally has died, taking the consumer down with it; long stretches without consumption then fill MQ up. Scenario: tens of millions of messages stuck in MQ for seven or eight hours.
Quickly processing a backlog of messages
One consumer handles 1000 messages per second, so three consumers handle 3000 per second, i.e. 180,000 per minute; a backlog of more than 10 million messages needs about an hour to recover at that rate.
Steps:
- First fix the consumer's problem so it can regain its consumption speed, then stop all existing consumers
- Create a new topic with 10 times the partitions, and temporarily create 10 or 20 times the number of queues
- Write a temporary distributor program that consumes the backlog and, doing no time-consuming processing, directly round-robins the messages into the 10x temporary queues
- Temporarily requisition 10 times the machines to deploy consumers, each batch consuming one temporary queue
- This temporarily scales queue and consumer resources to 10 times normal, consuming at 10 times the usual speed
- Once the backlog has been quickly consumed, restore the original deployment architecture and use the original consumer machines again: a backlog that 3 consumers would need an hour to clear, 30 temporary consumers can clear in about 10 minutes
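The temporary distributor in the steps above just round-robins the backlog into the 10x queues; a sketch (the queue count and message shape are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Temporary distributor sketch: drain the backlog and spread it evenly
// (round-robin) across N temporary queues, doing no real processing.
public class BacklogDistributor {

    public static List<List<String>> distribute(List<String> backlog, int queues) {
        List<List<String>> temp = new ArrayList<>();
        for (int i = 0; i < queues; i++) temp.add(new ArrayList<>());
        int next = 0;
        for (String msg : backlog) {
            temp.get(next).add(msg);       // no time-consuming work here
            next = (next + 1) % queues;    // round-robin to the next queue
        }
        return temp;
    }

    public static void main(String[] args) {
        List<List<String>> out =
            distribute(List.of("m1", "m2", "m3", "m4", "m5"), 2);
        System.out.println(out.get(0).size() + " / " + out.get(1).size());
    }
}
```

Each temporary queue then gets its own consumer batch, which is what delivers the 10x drain speed.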
If you use RabbitMQ and set a message TTL, messages that sit in the queue longer than the TTL are cleared by RabbitMQ and the data is lost. In that case, write a program to re-derive the lost data and re-inject it into MQ during off-peak hours, replacing what was lost during the day.
If messages are stuck in MQ unprocessed for so long that MQ is about to fill up, write a temporary program that consumes the messages and simply discards them one by one, draining MQ as quickly as possible, and then re-supplement the data at night.
How to design message queue middleware architecture
- The MQ needs to support scalability and rapid expansion. Design a distributed MQ in the broker -> topic -> partition style, with each partition on one machine storing part of the data; if resources run short, add partitions to the topic, migrate data, and add machines
- MQ data should be written to disk so that it survives a process crash. Write sequentially: sequential disk I/O avoids the addressing overhead of random reads and writes and performs very well. This is Kafka's approach
- The MQ must be highly available: multiple replicas -> leader & follower -> when a broker dies, re-elect a leader and keep serving
- It must support zero data loss, using the Kafka zero-loss configuration described earlier