
Why use message queues?

There are many application scenarios for MQ, but the core ones are decoupling, asynchronous processing, and peak clipping.

  • Decoupling: write messages to the message queue, and any system that needs them subscribes to the queue, so the original system does not have to change when new consumers appear.
  • Asynchronous: write messages to the message queue and run non-essential business logic asynchronously, which speeds up the response.
  • Peak clipping: during traffic peaks, requests are written to the message queue and the downstream system pulls messages only as fast as the database can handle; in production, a brief backlog during the peak is acceptable.

Application scenarios of message queues

Decoupling

Decoupling scenario 1: after an order is placed, the order system needs to call the delivery system to arrange delivery and the points system to add points. Normally you would write code that calls these interfaces directly after the order is placed, and if one of those calls fails, the whole order fails: the systems are coupled together, and a failure in any one of them breaks the ordering flow. If MQ is introduced, the order data is pushed to MQ after the order is placed, and the delivery system and the points system pull the data and consume it. The order system is now decoupled from the delivery and points systems, a failure in either of them no longer affects placing an order, and the user is told immediately that the order succeeded. Figures 1 and 2 below show the situation before and after decoupling.

Decoupling scenario 2: still the order system. After an order is placed, the order data also needs to be pushed to other departments' systems (such as the big data department) for statistical analysis and similar work. Normally this would again be done by calling those systems directly after the order completes, which brings problems of its own: the big data department may ask to add or remove fields in the pushed data, another department may start needing the order data, or a department may suddenly stop needing it, and every one of these changes requires modifying the order system's code, coupling it to all the other systems. After introducing MQ, the order system no longer calls the other systems' interfaces to push data; it only pushes the data to MQ, and whichever department needs the order data writes its own consumer to pull it from MQ. As shown in the diagram below.

Asynchronous

If you have ever called a third-party system, you know the first word that comes to mind: pitfalls. You never know when it will go down or when it will respond at a snail's pace, so its stability is a mixed bag. Take the order system: the SMS function is provided by a third-party SMS system and delivery by a third-party logistics system. Whether you call them synchronously or asynchronously, you have to write complex retry logic yourself when an interface call fails; if the retries also fail, you have to persist the data and push it again later with a scheduled task, because the system may simply be unavailable at that moment and only recover half an hour later. All of this logic is complex and hurts performance. If MQ is introduced, we simply push messages to MQ and consume them from MQ; if the downstream interface is unavailable, the consumer returns a consumption failure to MQ and the message is pulled and consumed again later, without hand-written retry code. See the figure below.

Peak clipping

In most cases the bottleneck of a system is the database. Suppose the database can handle 6,000 requests per second, but in peak scenarios such as flash sales or Double Eleven the traffic reaches 30,000 requests per second. The database cannot handle that level of concurrency, whereas MQ can absorb tens of thousands of concurrent requests without any problem. In this case we can introduce MQ: write the requests to MQ and consume them from MQ. Requests still arrive at MQ at 30,000 per second, but consumption from MQ is throttled to 6,000 per second, and the backlog is consumed slowly during idle periods. In this way the system can survive 30,000 requests per second, as shown below.

What are the disadvantages of message queues?

Generally speaking, when a technology solves some problems, it inevitably leads to other problems. It’s just that the new problems are easier to solve than the old ones, and that the introduction of new technologies does more good than harm. The introduction of MQ also causes some problems, with the following disadvantages:

  • System availability decreases

We know that the more external dependencies a system has, the more likely it is to fail. Suppose you originally have four systems A, B, C, and D, and A calls the interfaces of B, C, and D directly; everything works fine. Once you introduce MQ, a failure of MQ makes the whole system unavailable.

  • System complexity increases

With just A, B, C, and D the technology stack was fairly simple, but after introducing MQ you have to deal with message loss, repeated consumption, and other tricky problems, which makes the system considerably more complicated.

  • Consistency problem

With the same four systems, originally if system A's call to B failed, A would not call C and D and would simply roll back the data. After introducing MQ, system A pushes data to MQ for B, C, and D and returns success immediately; but if only B and C consume the message successfully and D's consumption fails, how do you keep the systems' data consistent? Since message queues create these problems, we must address them when we use MQ, otherwise we are effectively digging holes for our future selves. Here are a few common message queue issues that need to be addressed:

  • How can message queues be highly available?
  • How to ensure that messages are not re-consumed?
  • How do I handle message loss?
  • How do I ensure that messages are sequential?
  • How to handle a large message backlog in message queues?

Message queue selection

At present, the more popular message queue middleware are: ActiveMQ, RabbitMQ, RocketMQ, Kafka. Here’s a quick comparison of the pros and cons of different message queues:

  • ActiveMQ: single-machine throughput: tens of thousands of messages per second; latency: milliseconds; availability: high; message reliability: a small probability of losing data; feature support: extremely complete
  • RabbitMQ: single-machine throughput: tens of thousands of messages per second; latency: microseconds; availability: high; message reliability: basically no loss; feature support: developed in Erlang, with strong concurrency
  • RocketMQ: single-machine throughput: hundreds of thousands of messages per second; latency: milliseconds; availability: very high; message reliability: can be configured for zero loss; feature support: distributed architecture
  • Kafka: single-machine throughput: hundreds of thousands of messages per second; latency: milliseconds; availability: very high; message reliability: can be configured for zero loss; feature support: distributed, generally paired with big data systems for real-time computation, log collection and similar scenarios; the de facto industry standard

Based on the above comparison, ActiveMQ is now used less and less and its community is not very active. RabbitMQ is open source, stable, and has an active community, but because it is written in Erlang it is hard for most companies to dig into or modify, so control over it is limited. RocketMQ is developed in Java, has a relatively active community, and companies that can read and modify its source code have a manageable level of control over it. Kafka is generally used for real-time computation and log collection in big data scenarios.

How can message queues be highly available?

What is high availability?

High availability (HA), an IT term referring to the ability of a system to perform its functions without interruption, represents the degree to which a system is available.

How can message queues be highly available?

RocketMQ high availability

RocketMQ processes are commonly referred to as Brokers. Brokers are usually deployed as a cluster, and each Broker registers with the NameServer. From this we can see that high availability has two parts: the NameServer and the Broker.

For the NameServer, high availability comes from clustered deployment. NameServers do not communicate with each other, and each one holds a complete copy of the Broker routing information, so the failure of one NameServer has no impact on the cluster: as long as one NameServer is still alive, full service can be provided.

For the Broker, high availability comes from a master-slave architecture and a multi-replica strategy. A Broker plays one of two roles, Master or Slave, and the data on the Master and the Slave is identical: the Master Broker synchronizes messages to the Slave Broker as they are received. Every Master and Slave Broker registers with all NameServers. If the Master goes down, a Slave is elected as the new Master and continues to provide the write service, while the read service is unaffected (each Slave can serve reads but not writes). If a Slave goes down, services are likewise unaffected.

RabbitMQ high availability

RabbitMQ is not a distributed message queue; it has three modes: single machine, normal cluster, and mirrored cluster. Only the mirrored cluster mode provides high availability, and single-machine mode is not used in production.

  • Normal cluster mode: every node in the cluster stores the queue metadata, but only one node stores the queue contents. If that node goes down, the data is lost.

  • Mirrored cluster mode: every node in the cluster stores both the queue metadata and the queue contents, i.e. a complete mirror of the data. If one node goes down, the service as a whole is unaffected. The drawback is that it is not truly distributed: every machine holds a complete copy of the data.

Kafka high availability

Kafka's high availability is similar to RocketMQ's: it is implemented with a leader/follower multi-replica architecture. Each partition has a leader replica and can have several follower replicas on other brokers. Both producer writes and consumer reads go through the leader; when the broker holding the leader goes down, one of the followers is automatically elected as the new leader.

How to ensure that messages are not re-consumed?

Why does repeated consumption happen?

RabbitMQ, RocketMQ, and Kafka can all run into repeated consumption, which can be caused by the producer, by MQ itself, or by the consumer. Repeated consumption here means the same piece of data is processed twice; it is not only that a single message in MQ gets consumed twice, but also that MQ may end up holding two identical messages.

  • Producer: a producer might push the same piece of data into MQ more than once. Why would that happen? Perhaps a Controller interface is called twice and the interface is not idempotent; or the push to MQ is slow and the producer's retry mechanism pushes the same message again.

  • MQ: the consumer finishes consuming a piece of data and returns the ACK for successful consumption, but MQ crashes before recording it. After recovering, MQ assumes the consumer never consumed that data and pushes the message again, causing repeated consumption.

  • Consumer: The consumer has consumed a message and is about to send an ACK signal to MQ when the consumer hangs up. After the service restarts, MQ pushes the message again, assuming that the consumer has not consumed the message.

How can message consumption be made idempotent?

The problem of repeated consumption is really about whether the consumer processes messages idempotently, and it is usually solved on the consumer side. Ideally the producer would simply not produce duplicate data, but MQ generally tolerates multiple copies of the same message, while the consumer must not process the same data twice, so idempotency protection is usually implemented on the consumer side.

So how can consumers avoid repeated consumption? There are two common approaches:

  • Status check: after consuming a message, the consumer records it in Redis; before processing a delivery it checks whether the message already exists in Redis. If it does, the message has already been consumed and is discarded.

  • Business check: the consumed data ultimately has to be inserted into the database, so the database's uniqueness constraint can prevent repeated consumption. Each delivery simply attempts the insert; if the unique field already exists, the insert fails and the message is discarded. In general this business-level check is a simple and efficient way to avoid processing a message twice (a sketch follows this list).
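
A minimal Java sketch of the business-check approach, assuming a MySQL orders table with a unique index on order_no; the JDBC URL, credentials, and column names are placeholders, not taken from the article:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLIntegrityConstraintViolationException;

public class IdempotentConsumer {
    // assumption: orders(order_no, payload) exists and order_no has a UNIQUE index
    public static void handle(String orderNo, String payload) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/shop", "user", "password"); // assumed DSN
             PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO orders(order_no, payload) VALUES (?, ?)")) {
            ps.setString(1, orderNo);
            ps.setString(2, payload);
            ps.executeUpdate();                 // first delivery inserts the row
        } catch (SQLIntegrityConstraintViolationException duplicate) {
            // repeated delivery: the unique index rejects the insert, so drop the message
        }
    }
}
```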

Conclusion

The above describes why repeated consumption happens: producers, MQ, and consumers can all contribute to it. The problem is usually solved on the consumer side; by default MQ may contain two identical pieces of data, so the consumer has to process messages idempotently. The simplest and most efficient idempotency check is to rely on a unique field in the table, such as the order number.

How do I handle message loss?

Why are messages lost?

Similar to message duplication, message loss can occur among producers, MQ, and consumers. What causes message loss among all three?

  • Producer: A producer pushes a message to MQ. The message is not pushed to MQ due to network jitter, or the message is pushed to MQ but an error occurs in MQ and the message is lost.

  • MQ: After receiving the message, MQ stores the message in the OS Cache temporarily. Before consumers consume the message, MQ hangs up and the message is lost.

  • Consumer: the consumer receives the message but crashes before actually processing it, while it has already told MQ that consumption succeeded, so the message is lost.

How to solve the problem of message loss

Different message queues address message loss differently, and here’s how different MQ addresses message loss.

RabbitMQ

The producer causes message loss

RabbitMQ can avoid message loss in two ways: transaction and Confirm mode. Let’s take a look at the transaction mechanism.

channel.txSelect is used to start a transaction, channel.txCommit to commit it, and channel.txRollback to roll it back. To avoid losing messages we can call txSelect to open a transaction before sending, then send the message; if delivery fails we call txRollback to roll back and then retry, and if delivery succeeds we call txCommit to commit the transaction.
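
A minimal sketch of this transaction approach with the RabbitMQ Java client; the queue name and broker address are assumptions:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TxProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                          // assumed broker address
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            channel.queueDeclare("order-queue", true, false, false, null);
            channel.txSelect();                                // open the transaction
            try {
                channel.basicPublish("", "order-queue", null, "order-created".getBytes());
                channel.txCommit();                            // commit if the publish succeeded
            } catch (Exception e) {
                channel.txRollback();                          // roll back and let the caller retry
                throw e;
            }
        }
    }
}
```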

This scheme guarantees that our messages are delivered successfully, but hardly anyone uses it, because it is synchronous and blocking: after sending a message, nothing else can proceed until MQ responds and the transaction is committed or rolled back. The general process is shown in the figure below.

The other way RabbitMQ avoids producer-side message loss is Confirm mode. In Confirm mode the producer does not block waiting for a reply from MQ: after receiving a message, MQ calls back the producer's ACK interface to notify it that delivery succeeded; if MQ fails to receive the message, it calls back the producer's NACK interface to notify it that delivery failed, and the producer can resend the message. The general process is shown in the figure below.
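
A minimal sketch of Confirm mode with the RabbitMQ Java client, using an asynchronous ConfirmListener; the queue name and broker address are assumptions:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConfirmProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                          // assumed broker address
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            channel.queueDeclare("order-queue", true, false, false, null);
            channel.confirmSelect();                           // switch the channel to Confirm mode

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) {
                    // the broker received the message; nothing more to do
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) {
                    // the broker failed to take the message; republish it here
                }
            });

            channel.basicPublish("", "order-queue", null, "order-created".getBytes());
            Thread.sleep(1000);   // in this demo, give the asynchronous confirm a moment to arrive
        }
    }
}
```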

RabbitMQ causes message loss

RabbitMQ itself can lose data because messages are not persisted by default: RabbitMQ receives a message, writes it to the OS cache, and returns success to the producer, so if RabbitMQ crashes before the message reaches disk, the message is lost. The solution is to configure RabbitMQ to persist messages to disk and, combined with producer Confirm mode, send the ACK to the producer only after the message has been persisted.
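
A minimal sketch of persistence on the producer side: a durable queue plus a persistent delivery mode. The queue name and broker address are assumptions; persistence combined with the Confirm mode above covers the broker-side loss described here.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class PersistentProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                          // assumed broker address
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            // durable = true: the queue definition survives a broker restart
            channel.queueDeclare("order-queue", true, false, false, null);
            // PERSISTENT_TEXT_PLAIN sets deliveryMode = 2, so the message is written to disk
            channel.basicPublish("", "order-queue",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    "order-created".getBytes());
        }
    }
}
```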

Consumer causes message loss

RabbitMQ loses data on the consumer side because it acknowledges (ACKs) messages automatically by default. The solution is to turn off automatic ACK and acknowledge manually after processing: if the consumer crashes before the message is processed, RabbitMQ sees no ACK, assumes the message was not handled successfully, and pushes it to a consumer again.
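
A minimal sketch of a consumer with automatic ACK turned off, acknowledging only after processing succeeds; the queue name, broker address, and the println standing in for business logic are assumptions:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ManualAckConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                          // assumed broker address
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        channel.queueDeclare("order-queue", true, false, false, null);

        boolean autoAck = false;                               // turn off automatic ACK
        channel.basicConsume("order-queue", autoAck, (consumerTag, delivery) -> {
            try {
                System.out.println("processing: " + new String(delivery.getBody()));
                // ACK only after the business logic has finished successfully
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                // processing failed: NACK and ask RabbitMQ to redeliver the message
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        }, consumerTag -> { });
    }
}
```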

Kafka

The producer causes message loss

In the case of Kafka, the producer rarely loses a message, because the producer sends a message and waits for Kafka to respond with success; if the response indicates failure, the producer automatically retries.

Kafka causes message loss

Kafka usually has one leader and two follower replicas. If a message has just been written to the leader but has not yet been replicated to the followers when the leader crashes, a new leader is elected without that message, and the message is lost.

The solution is to configure things so that the producer is only told a write succeeded after the followers have replicated the message. The configuration is as follows (a producer-side sketch follows the list):

  • Set replication.factor for the topic: this value must be greater than 1, so that each partition has at least 2 replicas.

  • Set the min.insync.replicas parameter on the Kafka broker: this value must be greater than 1, requiring the leader to see at least one follower that is still in sync with it, so that a follower holding the data remains if the leader dies.

  • Set acks=all on the producer: a write is only considered successful after it has been written to all in-sync replicas.
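
A minimal sketch of the producer-side part of this configuration (acks=all plus retries) with the Kafka Java client. The broker address and topic are assumptions; replication.factor and min.insync.replicas are set on the broker or topic, not in this code:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReliableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");     // succeed only after all in-sync replicas have the write
        props.put("retries", 3);      // retry transient send failures
        // broker/topic side (set when creating the topic, not here):
        //   replication.factor >= 2 and min.insync.replicas >= 2

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("order-binlog", "order-20211112001", "INSERT ..."));
        }
    }
}
```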

Following the above configuration, it is guaranteed that data will not be lost when the Kafka Broker switches to the new leader if the leader fails.

Consumer causes message loss

Kafka consumers lose data in a way similar to RabbitMQ: by default, when a Kafka consumer receives a message it automatically commits the offset to Kafka, telling Kafka the message has been handled, even if processing has not actually finished. As with RabbitMQ, the fix is to turn off automatic offset commits and commit manually after processing.
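
A minimal sketch of a Kafka consumer with automatic offset commits disabled, committing only after processing; the broker address, group id, topic, and the println standing in for business logic are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
        props.put("group.id", "report-sync");                  // assumed consumer group
        props.put("enable.auto.commit", "false");              // turn off automatic offset commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order-binlog"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("apply binlog: " + record.value());  // business processing
                }
                consumer.commitSync();   // commit the offset only after processing succeeded
            }
        }
    }
}
```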

RocketMQ

RocketMQ can lose data in ways similar to RabbitMQ and Kafka: the producer fails to deliver a message because of network jitter, the RocketMQ Master node crashes or the master/standby switchover goes wrong, or the consumer commits the offset to RocketMQ before its asynchronous processing has actually finished.

In RocketMQ, transaction messages guarantee zero message loss. RocketMQ’s transaction message flow looks like this:

The transaction message flow above consists of three steps: send a half message -> execute the rest of the business logic -> commit or roll back. Let's discuss the following failure cases (a producer-side sketch follows the list):

  • What if the producer fails to send the half message?

The producer can retry the send, or record the message in a file or database for later delivery; in this case the current request fails.

  • What if the producer sends the half message successfully, but the subsequent business logic fails?

The producer sends a rollback request to roll back the half message in RocketMQ, and the current request fails.

  • What if the producer sends the half message successfully, but RocketMQ does not respond for some reason, such as network timeout?

Since the half message has been sent successfully and RocketMQ already holds it, RocketMQ provides a compensation mechanism: it calls back an interface you implement and asks whether the message should be committed or rolled back.

  • What if the producer sends the half message successfully, but the commit or rollback request fails?

This problem, like the one above, is handled through RocketMQ’s compensation mechanism.
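
A minimal sketch of a RocketMQ transaction-message producer using TransactionMQProducer. The group name, NameServer address, and topic are assumptions, and the business logic inside executeLocalTransaction / checkLocalTransaction is only indicated by comments rather than taken from the article:

```java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TxOrderProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("order-tx-group");
        producer.setNamesrvAddr("localhost:9876");             // assumed NameServer address
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // run the local business logic (e.g. save the order) here
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // compensation callback: RocketMQ asks whether the half message should be
                // committed or rolled back, e.g. by checking whether the order really exists
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        producer.sendMessageInTransaction(
                new Message("order-topic", "order-created".getBytes()), null);
        producer.shutdown();
    }
}
```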

Conclusion

The causes of message loss have been introduced from the producers, MQ itself, and consumers. Message loss is a common problem that must be solved. Generally, services that use message queues are important services and data loss cannot be accepted.

We then looked at how the different MQs address message loss. Consumer-side loss happens because MQ is told the message has been handled before processing has actually succeeded; it is easy to fix by disabling automatic acknowledgement or offset commits (and avoiding premature acknowledgement in asynchronous processing) and confirming only after processing. Loss caused by producers and by MQ itself is harder to handle: RabbitMQ uses Confirm mode plus persistence to avoid it, Kafka only reports success to the producer after all in-sync followers have replicated the message, and RocketMQ uses transaction messages, with a compensation mechanism for the various failure cases, to achieve zero message loss.

How do I ensure that messages are sequential?

Why do messages become out of order?

In production there are often systems, such as reporting systems, that need to synchronize MySQL binlogs. For example, the order system needs to synchronize the data of the order table to the big data department's MySQL database for statistical analysis and reports. The common practice is to listen to the binlog of the order database with middleware such as Canal, send the binlogs to MQ, and have a consumer fetch them from MQ and replay them into the big data department's MySQL.

In this process an order may be inserted, updated, and deleted, producing, for example, three binlogs: insert, update, delete. If the consumer ends up executing them as delete, update, insert, can that work? Certainly not.

RabbitMQ messages are out of order

For RabbitMQ, out-of-order consumption is usually caused by multiple consumers in a cluster consuming different messages of the same order. For example, consumer A handles the insert, consumer B the update, and consumer C the delete, but consumer C runs faster than consumer B, and B faster than A, so the binlogs are applied to the database as delete, update, insert instead of insert, update, delete.

The following is a schematic of the possible out-of-order problems with RabbitMQ:

Kafka messages are out of order

For Kafka, messages within the same partition of a topic are strictly ordered. A producer can specify a key when writing; if we use the order number as the key, all messages of the same order are sent to the same partition, so a consumer reads them in order.
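
A minimal sketch of a producer that uses the order number as the message key, so all binlogs of one order land in the same partition; the broker address, topic, order number, and placeholder payloads are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderBinlogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String orderNo = "20211112001";                    // hypothetical order number
            // same key -> same partition -> insert/update/delete stay in order
            producer.send(new ProducerRecord<>("order-binlog", orderNo, "INSERT ..."));
            producer.send(new ProducerRecord<>("order-binlog", orderNo, "UPDATE ..."));
            producer.send(new ProducerRecord<>("order-binlog", orderNo, "DELETE ..."));
        }
    }
}
```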

So why does Kafka still have this problem? The problem is the consumer. Usually, when we consume multiple messages with the same key, we use multiple threads to process them concurrently to speed things up; otherwise, if one message takes tens of milliseconds to handle, only a few dozen messages can be processed per second and the throughput is too low. With multiple threads running concurrently, the binlogs may not be applied to the database in their original order.

The following diagram shows how Kafka can be out of order:

RocketMQ messages are out of order

For RocketMQ, each Topic can have multiple MessageQueues, and when we write messages they are spread evenly across the MessageQueues. So for messages with the same order number, the insert binlog may go to MessageQueue1, the update binlog to MessageQueue2, and the delete binlog to MessageQueue3.

When there are multiple consumer machines they form a Consumer Group, and each machine consumes a subset of the MessageQueues. Consumer A may consume MessageQueue1 and perform the insert, consumer B consume MessageQueue2 and perform the update, and consumer C consume MessageQueue3 and perform the delete. But when the binlogs are applied to the database, consumer A may not run first; consumer C may execute the delete before the insert has even happened, because the consumers run in parallel and there is no ordering guarantee between them.

Here is a schematic of RocketMQ that may be out of order:

How do I ensure that messages are sequential?

Once you know why the ordering disorder occurs, you need to find a way to keep the messages sequential. As you can see from the previous examples, the ordering disorder is either due to multiple consumers consuming different messages from the same order number, or to messages from the same order number being distributed to different machines in MQ. Different message queues have different schemes to ensure message ordering.

RabbitMQ ensures that messages are sequential

The problem with RabbitMQ is that different messages are sent to the same queue and multiple consumers consume from that queue. To solve it, we can create multiple queues, have each consumer consume from a fixed queue, and have the producer send all messages of the same order number to the same queue. Since messages within a queue are ordered and each queue is consumed by only one consumer, messages of the same order number are consumed in order. The following figure shows how RabbitMQ ensures message ordering; a minimal routing sketch follows as well:
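
A minimal sketch of routing by order number on the RabbitMQ producer side, hashing the order number onto a fixed set of queues (each assumed to have exactly one consumer); the queue count, names, broker address, and order number are assumptions:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class OrderedRabbitProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                          // assumed broker address
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            int queueCount = 4;                                // assumed: one consumer per queue
            for (int i = 0; i < queueCount; i++) {
                channel.queueDeclare("order-binlog-" + i, true, false, false, null);
            }
            String orderNo = "20211112001";                    // hypothetical order number
            // hash the order number so every binlog of this order goes to the same queue
            int idx = Math.floorMod(orderNo.hashCode(), queueCount);
            channel.basicPublish("", "order-binlog-" + idx, null,
                    ("UPDATE for " + orderNo).getBytes());
        }
    }
}
```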

Kafka ensures that messages are sequential

With Kafka, the messages themselves stay ordered all the way from producer to consumer; the final disorder comes from the consumer using multiple threads to process messages concurrently to improve throughput. For example, if the consumer handles messages with 32 threads and each thread processes at a different speed, the order in which the messages are finally applied may differ from the order in which they arrived.

We therefore only need to ensure that messages with the same order number are processed by the same thread: add an in-memory queue in front of the processing threads, have each thread consume only its own queue, and route messages of the same order number to the same queue. The following diagram shows how Kafka message ordering can be preserved on the consumer side; a minimal sketch follows as well:
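
A minimal sketch of the in-memory queue idea: messages are hashed by order number onto worker queues, and each queue is drained by a single thread, so the binlogs of one order are applied in arrival order. The thread count and the println standing in for database work are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OrderedDispatcher {
    private static final int THREADS = 32;                     // assumed worker thread count
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    public OrderedDispatcher() {
        for (int i = 0; i < THREADS; i++) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
            queues.add(queue);
            Thread worker = new Thread(() -> {
                while (true) {
                    try {
                        String binlog = queue.take();
                        // apply the binlog to the database here; one thread per queue means
                        // binlogs in the same queue are applied in arrival order
                        System.out.println(Thread.currentThread().getName() + " applies " + binlog);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            worker.setDaemon(true);
            worker.start();
        }
    }

    // binlogs with the same order number always hash to the same queue, hence the same thread
    public void dispatch(String orderNo, String binlog) throws InterruptedException {
        queues.get(Math.floorMod(orderNo.hashCode(), THREADS)).put(binlog);
    }
}
```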

RocketMQ ensures that messages are sequential

RocketMQ messages are out of order because the binlogs of the same order number enter different MessageQueue, resulting in the binlogs of one order being processed by consumers on different machines.

To solve RocketMQ's out-of-order problem, we just need to route the binlogs of the same order into the same MessageQueue. Messages within a MessageQueue are ordered, and a MessageQueue is delivered to only one Consumer, so consumption is ordered. Here is RocketMQ's scheme for ensuring message ordering; a minimal producer-side sketch follows as well:
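
A minimal sketch using the RocketMQ Java client's MessageQueueSelector to pick the MessageQueue by hashing the order number; the group name, NameServer address, topic, and order number are assumptions:

```java
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedRocketProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order-binlog-group");
        producer.setNamesrvAddr("localhost:9876");             // assumed NameServer address
        producer.start();

        String orderNo = "20211112001";                        // hypothetical order number
        Message msg = new Message("order-binlog", ("UPDATE for " + orderNo).getBytes());

        // choose the MessageQueue by hashing the order number, so all binlogs of the
        // same order land in the same queue and stay in order
        producer.send(msg, (List<MessageQueue> mqs, Message m, Object arg) -> {
            int idx = Math.floorMod(arg.hashCode(), mqs.size());
            return mqs.get(idx);
        }, orderNo);

        producer.shutdown();
    }
}
```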

Conclusion

The reasons why different message queues can deliver messages out of order were introduced above, together with the way each common message queue keeps messages in order. Message ordering is a common problem in MQ that deserves attention, especially when the same order has multiple messages: different execution orders can produce different results, and out-of-order processing can cause many business problems that are often hard to troubleshoot. However, not every message needs global ordering; unrelated messages have no impact on the business even if they are out of order, so this has to be judged case by case.

How to deal with the backlog of millions of messages caused by a consumer failure?

Let's start by thinking about what causes a backlog of millions of messages in a message queue. First, something may be wrong on the consumer side, such as an outage, or the consumers suddenly becoming extremely slow, which leads to a backlog. It is also possible that a service the consumer depends on is down, for example the NoSQL store or MySQL the consumer relies on, so the consumer cannot work properly and messages pile up.

How to solve the million message backlog problem?

If the backlogged messages were allowed to be lost, it would be easy: immediately modify the consumer code to discard the messages; consumption would then be extremely fast and the backlog would be cleared very quickly.

However, many messages are not allowed to be discarded. So we still need to do it quickly. How do we do it quickly? The simplest and most efficient way is to temporarily deploy enough consumers to consume these messages together. Of course, before that, you need to restore the normal service of the system.

For example, with RocketMQ, suppose each Topic originally has only 4 MessageQueues, consumed by 4 consumers. Clearly, with millions of messages in the backlog, 4 consumers cannot clear it quickly. We can modify the 4 original consumers so that instead of processing messages directly, they forward them to a Topic on a new RocketMQ that has 20 MessageQueues, and temporarily deploy 20 consumers to consume that batch of data together. Consumption becomes five times faster, and the backlog of millions of messages is soon processed. Once the backlog is cleared, the 20 temporary consumers can be taken offline.

Refer to www.infoq.cn/profile/BF1…