A few words up front

The article is long, so give it a like while you're here and make it a habit 😋😋😋

This really is the last blog post before my exam; after that, expect a photo of my failing grade.

Message queue basics

A message queue is, literally, a queue that stores messages. I won't explain what a queue is. Don't tell me you don't even know what a queue looks like?

So the question is not what a message queue is, but why message queues exist. What can they be used for? What benefits do they bring? And do they have side effects?

Why do message queues exist?

Message queues are an essential skill for backend programmers, because distributed applications inevitably involve communication between systems, and that is where message queues come in. You could say that distribution is the soil message queues grew out of, and since distributed systems are a very old concept, message queues are a very old class of middleware as well.

What can message queues be used for?

Asynchrony

You might argue that message queues are not the only way for applications to communicate, so why put one in the middle at all? Can't the systems just talk to each other directly?

Good 👍, you have brought up another concept: synchronous communication. For example, Dubbo, which is widely used in industry, is an RPC framework suited to synchronous communication between systems.

Let me give you an example 🌰. Suppose we have a ticketing system, and the requirement is that the user receives a text message after completing a purchase.

Ignoring the network overhead in between: if the ticketing system takes 150ms and the SMS system takes 200ms, the whole request takes 150ms + 200ms = 350ms.

At first glance nothing seems wrong. But think about it carefully and a problem appears. The user's purchase is already complete once the ticketing system finishes, yet the synchronous call stretches the whole request out, and the SMS is not even essential: it is just an auxiliary feature to improve the user experience. The whole call chain now feels top-heavy. Buying the ticket is the quick part, yet because of the synchronous call the user has to wait for the slow SMS step to return. And what if we bolt on sending an email as well?

Then the whole call chain grows even longer, and the total time becomes 550ms.

Back when we were students queueing in the canteen, we and the canteen auntie formed a synchronous model.

We tell the auntie: "Big sis, one chicken leg, plus a hot-and-sour shredded potato, pour some gravy over it, and extra rice please 😋😋😋"... all that just to get a bigger serving. Shameless, I know.

Then the auntie ladles out our rice and dishes, and watching the shredded potato tremble off her spoon, we can't help swallowing.

Finally we take the meal from the auntie and go look for a seat…

Recall the flow: after telling the auntie what we wanted, we waited synchronously while she prepared the food. We only ordered chicken and potato; had I also added a tomato beef brisket and a chive omelette, the serving process would take longer, and so would our wait.

Later, once we are working and earning money, we go to a restaurant instead. We tell the waiter we want a bowl of beef noodles with a poached egg (deliver a message), then sit at the table playing with our phone in peace (get on with other things) until the noodles arrive and we eat. We send one message and move on. The noodles do not cook any faster, but all we had to do was send a message before turning to something else. That is the idea of asynchrony.

So, to solve this problem, clever programmers added a piece of middleware between the servers: the message queue. With it, we can revise the model.

This way, once we put the message into the message queue we can return immediately (we tell the waiter what we want and go play with the phone), so the total time is only 150ms + 10ms = 160ms.

But note that the total amount of work still takes the same time, just as telling the waiter your order does not make the noodles cook any faster; the user simply no longer has to wait for it.
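The waiter analogy can be sketched in plain Java (all names hypothetical, not RocketMQ's API): the producer drops a message into a queue and returns immediately, while a background consumer thread drains the queue at its own pace.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the ticket system enqueues a task and returns
// immediately; a background "SMS system" drains the queue on its own.
public class AsyncSketch {
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue (the ~10ms step in the story) and return at once.
    public static void buyTicket(String orderId) {
        queue.offer("SMS for " + orderId);
    }

    public static void main(String[] args) throws Exception {
        Thread smsSystem = new Thread(() -> {
            try {
                // Consumer side: pull at its own pace.
                String msg = queue.poll(1, TimeUnit.SECONDS);
                System.out.println("sending: " + msg);
            } catch (InterruptedException ignored) { }
        });
        smsSystem.start();
        buyTicket("order-1"); // the user-facing call returns right away
        smsSystem.join();
    }
}
```

The SMS still takes as long as it takes; the point is only that `buyTicket` no longer waits for it.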

Decoupling

Going back to the original synchronous calls, let's summarize them with a bit of pseudocode.
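The original pseudocode did not survive here, so the following is a hedged reconstruction in Java (method names hypothetical) of what the synchronous version looks like: every auxiliary step is a direct call that blocks the user's request.

```java
// Hedged reconstruction of the synchronous-call pseudocode (names hypothetical):
// each auxiliary step blocks the caller until it completes.
public class SyncOrderService {
    public static String handlePurchase(String userId) {
        String orderId = buyTicket(userId); // ~150ms, the core business
        sendSms(userId, orderId);           // ~200ms, blocks the caller
        sendEmail(userId, orderId);         // another ~200ms once we bolt it on
        return orderId;                     // total latency is the sum of all steps
    }

    static String buyTicket(String userId) { return "order-" + userId; }
    static void sendSms(String userId, String orderId) { /* call the SMS system */ }
    static void sendEmail(String userId, String orderId) { /* call the email system */ }
}
```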

In the second step we added sending an email, which meant modifying the code. If yet another requirement arrives, say users should earn points after a purchase, do we have to change the code again?

And even if that is acceptable to you: do I have to stop the service, change the code, and restart the application every single time?

If that sounds like a hassle, let's decouple things with a message queue in the middle. Note that the downstream operations, sending SMS, sending email, adding points and so on, all depend on the result above, that is, on an abstraction of the ticket-processing result such as the order number and the user account. In other words, a series of downstream services all need to process the same message. In that case, can we achieve it with a "broadcast"?

The "broadcast" I mentioned is not a true broadcast; rather, the downstream systems subscribe to a specific topic as consumers. For example, our topic here could be called "booking". The ticketing system, as the producer, produces messages into the message queue; each consumer subscribes to the topic, pulls messages from the queue, and consumes them. As the diagram we just drew shows, the producer side only needs to care about producing messages to the specified topic, and the consumer side only needs to care about pulling messages from it.

Without a message queue, every time a new business arrived we would have to call a new interface from the main system, and whenever a business was retired we would have to delete its interface calls from the main system. With a message queue, we only care about whether the message reaches the queue; who subscribes and how the message is handled afterwards is a downstream concern, which greatly reduces the workload of development and joint debugging.
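The subscribe-and-publish idea above can be sketched in a few lines of plain Java (a minimal sketch, not RocketMQ's API): the producer publishes to a topic, and every subscriber receives its own copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal topic sketch: downstream systems subscribe, and the
// producer only cares about publishing to the topic.
public class MiniTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    public void publish(String message) {
        // "Broadcast": every subscriber gets its own copy of the message.
        for (Consumer<String> s : subscribers) {
            s.accept(message);
        }
    }
}
```

Adding a points service later is just one more `subscribe(...)` call; `publish` and the producer code never change, which is exactly the decoupling described above.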

Peak clipping

Let's go back to the original synchronous-call design and ask: what happens if a huge number of users request tickets at once?

Say 10,000 requests hit the ticketing system at that moment. Servers running the core business are usually well provisioned, so let's assume the ticketing system can withstand 10,000 concurrent requests. That means 10,000 calls to the SMS service as well. The SMS system is not core business, so its hardware is modest. Can it withstand a peak of ten thousand? Never mind withstanding it, will it simply crash?

Since SMS is not our core business, can we let it take its time? If we put the purchase message into a message queue and let the SMS system fetch and consume messages at its own best pace, it does not matter if processing is a little slow, as long as our system does not crash.

As the saying goes, as long as the green hills remain, there is no fear of running out of firewood. Or did you think every verification code you request arrives the instant you ask for it?

What are the benefits of message queues?

Actually, I have already said it: asynchrony, decoupling, peak clipping. Remember these three ideas even if you understand nothing else, because they are not only the essence of message queues but the essence of programming and architecture in general.

Do message queues cause side effects?

No technology is a silver bullet, and message queuing has its side effects.

For example, a direct call between two systems is fine, but now I have put a message queue in the middle. What if the message queue goes down? Doesn't that reduce the availability of the system?

So we have to ensure HA. Does that mean running a cluster? Doesn't the overall complexity of the system go up?

Setting those issues aside: if the sender fails to get an acknowledgment and retries, it can produce duplicate messages.

Or the consumer fails mid-processing and requests redelivery, which also produces duplicate messages.

For some services, consuming a duplicate message is genuinely troublesome. Take adding points: is it fair to other users if one user's points for a single order are added twice?

So how do you solve the problem of repeated consumption of messages?

And what if our messages need to be strictly ordered? (For example, a producer emits an ordered series of messages against the record with id 1: create it, modify it, delete it.) In the publish-subscribe model a topic as a whole is unordered, so consumers may receive messages in a different order than the producer sent them, for instance consuming the modify and delete before the create. If the record involves money, wouldn't that be a disaster?

So what about the sequential consumption of messages?

Take the distributed scenario above as an example: the user's account points must increase after buying a ticket. Within a single system we would normally use a database transaction; with Spring, we would just put the @Transactional annotation on the pseudocode above. But how do you guarantee a transaction across different systems? You can't. I deducted the money successfully in my system, but did your points system actually add the points? Or I failed, yet your points system credited them anyway.

So how do you solve the distributed transaction problem?

We just said message queues can clip peaks, but if my consumer consumes slowly, or my producer produces messages quickly, won't messages pile up in the message queue?

So how do you solve the problem of message accumulation?

Reduced availability, increased complexity, and a whole series of problems: duplicate consumption, ordered consumption, distributed transactions, message accumulation. How is anyone supposed to use this message queue 😵?

Don’t worry. There’s always a way.

What is RocketMQ?

Whoa, you rascal! After burying me in all those questions you jump straight to RocketMQ. Are you even going to let people live?! 🤬

Wait, wait, wait. Do you even know the internal structure of an MQ? I haven't covered it yet. Let's first understand MQ's internals, then see how to solve the series of problems above. But do keep those problems in mind as you read.

RocketMQ is queue-model messaging middleware with high performance, high reliability, low latency, and distributed characteristics. It is a distributed messaging system developed in Java by an Alibaba team and contributed to Apache at the end of 2016, where it became a top-level Apache project. Within Alibaba, RocketMQ serves thousands of applications large and small, and every year on Singles' Day an incredible trillions of messages flow through it.

Without further ado: those who want RocketMQ's history can research it themselves. All you need to take from the introduction above is that RocketMQ is fast, powerful, and battle-tested on Singles' Day!

Queue model and topic model

Before we talk about RocketMQ's technical architecture, let's look at two concepts: the queue model and the topic model.

First of all, why is a message queue called a message queue?

You might think this is a stupid question. Isn't it just a queue for messages? What else would you call it?

Indeed, early message-oriented middleware was implemented through the queue model. Perhaps for historical reasons, we are all used to referring to message-oriented middleware as message queues.

However, today's mainstream messaging middleware, such as RocketMQ and Kafka, does more than just store messages in a single queue.

The queue model

Just as we understand queues, the queue model of messaging middleware is really just a queue… Let me draw a picture for you to understand.

At the beginning I mentioned the idea of a "broadcast": if we need to send one message to multiple consumers (for example, to both the SMS system and the email system), a single queue will not suffice.

Of course, you could have the producer write each message into multiple queues, one queue per consumer. That solves the problem, but creating multiple queues and replicating every message costs resources and performance. Worse, it means the producer has to know the exact number of consumers so it can copy the message that many times, which violates the decoupling that message middleware is supposed to provide.

Topic model

So is there a good way to solve this? Yes: the topic model, also known as the publish-subscribe model.

If you are interested, look up the Observer pattern among the classic design patterns and implement it by hand; I believe you will learn something.

In the Topic model, the producer of the message is called a Publisher, the consumer of the message is called a Subscriber, and the container that holds the message is called a Topic.

Where publishers send messages to specific topics, subscribers need to subscribe to a topic in advance to receive messages on a particular topic.

Message model in RocketMQ

The message model in RocketMQ is implemented on top of the topic model. You may be wondering how the topic is actually implemented, since I haven't said!

Each messaging middleware has its own underlying design for the topic model: partitions in Kafka, queues in RocketMQ, exchanges in RabbitMQ. We can think of topic/publish-subscribe as a standard that each middleware merely implements.

So how exactly is the topic model implemented in RocketMQ? Let me draw a picture and try to make sense of it.

We can see that there are Producer Group, Topic and Consumer Group in the whole picture. Let me introduce them respectively.

  • Producer Group: represents a class of producers. For example, several seckill (flash-sale) systems acting as producers together form one producer group, and they generally produce the same kind of message.
  • Consumer Group: represents a class of consumers. For example, several SMS systems acting as consumers together form one consumer group, and they generally consume the same kind of message.
  • Topic: represents a class of message, such as order messages or logistics messages.

You can see that the producers in the producer group send messages to a topic, and there are multiple queues in the topic. Each time the producers produce a message, they specify a queue in the topic to send the message.

Each topic contains multiple queues (no Broker is involved here yet). In clustered consumption mode, a consumer cluster of several machines consumes the topic's queues together, and each queue is consumed by only one consumer. If a consumer dies, the other consumers in the group take over its queues. In the diagram, Consumer1 and Consumer2 each correspond to two queues, while Consumer3 has no queue at all, which is why the number of consumers in a group should generally equal the number of queues in the topic.

Having fewer consumers than queues is also possible, though not recommended. As shown below.

Each consumer group maintains its own consumption position on every queue. Why?

In the diagram we only drew one consumer group, but in a publish-subscribe model there are usually multiple consumer groups, and each group has its own consumption position in each queue. With multiple groups, a message is not deleted after one group consumes it (the other groups still need it!). Instead, a consumption offset is maintained per consumer group: after a group successfully consumes a message and returns an acknowledgment, the queue advances that group's offset, so the message just consumed will not be delivered to that group again.
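The offset idea can be shown with a tiny sketch (names hypothetical, far simpler than a real queue): the queue retains every message, and each group only tracks how far it has read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-group offsets: the queue keeps every message,
// and each consumer group only tracks how far it has read.
public class OffsetSketch {
    private final List<String> queue;                             // messages are not deleted on consume
    private final Map<String, Integer> offsets = new HashMap<>(); // group -> consumption position

    public OffsetSketch(List<String> queue) {
        this.queue = queue;
    }

    // Each group pulls from its own offset, so groups do not interfere.
    public String pull(String group) {
        int pos = offsets.getOrDefault(group, 0);
        if (pos >= queue.size()) {
            return null; // nothing new for this group yet
        }
        offsets.put(group, pos + 1); // advance only this group's offset
        return queue.get(pos);
    }
}
```

Two groups pulling from the same sketch both see every message, each at its own pace, which is exactly why consumed messages cannot simply be deleted.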

Maybe you have another question, why do you need to maintain multiple queues in a topic?

The answer is to improve concurrency. A topic could indeed have only one queue: imagine a publish-subscribe model where each topic has a single queue that also maintains every consumer group's consumption position. As shown below.

But then the producer can only send messages to that one queue. And since consumption positions must be maintained, a queue can only be assigned to one consumer within a consumer group, so the other consumers in the group sit idle. From both angles, concurrency drops sharply.

So, in summary, RocketMQ implements the topic/publish-subscribe pattern by configuring multiple queues in each topic, with each queue maintaining a consumption offset for every consumer group.

Architecture diagram of RocketMQ

Having covered the message model, it is much easier to understand the technical architecture of RocketMQ.

The RocketMQ architecture has four main roles: NameServer, Broker, Producer, and Consumer. Let me explain what each role does.

  • Broker: Stores, delivers, and queries messages and ensures high availability of services. A producer produces messages to a Broker, and consumers pull messages from the Broker and consume them.

    Here I also need to explain the relationship between brokers, topics, and queues. We covered the relationship between a topic and its queues: a topic contains multiple queues. So where do the topic and its queues actually live?

    A Topic is distributed over multiple brokers, and a Broker can configure multiple topics in a many-to-many relationship.

    If a Topic has a high volume of messages, configure several queues for it (mentioned above to improve concurrency) and distribute them across as many brokers as possible to reduce the stress on one Broker.

    When a topic's messages are evenly distributed, the more of its queues sit on one broker, the more load that broker carries.

So we need to configure multiple brokers.

  • NameServer: if you have seen ZooKeeper, or Eureka in Spring Cloud, this will look familiar. It is essentially a registry providing two main functions: broker management and routing management. Each Broker registers its information with the NameServer, which therefore holds a great deal of broker information, while consumers and producers fetch routing tables from the NameServer and use them to communicate with the corresponding brokers (producers and consumers periodically query the NameServer for information about the relevant brokers).

  • Producer: the role that publishes messages; supports distributed cluster deployment. In plain English: the producers.

  • Consumer: the role that consumes messages; supports distributed cluster deployment. Messages can be consumed in push or pull mode, and both clustered and broadcast consumption are supported, along with a real-time message subscription mechanism. In plain English: the consumers.

After listening to the above explanation, you might think, this thing is very simple. Isn’t that what it is?

Huh? One question you might ask: what is this NameServer thing for? Couldn't producers and consumers just produce and consume against the brokers directly?

However, as we mentioned above, brokers need to be highly available. If the whole system depends on only one Broker, will the pressure on that Broker be too great? Therefore, we need to use multiple brokers to ensure load balancing.

If our consumers and producers connected directly to multiple brokers, then every change to the broker set would affect every producer and consumer, creating exactly the coupling problem that the NameServer registry is designed to solve.

If that doesn’t make sense, check out my article on Spring Cloud, which describes the Eureka registry.

Of course, the technical architecture in RocketMQ is definitely more than that, because the four roles in the diagram above all need to be clustered. Let me give you a diagram of the official website, so you can try to understand it.

In fact, it is not that different from the beggar's version of the architecture we drew at the beginning; the differences are in the details. Listen to me carefully 🤨.

First, our brokers are clustered and deployed master-slave. Since messages are distributed across brokers, if a broker went down, reads and writes of the messages on it would be affected. So RocketMQ provides a master-slave structure: a Slave periodically synchronizes data from its Master (synchronously or asynchronously), and if the Master goes down, the Slave can still serve consumption, though it cannot accept writes (more on this later).

Second, to ensure HA, the NameServer is clustered too, but note that it is decentralized: it has no primary node, and NameServer nodes do not replicate information among themselves. Instead, every Broker keeps a long-lived connection to all NameServers and sends each of them a heartbeat every 30 seconds containing its own topic configuration. This step corresponds to the routing information above.

Third, when a producer needs to send a message to a broker, it first obtains the broker's routing information from the NameServer, then sends data to the topic's queues in round-robin fashion to achieve load balancing.

Fourth, after obtaining routing information for all brokers from the NameServer, the consumer sends Pull requests to the brokers to retrieve message data. A Consumer can be started in two modes, Broadcast and Cluster. In broadcast mode, a message is delivered to every consumer in the same consumer group; in cluster mode, a message is delivered to only one of them.

How to solve ordered and duplicate consumption

In fact, these are all issues I raised when discussing the side effects of message queues. They are not tied to RocketMQ in particular; every messaging middleware has to address them.

In the architecture section above I have already shown you how RocketMQ achieves high availability. Operational setup is not covered here; if you are interested, go to the RocketMQ website and build a RocketMQ cluster yourself.

Kafka’s architecture is basically similar to RocketMQ, except that its registry uses Zookeeper and its partitions are similar to the queues in RocketMQ. There are some minor differences that will be mentioned later.

Ordered consumption

From the architecture introduction above we already know that RocketMQ is unordered at the topic level; ordering is only guaranteed at the queue level.

This brings up two more concepts: ordinary order and strict order.

Ordinary order means that the messages a consumer receives through one consumption queue are ordered, while messages from different queues may be out of order. Ordinary ordered messages may briefly lose their ordering guarantee when a broker restarts.

Strict order means that all messages the consumers receive are ordered. Strictly ordered messages preserve ordering even in exceptional situations.

But as attractive as strict order looks, it comes at a huge cost. In strictly ordered mode, if one machine in the broker cluster becomes unavailable, the entire cluster becomes unavailable. Who could use that? Its main real-world scenario today is binlog synchronization.

In general, our MQ can tolerate transient disorder, so the ordinary ordered mode is recommended.

So, suppose we use ordinary ordered mode. We learned above that when a producer produces messages, it sends them by polling (depending on your load-balancing strategy) to different queues of the same topic. If I have several messages that are the creation, payment, and shipping of the same order, the polling strategy will scatter them across different queues, and RocketMQ's queue-level ordering cannot guarantee order across different queues.

So, what’s the solution?

In fact, we just need to route messages with the same semantics into the same queue (here, messages for the same order). For example, we can hash the order ID so that the same order always lands in the same queue.
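A minimal sketch of that routing idea (names hypothetical): hash the order ID to a queue index, so queue-level ordering yields per-order ordering. RocketMQ's Java client exposes a `MessageQueueSelector` hook for this kind of custom routing, but the snippet below is the idea only, not the real API.

```java
// Sketch: route every message of one order to the same queue index,
// so queue-level ordering gives per-order ordering.
public class OrderedRouting {
    public static int selectQueue(String orderId, int queueCount) {
        // Mask the sign bit instead of Math.abs, which breaks on Integer.MIN_VALUE.
        return (orderId.hashCode() & Integer.MAX_VALUE) % queueCount;
    }
}
```

The "created", "paid", and "shipped" messages of order-1 all compute the same index, so a single consumer sees them in the order they were sent.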

Duplicate consumption

Emmm, it comes down to one word: idempotence. An idempotent operation has the property that executing it any number of times has the same effect as executing it once. For example, say we have an order-points system that adds a value to the ordering user's points every time a message arrives. At some point the message queue delivered FrancisQ's order message to the points system, asking it to add 500 to FrancisQ's score. The points system received the message, processed it, and returned success to the message queue, but then a network fluctuation occurred (among many other possibilities, such as an unexpected broker restart) and the response never made it back.

So the message queue, having received no response from the points system, will retry and resend the message. The problem is: if this message is delivered again, won't FrancisQ's account gain another 500 points?

So we need to make our consumers idempotent: processing the same message must yield the same result no matter how many times it is executed.

So how do you make your business idempotent? That depends on the specific business. One option is to write to Redis, since setting the same key to the same value is naturally idempotent. Another is the database-insert approach: rely on a unique key so that duplicate records cannot be inserted more than once.
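The unique-key idea can be sketched in plain Java (names hypothetical): record the message key before applying the side effect, so a redelivered message is a no-op. In production the "seen" set would live in Redis or behind a database unique key as described above; the in-memory map here is for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Idempotent consumer sketch: a unique message key is recorded atomically
// before the side effect, so a redelivered message adds the points only once.
public class IdempotentConsumer {
    private final Map<String, Boolean> seen = new ConcurrentHashMap<>();
    private int points = 0;

    public void onMessage(String messageKey, int delta) {
        if (seen.putIfAbsent(messageKey, Boolean.TRUE) != null) {
            return; // duplicate delivery: already processed, do nothing
        }
        points += delta; // the real side effect runs once per key
    }

    public int points() {
        return points;
    }
}
```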

The main thing, though, is to pick a solution for your particular scenario: decide whether your consumption must be absolutely non-repeating or can tolerate the occasional duplicate, then choose a correspondingly strong or weak verification scheme. After all, there are few silver bullets in CS.

In the whole Internet domain, idempotent is not only applicable to the problem of repeated consumption of message queues, but these methods of realizing idempotent can also be used to solve the problem of repeated requests or repeated calls in other scenarios. For example, HTTP services can be idempotent to solve the problem of repeated submission of form data by the front-end or APP, or a microservice can be idempotent to solve the problem of repeated calls caused by automatic retries of RPC framework.

Distributed transaction

How to explain distributed transactions? Everyone knows what a transaction is, right? It's all or nothing. Within a single system we can implement transactions easily, but in a distributed architecture our many services are deployed across different systems that must call one another. Take placing an order and adding points: if distributed transactions cannot be guaranteed, system A may place the order while system B fails to add the points, or system A may fail to place the order while system B adds them anyway. The former is unfriendly to users; the latter hurts the operator. Neither is what we want.

So, how to solve this problem?

Some common distributed-transaction implementations today are 2PC, TCC, and transactional messages (the half-message mechanism). Each has its own suitable scenarios and its own problems; none is a perfect solution.

RocketMQ uses a transaction message plus a transaction backcheck mechanism to solve the distributed transaction problem. I’ve drawn a picture, so you can kind of visualize it.

The half message sent in the first step means a message that is invisible to consumers until the transaction commits.

So how can a message be written yet invisible to consumers? RocketMQ's transaction messages work like this: if a message is a half message, its original topic and consumption queue are backed up, and its topic is changed to RMQ_SYS_TRANS_HALF_TOPIC. Since no consumer group subscribes to that topic, consumers cannot consume half messages. RocketMQ then runs a scheduled task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, locates the producer by its producer group, and sends it a transaction status back-check request. The message is then committed or rolled back according to the transaction state.

You can imagine what happens without the back-check mechanism starting at step 5: if a network fluctuation keeps step 4 from arriving, MQ has no idea whether to deliver the message to the consumer, like a headless chicken. RocketMQ resolves this with the transaction back-check described above, whereas Kafka usually throws an exception and lets the user resolve it.

Also note that the operations pointing at system B inside the MQ Server no longer involve system A. In other words, the distributed transaction in the message-queue approach is really system A's local transaction plus the act of storing the message into the queue. What you get is eventual consistency: the whole process is asynchronous, and each system only has to guarantee its own part of the work.

Message heap problem

Above we mentioned one important feature of message queues: peak clipping. But what if the spike is so large that messages pile up in the queue?

The problem can be generalized, because message accumulation really has only two sources: producers producing too fast, or consumers consuming too slowly.

We can attack this from several angles. If the pile-up is because producers are too fast at peak traffic, we can apply rate limiting and degradation, or add consumer instances to scale out and match the production surge. If consumers are too slow, first check whether they are producing large numbers of consumption errors, or add logging to see whether a thread is stuck or a locked resource is never released.

Of course, the fastest fix for a message pile-up is to add consumer instances, but remember that you also need to increase the number of queues per topic.

Remember that in RocketMQ a queue is consumed by only one consumer in a group, so if you merely add consumer instances, you end up with the idle consumers we saw in the earlier sketch.
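That cap on scaling can be stated in one line of Java (a trivial sketch, names hypothetical): in cluster mode each queue is consumed by at most one consumer of a group, so a group's effective parallelism is bounded by the queue count.

```java
// In cluster mode each queue is consumed by at most one consumer of a group,
// so a group's effective parallelism is capped by the queue count.
public class Parallelism {
    public static int effectiveConsumers(int queueCount, int consumerCount) {
        return Math.min(queueCount, consumerCount); // extra consumers sit idle
    }
}
```

With 4 queues, scaling from 4 to 8 consumers buys nothing; you must raise the queue count too.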

Backtracking consumption

Backtracking consumption means a consumer has already successfully consumed a message but, for business reasons, needs to consume it again. To support this, the Broker must retain the message after delivering it successfully. Re-consumption is usually specified by time: for example, if the consumer system fails and recovers, it may need to re-consume everything from one hour ago, so the Broker must provide a mechanism to rewind consumption progress along the time axis. RocketMQ supports time-based backtracking of consumption down to the millisecond.

That is the official documentation's explanation; I copied it directly as a bit of popular science 😁😁😁.

RocketMQ's disk-flush mechanism

Having heard all this about RocketMQ's architecture and design, you may be wondering:

What is the form of queues in a Topic?

How are messages stored in queues persisted?

What about synchronous and asynchronous brushes I mentioned above? What impact do they have on persistence?

I’m going to explain it to you.

Synchronous and asynchronous disk flushing

As the figure above shows, a synchronous flush must wait for a successful ACK from the disk write. Synchronous flushing is a good guarantee of message reliability in MQ, but it has a significant performance impact, so it generally suits specific business scenarios such as finance.

An asynchronous flush usually starts a thread that performs the flush operation in the background. Submitting flushes from a background thread reduces read/write latency and improves MQ performance and throughput, so it is generally suited to scenarios without strict message-reliability requirements, such as sending verification codes.

In general, an asynchronous flush loses data only if the Broker fails unexpectedly. You can set the Broker parameter flushDiskType to choose your flush policy (ASYNC_FLUSH or SYNC_FLUSH).

Synchronous replication and asynchronous replication

The synchronous and asynchronous flushes discussed above operate at the level of a single node, while synchronous and asynchronous replication refer to whether, in Broker master-slave mode, the master node waits for the slave to synchronize before returning success to the client.

  • Synchronous replication: Also known as “synchronous double write”, that is, the write success is returned only when the message is simultaneously written to the primary and secondary nodes.
  • Asynchronous replication: Write success is returned immediately after the message is written to the master node.

However, there is no perfect scheme here. For example, the more nodes a message is written to, the better its reliability, but the worse the performance. Programmers therefore need to choose the master/slave replication scheme that suits their specific business scenario.

Does asynchronous replication affect message reliability in the same way that asynchronous flushing does?

The answer is no, because they are different concepts: message reliability is guaranteed by the flushing strategy, whereas replication strategies like synchronous/asynchronous replication mainly affect availability. Why? Chiefly because RocketMQ does not support automatic master-slave switching: when the master node fails, producers can no longer produce messages to it.

For example, suppose asynchronous replication is in use and the master node crashes before it finishes sending the messages that still need to be synchronized. At that moment the slave node is missing some messages. However, producers can no longer produce to the failed master, while consumers can automatically switch to the slave for consumption. So while the master is down there is only a brief master-slave inconsistency and reduced availability; once the master restarts, replication of the remaining messages to the slave resumes.

In a single-master-slave architecture, a failed master node means the whole system can no longer produce. So can this availability problem be solved? Remember that in our original architecture diagram, each Topic was distributed across different Brokers.

But the problem with this kind of replication is that it cannot guarantee strict ordering. In the previous article we mentioned ensuring message order by sending semantically related messages to the same queue under a Topic. If master node A is responsible for the ordered sequence of messages for order A and then crashes, no other node can take over for it; and if any node could store any message, there would be no ordering at all.

RocketMQ uses Dledger to solve this problem. It requires that a message be replicated to more than half of the nodes (a majority) before a write success is returned to the client, and it supports dynamically switching the primary node by election. I won't expand on it here; interested readers can look it up themselves.

That doesn't mean Dledger is a perfect solution. During an election it cannot provide service; it must use three or more nodes; if a majority of nodes fail at the same time it cannot guarantee availability either; and its efficiency still lags somewhat behind the plain asynchronous replication described above.
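The majority rule at the heart of Dledger (it is Raft-based) is simple enough to write down. This little sketch — my own illustration, not Dledger's code — shows why three nodes tolerate one failure but not two:

```python
def write_commits(acks, total_nodes):
    """Dledger-style rule: a write commits only when more than half
    of the nodes (leader included) have acknowledged it."""
    return acks > total_nodes // 2

# 3-node group: 2 acks commit a write, 1 ack does not.
print(write_commits(2, 3))  # True
print(write_commits(1, 3))  # False

# With a majority of nodes down, at most a minority can ever ack,
# so no write can commit -- the availability limit mentioned above.
print(write_commits(1, 3))  # False
```

The same majority threshold applies to leader election, which is why service pauses while a new primary is being chosen.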

The storage mechanism

Remember the three questions we raised above? The third one has now been answered.

But what form does a queue take in a Topic, and how are the messages stored in queues persisted? Those aren't answered yet — and the answers are exactly how RocketMQ designs its storage structure. Let me start by introducing the three main players in RocketMQ's message storage architecture: CommitLog, ConsumeQueue, and IndexFile.

  • CommitLog: the storage body for message content and metadata — it stores the message bodies written by the Producer. Messages are variable-length. Each file is 1 GB by default, and the 20-digit file name is the file's starting offset. For example, 00000000000000000000 is the first file, with start offset 0 and size 1 GB = 1073741824 bytes; when it fills up, the second file is 00000000001073741824, with start offset 1073741824, and so on. Messages are written to the log files sequentially; when a file is full, writing continues in the next file.
  • ConsumeQueue: the message consumption queue, introduced mainly to improve the performance of message consumption (as we discussed earlier). Since RocketMQ uses a Topic-based subscription model, consuming by Topic would be very inefficient if it required traversing the commitlog files to retrieve messages by Topic. Instead, the Consumer finds messages to consume via the ConsumeQueue. The ConsumeQueue (logical consumption queue) serves as an index for consumption: for each message of a given queue under a Topic it stores the starting physical offset in the CommitLog, the message size, and the HashCode of the message's Tag. A consumequeue file can thus be viewed as a Topic-based CommitLog index file, and the consumequeue folder is organized in a three-level topic/queue/file structure, stored at: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. Consumequeue files use a fixed-length design: each entry is 20 bytes — an 8-byte commitlog physical offset, a 4-byte message length, and an 8-byte tag hashcode. A single file holds 300,000 entries and can be accessed randomly like an array; each ConsumeQueue file is about 5.72 MB.
  • IndexFile: index files that provide a way to query messages by key or time range. Mentioned here only for completeness; no detailed introduction.
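The CommitLog naming scheme above is just arithmetic, and a few lines of Python (my own illustration of the convention described above) make it concrete: the 20-digit file name is the file's starting offset, so any global offset maps directly to a file and a position inside it.

```python
COMMITLOG_FILE_SIZE = 1024 * 1024 * 1024  # 1 GB, the default

def commitlog_file_for(offset):
    """Map a global physical offset to (file name, offset within file).
    The 20-digit file name is the file's starting global offset."""
    start = (offset // COMMITLOG_FILE_SIZE) * COMMITLOG_FILE_SIZE
    return "%020d" % start, offset - start

print(commitlog_file_for(0))           # first file, position 0
print(commitlog_file_for(1073741824))  # second file, position 0
print(commitlog_file_for(1073741924))  # second file, position 100
```

This is also why locating a message from a ConsumeQueue entry is cheap: the stored CommitLog offset alone identifies both the file and the position to read.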

In summary, the main message storage structures are the CommitLog and the ConsumeQueue, and a ConsumeQueue can roughly be thought of as a queue in a Topic.

RocketMQ uses a hybrid storage structure: all queues within a single Broker instance share one log data file (the CommitLog) to store messages. Interestingly, Kafka, which also targets high concurrency, allocates separate storage files per partition. It's a bit like having a pile of books to shelve: RocketMQ stacks them in arrival order regardless of the kind of book, whereas Kafka shelves the books by category.

Why would RocketMQ do that? To improve write efficiency. Ignoring Topic means a greater chance of batching messages into sequential writes, but it also brings a problem: reading a message would require traversing the whole large file, which is very time-consuming.

So, in RocketMQ, the ConsumeQueue serves as the index file for each queue to improve message reading efficiency. We can compute the global position of an index entry directly from the message's index number in the queue (index number × fixed entry length of 20 bytes), read that entry, and then locate the message from the global CommitLog position it records.
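Here is a small sketch of that lookup (my own illustration of the fixed-length layout described above, not RocketMQ's code): each 20-byte entry packs the 8-byte CommitLog offset, 4-byte size, and 8-byte tag hashcode, so entry *i* always lives at byte position *i* × 20.

```python
import struct

ENTRY_SIZE = 20  # 8-byte commitlog offset + 4-byte size + 8-byte tag hashcode

def build_entry(commitlog_offset, size, tag_hash):
    # ">qiq" = big-endian: 8-byte long, 4-byte int, 8-byte long = 20 bytes
    return struct.pack(">qiq", commitlog_offset, size, tag_hash)

def read_entry(consume_queue: bytes, index: int):
    """Random access like an array: global position = index * 20."""
    pos = index * ENTRY_SIZE
    return struct.unpack(">qiq", consume_queue[pos:pos + ENTRY_SIZE])

# Three fixed-size messages written at commitlog offsets 0, 100, 200.
cq = b"".join(build_entry(off, 100, 7) for off in (0, 100, 200))
print(read_entry(cq, 2))  # (200, 100, 7): offset, size, tag hashcode
```

The fixed entry length is what makes this O(1): no scanning, just one multiplication and one read.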

At this point, you may be a little confused about RocketMQ's storage architecture, but let's try to work through it.

Emmm, is it a bit complicated 🤣? Don't be afraid of the English diagrams and English documentation — just grit your teeth and keep reading.

If you didn't follow the above, be sure to read the following process analysis carefully!

First of all, the top part of the figure is what I was just talking about; for now you can simply think of the ConsumeQueue as the Queue.

On the far left of the figure, the red squares represent messages being written and the dotted squares represent messages waiting to be written. The producer on the left sends a message specifying the Topic, QueueId, and message content, while on the Broker side the message is appended sequentially to the CommitLog, regardless of which Topic it belongs to. The CommitLog offset, message size, and tag hash value are then stored in the ConsumeQueue index file corresponding to the Topic and QueueId the producer specified. Each queue also stores a ConsumeOffset, the consumption position of each consumer group; when a consumer pulls messages, it simply fetches the next unconsumed message starting from its ConsumeOffset.

This is my general understanding of the overall message storage architecture (without going into the details of sparse indexing, etc.), and I hope it helps.

There is one knowledge point I got carried away writing and forgot to cover, and I couldn't find a good place to add it, so I'll leave it for you to think about 🤔🤔.

Why are CommitLog files designed to be fixed length? Hint: the memory-mapping mechanism.

conclusion

Finally finished writing this blog post. Do you remember what I said ๐Ÿ˜…?

In this article I mainly want to introduce to you

The cause of the message queue

Role of message queues (asynchronous, decoupled, peak clipping)

A host of problems with message queues (message stacking, repeated consumption, sequential consumption, distributed transactions, and so on)

Two message models for message queues – queue and topic patterns

The technical architecture of RocketMQ (NameServer, Broker, Producer, Consumer)

How RocketMQ addresses the side-effect problems of message queues

This section describes the storage mechanism and flush policy of RocketMQ.

Wait…

If you like, please like ๐Ÿ‘๐Ÿ‘๐Ÿ‘.