What can message queues be used for?

A message queue, as its name implies, is a queue that stores messages. It is mainly used to solve communication problems between distributed systems, and its three classic uses are asynchronous processing, decoupling, and peak clipping.

Asynchronous processing

You might argue that communication between applications isn’t just about message queues, so why have a message queue in the middle of communication? Can’t I communicate directly?

Fair point: you just brought up another concept, synchronous communication. For example, Dubbo, which is widely used in the industry, is an RPC framework designed for synchronous communication between systems.

Let me give you an example. Suppose we have an order system, and the requirement is that the user receives a text message when a purchase completes.

Ignoring network overhead, if the order system takes 100ms and the SMS system takes 200ms, the whole process takes 100ms + 200ms = 300ms.

At first glance there is nothing wrong with this. But think about it carefully and a problem emerges. The user has already completed the purchase in the order system, yet the synchronous call stretches out the whole request, even though sending the SMS is not part of the main flow but only an auxiliary function. The call chain has become top-heavy: shopping itself is not time-consuming, but because of the synchronous call it must wait for the slow SMS operation to return before the purchase can complete. Add a mail system, a WeChat notification system, and others, and the whole flow stretches to, say, 900ms. What happens when we need to call even more auxiliary systems?

To solve this problem, clever programmers inserted a warehouse-like piece of middleware in the middle: the message queue. With it, we can modify the model.

This way, the order system only needs to store the purchase information in the message queue before returning and ending the whole shopping flow; the time drops to roughly 110ms. The subsequent calls to the mail system, SMS system, WeChat system and other non-essential functions no longer affect the shopping time.
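To make the hand-off concrete, here is a minimal sketch of the idea in Java, using an in-process BlockingQueue as a stand-in for real message queue middleware. All class and method names here are invented for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncOrderDemo {
    // A BlockingQueue stands in for the message queue middleware.
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // The order flow only enqueues the result and returns immediately;
    // it no longer waits for the SMS system.
    static String purchase(String order) {
        queue.offer(order); // hand off to the queue (the ~10ms step)
        return "purchased:" + order;
    }

    public static void main(String[] args) throws InterruptedException {
        // A separate consumer thread sends the SMS later, off the critical path.
        Thread smsConsumer = new Thread(() -> {
            try {
                String msg = queue.take();
                System.out.println("SMS sent for " + msg);
            } catch (InterruptedException ignored) { }
        });
        smsConsumer.start();
        System.out.println(purchase("order-1"));
        smsConsumer.join();
    }
}
```

The purchase call returns as soon as the message is enqueued; the SMS work happens on another thread, mirroring how the order system returns once the queue accepts the message.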

Decoupling

Suppose there is no message queue and the whole shopping flow is called synchronously. Here is a simple description in pseudocode.

public void purchaseTest(Request request) {
    // validate the request
    validate(request);
    // place the order
    Result result = purchase(request);
    // send the SMS notification
    sendMessage(result);
}

If we later add a mail system, a WeChat system, a points system, and a logistics system, do we have to change this code every time? Notice that the SMS, email, and WeChat services all depend on the same shopping Result; the whole series of downstream services needs the same message. In that case, we can handle it with "broadcast messages."

Here, shopping becomes a topic: the shopping system, as the producer, puts the shopping result into the message queue, and every consumer subscribed to the shopping topic pulls the message from the queue and consumes it. The producer only needs to care which topic it publishes to, and each consumer only needs to care which topic it pulls from. Without a message queue, every time a business is added or removed, the calls in the main shopping system have to be adjusted, giving the whole system strong coupling, which hinders development and debugging across teams and makes troubleshooting harder.
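Under this model, the pseudocode above can be reshaped so that the purchase flow only publishes the result, and downstream services register themselves. A minimal in-memory sketch, with all names invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DecoupledPurchase {
    // Downstream services register handlers for the shopping result instead of
    // being called one by one inside the purchase flow.
    static final List<Consumer<String>> shoppingSubscribers = new ArrayList<>();

    static void subscribe(Consumer<String> handler) {
        shoppingSubscribers.add(handler);
    }

    static String purchaseTest(String request) {
        String result = "result-of-" + request;             // stands in for purchase(request)
        shoppingSubscribers.forEach(s -> s.accept(result)); // broadcast instead of direct calls
        return result;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        subscribe(r -> log.add("sms:" + r));
        subscribe(r -> log.add("mail:" + r)); // added later without touching purchaseTest
        purchaseTest("order-1");
        System.out.println(log);
    }
}
```

Adding the mail or points service is now a new `subscribe` call; `purchaseTest` itself never changes again.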

Peak clipping

If we used synchronous calls, think about how the shopping system would behave when a large number of users shop at the same time. Say 100,000 users hit the shopping system simultaneously. The order system is the main business, so its servers are relatively well provisioned and may withstand the high concurrency, but the SMS, mail, and other auxiliary services behind it run on weaker servers; they can hardly bear such concurrency and would probably crash.

With a message queue, the shopping system only needs to store the purchase information in the queue, and each auxiliary service behind it can apply flow control at its own discretion. For example, if the SMS system can handle at most 10,000 concurrent requests, it can be configured to pull at most 10,000 messages from the queue at a time. This slows the auxiliary services down, but it keeps the whole system running normally and smoothly without affecting the processing speed of the main business.
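The flow-control idea is just a cap on the pull batch size. A minimal sketch, using an in-memory Deque as a stand-in for the queue (the cap of 10,000 mirrors the SMS example above):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ThrottledConsumer {
    // Pull at most `maxBatch` messages per cycle, regardless of how many are queued.
    static List<String> pullBatch(Deque<String> queue, int maxBatch) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatch && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 0; i < 25_000; i++) queue.add("msg-" + i);
        // SMS system capped at 10,000 messages per pull
        System.out.println(pullBatch(queue, 10_000).size()); // 10000
        System.out.println(queue.size());                    // 15000 left for later cycles
    }
}
```

The peak stays buffered in the queue; the consumer drains it at a pace its own servers can bear.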

Disadvantages of message queues

No technology is perfect, and message queues have their drawbacks.

For example, a call between two systems is fine, but I add a message queue in the middle. What if the message queue hangs? Does it reduce the availability of the system?

Is this to ensure HA? Is it going to be a cluster? Does my overall system complexity go up?

Setting those issues aside, if the sender fails to receive an acknowledgement and retries, it may produce duplicate messages.

Or the consumer fails to process a message and requests retransmission, which also produces duplicates.

For some services, consuming a duplicate message is a real problem. Take adding points: is it fair to other users if one purchase is credited more than once?

So how do you solve the problem of repeated consumption of messages?

And what if our messages must be strictly ordered? Suppose the producer emits an ordered series of messages (delete, then add, then modify a record with ID 1). We know that in the publish-subscribe model a topic itself gives no ordering guarantee, so the consumer may not see the messages in the producer's order, say modify, then delete, then add. If the record involves money, isn't that a big problem?

So what about the sequential consumption of messages?

Take the distributed system above as an example: after finishing a purchase, the user's account should be credited with points. Within a single system we would normally use a transaction; with Spring we would simply put @Transactional on the pseudocode above. But how do you guarantee a transaction across different systems? You can't. This system deducted the money successfully, but did your points system add the points? Or this system failed, yet your points system credited the points anyway.

So how do you solve the distributed transaction problem?

We just said message queues can clip peaks, but if the consumer consumes slowly or the producer produces quickly, won't messages pile up in the queue?

So how do you solve the problem of message accumulation?

Reduced availability, increased complexity, plus a series of problems: repeated consumption, ordered consumption, distributed transactions, message pile-up. Why use message queues at all?

Don’t worry. There’s always a way.

What is RocketMQ?

RocketMQ is a piece of messaging middleware with high performance, high reliability, high real-time capability, and distributed characteristics. It is a distributed messaging system developed in Java by the Alibaba team; it was contributed to Apache at the end of 2016 and became a top-level Apache project. Within Alibaba, RocketMQ serves thousands of applications large and small. On Singles' Day every year, an incredible trillions of messages flow through RocketMQ.

Without further ado, those of you who want to know the history of RocketMQ can do your own research. After listening to the above introduction, all you need to know is that RocketMQ is fast, awesome and experienced on Singles’ Day!

Queue model and topic model

Before we talk about RocketMQ’s technical architecture, let’s take a look at two noun concepts — the queue model and the topic model.

First of all, why is a message queue called a message queue?

You might think this is a silly question: isn't it just a queue for storing messages? What else would you call it?

Indeed, early message-oriented middleware was implemented through the queue model. Perhaps for historical reasons, we are all used to referring to message-oriented middleware as message queues.

However, today’s great messaging middleware, such as RocketMQ and Kafka, does more than just store messages through a queue.

The queue model

The queue model of messaging middleware is just what it sounds like: a queue. Let me draw a picture to illustrate. At the beginning I mentioned the concept of "broadcast": if we need to send one message to multiple consumers (for example, to both the SMS system and the mail system), a single queue is not sufficient.

Of course, you can have the producer write the message into multiple queues, one queue per consumer. That solves the problem, but creating multiple queues and replicating the message multiple times costs resources and performance. Worse, the producer would need to know the exact number of consumers and copy the message into that many queues, which violates the decoupling principle of our middleware.

Topic model

So is there a good way to solve this problem? Yes, that’s the topic model or publish-subscribe model.

In the Topic model, the producer of the message is called a Publisher, the consumer of the message is called a Subscriber, and the container that holds the message is called a Topic.

Publishers send messages to a specific topic, and subscribers must subscribe to a topic in advance to receive messages from it.
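A toy implementation makes the contract clear: publishers only name a topic, subscribers only register for one, and neither knows about the other. All names here are invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MiniTopicModel {
    // topic name -> subscriber callbacks registered for that topic
    static final Map<String, List<Consumer<String>>> topics = new HashMap<>();

    static void subscribe(String topic, Consumer<String> sub) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(sub);
    }

    static void publish(String topic, String msg) {
        // only subscribers of this topic receive the message
        for (Consumer<String> s : topics.getOrDefault(topic, List.of())) {
            s.accept(msg);
        }
    }

    public static void main(String[] args) {
        List<String> smsInbox = new ArrayList<>();
        List<String> mailInbox = new ArrayList<>();
        subscribe("shopping", smsInbox::add);  // SMS system subscribes in advance
        subscribe("shopping", mailInbox::add); // mail system subscribes in advance
        publish("shopping", "order-1 paid");   // publisher targets the topic, not consumers
        System.out.println(smsInbox.equals(mailInbox)); // both received the same message
    }
}
```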

Message model in RocketMQ

The message model in RocketMQ is implemented according to the topic model. You may be wondering how this topic is actually implemented, since I haven't said!

The underlying design of messaging middleware is different for each implementation of the topic model, such as partitions in Kafka, queues in RocketMQ, and exchanges in RabbitMQ. We can think of the topic/publish/subscribe model as a standard that middleware simply implements.

So how exactly is the topic model implemented in RocketMQ? Let me draw a picture to try to make sense of it. In the whole picture there are Producer Groups, Topics, and Consumer Groups. Let me introduce them in turn.

Producer Group: represents a class of producers. For example, several producer instances that together produce the same kind of message form one producer group.

Consumer Group: represents a class of consumers. For example, several SMS systems acting as consumers together form one consumer group; they generally consume the same messages.

Topic: represents a class of message, such as order messages or logistics messages.

In the figure you can see that producers in the producer group send messages to a topic, which contains multiple queues; on each send, a producer picks one queue within that topic.

Each topic contains multiple queues (Brokers are not involved here yet). In cluster consumption mode, a consumer group of several machines collectively consumes a topic's queues, and each queue is consumed by exactly one consumer. If a consumer dies, the other consumers in the group take over its queues. In the figure, Consumer1 and Consumer2 each correspond to a queue, but Consumer3 has none; therefore the number of consumers in a group should generally match the number of queues in the topic.
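The "each queue goes to exactly one consumer" rule is a simple allocation. Real RocketMQ clients ship several rebalance strategies; this round-robin sketch is only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class QueueAssignment {
    // Assign each queue to exactly one consumer; surplus consumers get nothing.
    static Map<Integer, Integer> assign(int queues, int consumers) {
        Map<Integer, Integer> queueToConsumer = new HashMap<>();
        for (int q = 0; q < queues; q++) {
            queueToConsumer.put(q, q % consumers); // round-robin over consumers
        }
        return queueToConsumer;
    }

    public static void main(String[] args) {
        // 2 queues, 3 consumers: consumer 2 is idle, matching the Consumer3 example above
        System.out.println(assign(2, 3));
    }
}
```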

It is also possible to have fewer consumers than queues, although it is not recommended. As the diagram below shows, each consumer group maintains its own consumption position on each queue. Why?

We have only drawn a single consumer group so far, but in a publish-subscribe model there are usually several, and each group has its own consumption position in each queue. With multiple groups, a message is not deleted after one group consumes it (the other groups still need it). Instead, each group maintains a consumption offset per queue: after a group consumes a message and returns a successful ack, the queue advances that group's offset, so the message just consumed will not be delivered to that group again. Maybe you have another question now: why maintain multiple queues in one topic at all?
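The bookkeeping described above can be sketched as a tiny offset store: each (group, queue) pair keeps its own position, and acking only advances that pair. This is a simplification of what RocketMQ's consumer offset store actually does:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetStore {
    // (consumerGroup, queueId) -> consumed count; messages themselves are never
    // deleted on consume, only the group's offset moves forward.
    static final Map<String, Integer> offsets = new HashMap<>();

    static String key(String group, int queueId) {
        return group + "@" + queueId;
    }

    // Advance this group's position by one after a successful consume ack.
    static int commit(String group, int queueId) {
        return offsets.merge(key(group, queueId), 1, Integer::sum);
    }

    public static void main(String[] args) {
        commit("sms-group", 0);
        commit("sms-group", 0);
        commit("mail-group", 0); // an independent position on the same queue
        System.out.println(offsets.get("sms-group@0"));  // 2
        System.out.println(offsets.get("mail-group@0")); // 1
    }
}
```

Because each group tracks its own offset, one group's progress never hides a message from another group.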

The answer is to improve concurrency. Indeed, each topic could hold just one queue. Imagine a publish-subscribe model in which every topic has a single queue and that queue also maintains every consumer group's consumption position, as in the diagram below. Wouldn't the producer then be able to send messages to only that one queue? And because the consumption position must be maintained, a queue can serve only one consumer within each group, leaving the group's other consumers idle. From both angles, concurrency drops sharply.

So in summary, RocketMQ implements the topic (publish-subscribe) pattern by configuring multiple queues per topic, with each queue maintaining the consumption position of each consumer group.

Architecture diagram of RocketMQ

Having covered the message model, it is much easier to understand the technical architecture of RocketMQ. The RocketMQ technology architecture has four main roles: NameServer, Broker, Producer, and Consumer.

Broker: Stores, delivers, and queries messages and ensures high availability of services. In plain English, message queue servers, producers produce messages to brokers, and consumers pull messages from brokers and consume them.

Here, a general introduction to the relationships between brokers, topics, and queues. There are multiple queues in a Topic. Where do these queues reside?

A Topic is distributed over multiple brokers, and a Broker can configure multiple topics in a many-to-many relationship.

If a Topic has a high volume of messages, configure several queues for it (mentioned above to improve concurrency) and distribute them across as many brokers as possible to reduce the stress on one Broker.

When a topic's messages are evenly distributed, the more queues a Broker hosts, the more load it bears.

NameServer: it is essentially a registry, similar to ZooKeeper, providing two main functions: Broker management and routing management. Brokers register their information with the NameServer, which therefore holds a great deal of Broker information. Consumers and producers fetch the routing table from the NameServer and use it to communicate with the corresponding Brokers (both periodically query the NameServer for information about the relevant Brokers).

Producer: the role that publishes messages; supports distributed cluster deployment. In plain English, the producer.

Consumer: message consuming role, which supports distributed cluster deployment. Messages can be consumed in push and pull modes. It also supports clustering and broadcast consumption and provides real-time message subscription mechanism. In plain English, consumers.

After listening to the above explanation, you might think, this thing is very simple. Isn’t that what it is?

At this point, you might be thinking, well, producers, consumers, and brokers can just produce messages and consume them. Why use NameServer?

However, as we mentioned above, brokers need to be highly available. If the whole system depends on only one Broker, will the pressure on that Broker be too great? Therefore, we need to use multiple brokers to ensure load balancing.

If our consumers and producers are directly connected to multiple brokers, each producer and consumer must be implicated when the Broker changes, creating coupling problems that the NameServer registry is designed to solve.

Of course, RocketMQ's technical architecture involves more than that, because all four roles above are deployed as clusters. Here is a diagram from the official website; try to make sense of it.

First, Brokers are clustered in a master-slave deployment. Since messages are distributed across Brokers, if one Broker goes down, message reads and writes on it are affected. So RocketMQ provides a master-slave structure: a Slave periodically synchronizes data from its Master (synchronously or asynchronously), and if the Master goes down, the Slave can still serve consumption but cannot accept writes.

Second, to ensure HA, the NameServer is also clustered, but note that it is decentralized: there is no primary node, and the NameServer nodes do not replicate information to each other. Instead, every Broker maintains a long connection to all NameServers, and every 30 seconds it sends each of them a heartbeat containing its own topic configuration. This step corresponds to the Routing Info in the diagram above.

Third, when a producer needs to send a message, it obtains the Broker's routing information from a NameServer and then distributes messages across the queues by polling, achieving load balancing.

Fourth, the consumer likewise obtains the routing information of all Brokers from a NameServer and then sends Pull requests to the Brokers to retrieve messages. A consumer can start in two modes, Broadcast and Cluster: in broadcast mode a message goes to every consumer in the consumer group, while in cluster mode a message goes to only one of them.

How to solve ordered consumption and repeated consumption?

In fact, these are the side effects of message queues that I mentioned earlier. The issues are not tied to RocketMQ; every messaging middleware has to address them.

In the technical architecture of RocketMQ, I have shown you how it is highly available. There is no operational setup involved here. If you are interested, go to the RocketMQ website and build your own RocketMQ cluster.

Kafka’s architecture is basically similar to RocketMQ, except that its registry uses ZooKeeper and its partitions are similar to the queues in RocketMQ. There are some minor differences that will be mentioned later.

Ordered consumption

From the architecture introduction above, we have seen that RocketMQ gives no ordering guarantee at the topic level; ordering is guaranteed only at the queue level.

This brings up two more concepts — ordinary order and strict order.

Ordinary order means that the messages a consumer receives from the same queue are ordered, while messages from different queues may be out of order. Ordinary ordered messages may briefly lose their ordering guarantee when a Broker restarts.

Strict order means that all messages received by consumers are in order. Strictly ordered messages guarantee the ordering of messages even in exceptional cases.

But as good as strict order looks, it comes at a huge cost: in strictly ordered mode, if one machine in the Broker cluster becomes unavailable, the whole cluster becomes unavailable. So where is it still used? The main scenario today is binlog synchronization.

In general, our MQ is tolerant of transient out-of-order, so the plain ordering mode is recommended.

So suppose we use the ordinary ordered mode. We learned above that when a producer produces messages, it polls (subject to your load-balancing strategy) across the different queues of a topic. If I have several messages for the same order (creation, payment, shipment), the polling strategy would scatter them across different queues, and then RocketMQ's per-queue ordering can no longer keep them in order. So what is the solution?

In fact, we only need to route messages with the same semantics (here, the same order) into the same queue; we can hash the order ID to ensure all messages of one order land in one queue.
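RocketMQ's producer API exposes a queue-selector hook for exactly this purpose; the core of any such selector is just a stable hash, sketched here in plain Java (names invented for illustration):

```java
public class OrderlyRouting {
    // Route all messages with the same order id to the same queue index.
    static int selectQueue(String orderId, int queueCount) {
        // Mask the sign bit instead of Math.abs (which overflows on Integer.MIN_VALUE).
        return (orderId.hashCode() & 0x7fffffff) % queueCount;
    }

    public static void main(String[] args) {
        int q1 = selectQueue("order-42", 4);
        int q2 = selectQueue("order-42", 4);
        System.out.println(q1 == q2); // true: create/pay/ship for order-42 share one queue
    }
}
```

Because the selection is deterministic, the create, pay, and ship messages for one order all land in one queue, where per-queue ordering applies.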

Repeated consumption

In one word: idempotence. An idempotent operation is one whose effect is the same whether it is executed once or many times. For example, suppose we have an order-points system that credits the ordering user's points each time a message arrives. At some point the message queue delivers FrancisQ's order message to the points system, asking it to add 500 to FrancisQ's score. The points system receives the message, credits the points, and returns a success response to the message queue, but a network fluctuation (or, among many other causes, an unexpected Broker restart) means the response never arrives.

So what does the message queue do when it gets no response from the points system? It retries and resends the message. But if that message is delivered again, won't FrancisQ's account be credited another 500 points?

So we need to make our consumers idempotent: processing the same message any number of times must produce the same result.

How do you make your business idempotent? That depends on the specific business. One approach is to write through Redis, since setting the same key to the same value is naturally idempotent. Another is the database insert approach: rely on a unique key so that duplicate data cannot be inserted twice.
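The unique-key idea can be sketched in a few lines. Here a HashSet stands in for the database unique key or a Redis-style set; the class and message IDs are invented for illustration:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IdempotentPoints {
    // Stand-in for a DB unique-key table or a Redis set of processed message IDs.
    static final Set<String> processed = new HashSet<>();
    static final Map<String, Integer> points = new HashMap<>();

    // Returns false when the message was already handled, so a redelivery is a no-op.
    static boolean addPoints(String messageId, String user, int delta) {
        if (!processed.add(messageId)) return false; // duplicate delivery detected
        points.merge(user, delta, Integer::sum);
        return true;
    }

    public static void main(String[] args) {
        addPoints("msg-1", "FrancisQ", 500);
        addPoints("msg-1", "FrancisQ", 500); // retried delivery of the same message
        System.out.println(points.get("FrancisQ")); // 500, not 1000
    }
}
```

In production the dedup set must itself be durable and shared (the database or Redis), otherwise a consumer restart forgets what was already processed.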

However, the main thing is to use a specific solution for your particular scenario. You need to know whether your message consumption is completely non-repeatable or can tolerate repeated consumption, and then choose strong and weak verification methods. After all, there are still few technical silver bullets in CS.

Across the whole Internet domain, idempotence applies beyond the repeated consumption of message queues: the same techniques solve duplicate requests and duplicate calls in other scenarios, for example making an HTTP service idempotent against repeated form submissions from the front end or an app, or making a microservice idempotent against duplicate calls caused by automatic RPC retries.

Distributed transaction

How to explain distributed transactions? Everyone knows what a transaction is: all or nothing. Within a single system transactions are easy, but in a distributed architecture many services are deployed across different systems and must call one another. Take placing an order and adding points: if distributed transactionality cannot be guaranteed, system A may place the order while system B fails to add the points, or system A may fail to place the order while system B adds the points anyway. The former is unfriendly to users and the latter is bad for the operator; neither is acceptable.

So, how to solve this problem?

Common distributed transaction implementations today include 2PC, TCC, and transactional messages (the half-message mechanism). Each has its applicable scenarios and its own problems; none is a perfect solution.

RocketMQ uses a transaction message plus a transaction backcheck mechanism to solve the distributed transaction problem. I’ve drawn a picture, so you can kind of visualize it.

The half message sent in the first step means that the message is not visible to the consumer until the transaction commits.

So how can a message be written yet remain invisible to consumers? RocketMQ's transactional messages work like this: if a message is a half message, the Broker backs up the message's original topic and consumption queue, then rewrites its topic to RMQ_SYS_TRANS_HALF_TOPIC. Since no consumer group subscribes to that topic, no consumer can consume half messages. RocketMQ then runs a scheduled task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC and, via the producer group, sends a back-check request to a producer instance asking for the local transaction's state, deciding from that state whether to commit or roll back the message.
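The visibility rule can be condensed into a toy state machine. This is not the real broker implementation, just the contract it enforces, with invented names:

```java
import java.util.HashMap;
import java.util.Map;

public class HalfMessageDemo {
    enum State { HALF, COMMITTED, ROLLED_BACK }

    static final Map<String, State> store = new HashMap<>();

    // A half message is stored but invisible to consumers.
    static void sendHalf(String msgId) {
        store.put(msgId, State.HALF);
    }

    // The broker's back-check asks the producer for the local transaction outcome.
    static void backCheck(String msgId, boolean localTxCommitted) {
        store.put(msgId, localTxCommitted ? State.COMMITTED : State.ROLLED_BACK);
    }

    static boolean visibleToConsumers(String msgId) {
        return store.get(msgId) == State.COMMITTED;
    }

    public static void main(String[] args) {
        sendHalf("tx-1");
        System.out.println(visibleToConsumers("tx-1")); // false while still half
        backCheck("tx-1", true);
        System.out.println(visibleToConsumers("tx-1")); // true after commit
    }
}
```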

You can imagine that without the back-check mechanism of step 5, a network fluctuation that makes step 4 fail would leave MQ not knowing whether to deliver the message, like a headless chicken. RocketMQ resolves this with the transactional back-check above, whereas Kafka typically throws an exception and lets the user resolve it.

Note also that the operations pointing to system B inside the MQ Server are no longer related to system A: the distributed transaction in a message queue is really the transaction of storing the message into the queue. What you get is eventual consistency, since the process is asynchronous and each system only has to guarantee its own part of the operation.

Message pile-up

Above we mentioned one important feature of message queuing – peak clipping. So what if the spike is so large that messages pile up in the queue?

The problem can be generalized: message accumulation really has only two causes, producers producing too fast or consumers consuming too slowly.

We can solve this problem from many angles. When the flow reaches the peak because the producer is producing too fast, we can use some methods of limiting the flow and downgrading. Of course, you can also add multiple consumer instances to expand horizontally and increase the consumption capacity to match the production surge. If the consumer is too slow to consume, we can first check whether the consumer has made a large number of consumption errors, or print a log to see if a thread is stuck, locked resources are not released, etc.

Of course, the fastest way to solve the message heap problem is to increase the number of consumer instances, but you also need to increase the number of queues per topic.

Remember that in RocketMQ a queue is consumed by only one consumer within a group; if you only add consumer instances without adding queues, the extra instances sit idle, the same situation as Consumer3 in the earlier diagram.

Backtracking consumption

Backtracking consumption means re-consuming a message that a Consumer has already successfully consumed, because the business requires it. To support this, the Broker must retain messages after successful delivery to the Consumer. Re-consumption is normally specified by time: for example, if the Consumer system fails and recovery requires re-consuming data from one hour ago, the Broker should provide a mechanism to rewind consumption progress by time. RocketMQ supports time-based backtracking of consumption down to the millisecond.

RocketMQ's flush-disk mechanism

Since I’ve talked so much about the architecture and design of RocketMQ, have you ever wondered what queues are in Topic?

How are messages stored in queues persisted?

What about the synchronous and asynchronous flushes I mentioned above? What impact do they have on persistence?

I’m going to explain it to you.

Synchronous and asynchronous disk flushing

With synchronous flushing, a send must wait for the flush to complete and a successful ACK to return. Synchronous flushing guarantees the reliability of MQ messages well, but has a significant impact on performance; it generally suits specific business scenarios such as finance.

The asynchronous flush usually starts a thread to perform the flush operation asynchronously. Message flushing adopts the background asynchronous thread submission mode, which reduces read/write latency and improves MQ performance and throughput. It is generally applicable to service scenarios that do not have high requirements on message guarantee, such as sending verification codes.

In general, asynchronous flushing loses data only if the Broker fails unexpectedly. You can set the Broker parameter flushDiskType (SYNC_FLUSH or ASYNC_FLUSH) to adjust the flush policy.
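For reference, the flush policy is a line in the Broker's configuration file; a minimal sketch of a broker.conf fragment (ASYNC_FLUSH is the default):

```properties
# broker.conf: choose the flush-to-disk policy
# ASYNC_FLUSH (default): ack first, flush from a background thread
# SYNC_FLUSH: ack only after the message hits disk
flushDiskType = SYNC_FLUSH
```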

Synchronous replication and asynchronous replication

The synchronous and asynchronous flushing above happens at the level of a single node. Synchronous and asynchronous replication, by contrast, refer to whether, in Broker master-slave mode, the master must wait for the slave to synchronize before returning success to the client. Synchronous replication, also called "synchronous double write", returns success only when the message has been written to both master and slave. Asynchronous replication returns success as soon as the message is written to the master.
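The replication mode is likewise a broker.conf setting, chosen per Broker role; a sketch of the relevant fragment:

```properties
# broker.conf: master-slave role and replication mode
# ASYNC_MASTER: return success once the master has the message
# SYNC_MASTER:  "synchronous double write", wait for the slave too
# SLAVE:        this broker is a slave
brokerRole = SYNC_MASTER
```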

However, there is no perfect scheme for many things. For example, the more nodes we write messages to, the better the reliability of the message will be, but the performance will also decline. Therefore, programmers need to choose the master/slave replication scheme suitable for specific business scenarios.

Does asynchronous replication affect message reliability in the same way that asynchronous flushing does?

The answer is no, because they are different concepts: message reliability is guaranteed by the flush policy, while synchronous versus asynchronous replication mainly affects availability. Why? Chiefly because RocketMQ does not support automatic master-slave switchover, so when the master fails, producers can no longer produce messages to it.

For example, suppose asynchronous replication is in use and the master dies before finishing the synchronization of some messages; the slave then lacks those messages. Producers can no longer produce to the master, but consumers can automatically switch to the slave and keep consuming. So while the master is down there is only a brief master-slave inconsistency and reduced availability; once the master restarts, the slave continues replicating the messages it was missing.

In a single master-slave architecture, a master failure means the whole system can no longer accept writes. Can this availability problem be solved? Recall that in our original architecture diagram, each topic was distributed across different Brokers. But that kind of spreading cannot guarantee strict order. We said earlier that message ordering is achieved by sending messages with the same semantics to the same queue under a topic. If master node A is responsible for the series of messages of order A and then dies, no other node can take over for it; and if any node could store any message, there would be no ordering at all.

RocketMQ uses Dledger to solve this problem. It requires that a message be replicated to at least half of the nodes before success is returned to the client, and it supports dynamically switching the primary node by election. I will not expand on it here; interested readers can dig in themselves.

That does not mean Dledger is a perfect solution. During a Dledger election the group cannot provide service; it requires three or more nodes; if a majority of nodes fail at the same time, availability is still lost; and its efficiency still lags somewhat behind the plain asynchronous replication described above.