Contents
- RocketMQ introduction
- RocketMQ concepts
- Why use RocketMQ?
- Asynchronous decoupling
- Peak shaving and valley filling
- Eventual consistency for distributed transactions
- Data distribution
- RocketMQ architecture
- RocketMQ message types
- Normal messages
- Ordered messages
- Scheduled and delayed messages
- Transaction messages
- Best practices
- Message retry
- Message filtering
- Consumption modes
- Idempotence
- Local transaction message encapsulation
- Reference code
RocketMQ introduction
Apache RocketMQ is a distributed messaging middleware that offers low latency, high concurrency, high availability, and high reliability. It provides asynchronous decoupling and peak shaving and valley filling for distributed applications, as well as massive message accumulation, high throughput, and reliable retry for Internet applications.
RocketMQ concepts
- Topic: A message topic, used to categorize a class of messages. For example, all order-related messages can be carried by an order Topic. Producers send messages to a Topic.
- Producer: The role responsible for producing messages and sending them to a Topic.
- Consumer: The role responsible for receiving and consuming messages from a Topic.
- Message: The content that a producer sends to a Topic and that consumers consume.
- Message attributes: Business-specific attributes, such as the Message Key and Tag, that producers can set on a message when sending it.
- Group: A set of producers or consumers that typically produce or consume the same type of message and share the same publish or subscribe logic.
Why use RocketMQ?
Asynchronous decoupling
With the popularity of microservice architectures, it is important to sort out the relationships between services. Asynchronous decoupling reduces the coupling between services and improves their throughput.
Many business scenarios use asynchronous decoupling. Because every industry's business is different, I will use a generic scenario that most readers will recognize.
Take placing an order in e-commerce as an example. The process is as follows:
- Lock the inventory
- Create the order
- User pays
- Deduct the inventory
- Send a purchase SMS notification to the user
- Add credits to the user
- Notify the merchant of delivery
After the order is placed successfully, the user pays. Once the payment is completed, a piece of logic called the payment callback runs, and it has to carry out several business steps. If all of them run synchronously, the callback takes as long as the sum of every step.
Several of these steps, three in this example, can be handled asynchronously instead. After paying, the user does not need to care about the rest of the process, which can be worked through in the background at its own pace. Taking those steps out of the callback shortens the callback processing time.
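As a minimal sketch of this idea, the snippet below keeps one step in the payment callback and publishes an event for the rest. Everything here is an assumption for illustration: the in-memory queue stands in for a RocketMQ topic, the function names are hypothetical, and which steps stay synchronous is my own choice rather than something fixed by the process above.

```python
import queue

# Hypothetical in-memory stand-in for a RocketMQ topic.
order_paid_topic = queue.Queue()


def deduct_inventory(order_id, log):
    log.append(f"deduct inventory for {order_id}")


def payment_callback(order_id, log):
    """Do only the critical work synchronously, then publish an event."""
    deduct_inventory(order_id, log)
    order_paid_topic.put({"event": "ORDER_PAID", "order_id": order_id})
    return "OK"  # returns without waiting for the follow-up steps


def run_downstream_consumers(log):
    """Follow-up steps, executed later by consumers of the topic."""
    while not order_paid_topic.empty():
        msg = order_paid_topic.get()
        for step in ("send SMS", "add credits", "notify merchant"):
            log.append(f"{step} for {msg['order_id']}")


log = []
result = payment_callback("order-1", log)
steps_done_in_callback = len(log)  # only the synchronous step so far
run_downstream_consumers(log)
```

The callback finishes after a single step, and the remaining work happens whenever the consumers get to it.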
Peak shaving and valley filling
Peak shaving and valley filling refers to using RocketMQ to absorb transient traffic spikes, which protects system stability and improves the user experience.
In e-commerce, the most common traffic spike is the flash sale (seckill). Implementing a complete flash-sale business with RocketMQ is still a lot of work and is outside the scope of this article; I will discuss it when I get the chance. The point is that in scenarios like this, RocketMQ can be used to withstand high concurrency, as long as the business scenario supports asynchronous processing.
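To make the effect concrete, here is a small simulation, assuming a burst of 1000 requests in one tick and a downstream service that can handle 250 per tick. The numbers and the simulate function are made up for illustration; the deque only plays the role of the message queue.

```python
from collections import deque


def simulate(arrivals_per_tick, capacity_per_tick):
    """Return (backlog size after each tick, max items processed in any tick)."""
    backlog = deque()
    backlog_sizes = []
    peak_processed = 0
    for arrivals in arrivals_per_tick:
        backlog.extend(range(arrivals))  # producers enqueue the burst
        processed = min(capacity_per_tick, len(backlog))
        for _ in range(processed):  # the consumer drains at its own pace
            backlog.popleft()
        peak_processed = max(peak_processed, processed)
        backlog_sizes.append(len(backlog))
    return backlog_sizes, peak_processed


sizes, peak = simulate([1000, 0, 0, 0, 0], capacity_per_tick=250)
```

The downstream service never sees more than its capacity per tick, while the backlog in the queue shrinks from 750 to 0 over the following ticks.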
Eventual consistency for distributed transactions
As we all know, there are several approaches to distributed transactions, including 2PC, TCC, and eventual consistency. Using a message queue to implement eventual consistency is a common choice.
In e-commerce, the core transaction-related business must guarantee data consistency. Introducing RocketMQ distributed transactions decouples the systems from each other while still guaranteeing eventual data consistency.
Data distribution
Data distribution means delivering the original data to the multiple systems that need it, so that the same data can be stored in heterogeneous forms. The most common case is distributing data to ES and Redis to provide search, caching, and other services for the business.
Besides distributing data manually by sending messages, you can subscribe to the MySQL binlog and distribute from there. In that scenario, RocketMQ ordered messages are used to keep the data consistent.
RocketMQ architecture
[Figure: RocketMQ architecture diagram. Source: Aliyun official documentation]
- Name Server: A nearly stateless node that can be deployed as a cluster. It provides naming services for RocketMQ, that is, updating and discovering Broker services. In other words, it is a registry.
- Broker: The role that stores and forwards messages. Brokers are divided into Master and Slave. A Master Broker can have multiple Slave Brokers, but a Slave Broker belongs to only one Master Broker. When a Broker starts, it registers itself with the Name Server, and afterward it reports its Topic routing information to the Name Server every 30 seconds.
- Producer: Establishes a long-lived connection with one randomly chosen node in the Name Server cluster and periodically reads Topic routing information from it. It also establishes a long-lived connection with the Master Broker that serves the Topic and periodically sends heartbeats to that Master Broker.
- Consumer: Establishes a long-lived connection with one randomly chosen node in the Name Server cluster and periodically pulls Topic routing information from it. It also establishes long-lived connections with the Master and Slave Brokers that serve the Topic and periodically sends heartbeats to them. A Consumer can subscribe to messages from either a Master or a Slave Broker; the subscription rules are determined by the Broker configuration.
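The registry role of the Name Server can be sketched with a toy class. This is not RocketMQ code: the NameServer class, its method names, and the 120-second expiry are assumptions made for the example; only the 30-second reporting interval comes from the description above.

```python
class NameServer:
    """Toy registry: brokers report their topics, clients look up routes."""

    def __init__(self, expire_after=120):
        self.expire_after = expire_after  # assumed expiry, in seconds
        self.last_seen = {}  # broker name -> time of last report
        self.routes = {}  # broker name -> set of topics it serves

    def register(self, broker, topics, now):
        self.last_seen[broker] = now
        self.routes[broker] = set(topics)

    def lookup(self, topic, now):
        """Return brokers that serve the topic and reported recently enough."""
        return sorted(
            broker
            for broker, topics in self.routes.items()
            if topic in topics and now - self.last_seen[broker] <= self.expire_after
        )


ns = NameServer()
ns.register("broker-a", ["order"], now=0)
ns.register("broker-b", ["order", "payment"], now=0)
ns.register("broker-a", ["order"], now=30)  # broker-a keeps reporting
ns.register("broker-a", ["order"], now=150)
live = ns.lookup("order", now=150)  # broker-b has been silent since t=0
```

A broker that stops reporting simply drops out of the lookup result, which is why producers and consumers refresh their routing information periodically.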
RocketMQ message types
RocketMQ supports a rich variety of message types to meet business requirements in different scenarios. The following describes the four commonly used message types.
Normal messages
Normal messages are messages without any special features in RocketMQ. When the business scenario has no special requirements, normal messages are sufficient. For special scenarios, use one of the special message types, such as ordered messages or transaction messages.
Synchronous send
Synchronous sending: After sending a message, the sender blocks until the server returns the send result.
Asynchronous send
Asynchronous sending: After sending a message, the sender can send the next message without waiting for the result from the server. The sender receives the server response through a callback interface and handles the result there.
One-way send
One-way sending: The sender only sends the message and does not wait for or handle any response from the server. Sending is very fast this way, but there is a risk of message loss.
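The difference between the three modes is easiest to see side by side. The sketch below uses a hypothetical FakeBroker and a thread pool instead of the real RocketMQ client, so the function names and the shape of the send result are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeBroker:
    """Hypothetical in-memory broker; store() returns a send result."""

    def __init__(self):
        self.stored = []

    def store(self, body):
        self.stored.append(body)
        return {"status": "SEND_OK", "body": body}


broker = FakeBroker()
pool = ThreadPoolExecutor(max_workers=1)


def send_sync(body):
    # The caller blocks until the broker has returned the result.
    return broker.store(body)


def send_async(body, callback):
    # The caller continues immediately; the result arrives via the callback.
    future = pool.submit(broker.store, body)
    future.add_done_callback(lambda f: callback(f.result()))
    return future


def send_oneway(body):
    # Fire and forget: no result and no callback.
    pool.submit(broker.store, body)


sync_result = send_sync("m1")
async_results = []
send_async("m2", async_results.append)
send_oneway("m3")
pool.shutdown(wait=True)  # only so that this demo finishes deterministically
```

With one-way sending the caller has no way to learn that a send failed, which is where the risk of message loss comes from.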
Ordered messages
With ordered messages, the producer publishes messages in a certain order and consumers receive them in that same order; that is, the message published first is consumed first.
Take the data distribution scenario, where we subscribe to the MySQL binlog to build heterogeneous copies of the data. If the messages arrive out of order, the data ends up wrong.
For example, a row with ID = 1 is inserted and then deleted immediately, which produces two messages. In the normal consumption order, the insert is consumed first and then the delete, so in the end the row does not exist. If the messages are out of order, the delete is consumed first and then the insert, so the row still exists downstream, resulting in an inconsistency.
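One common way to keep related messages in order is to route every message with the same key to the same queue and read each queue in FIFO order. The sketch below illustrates that idea with plain lists; the queue count, the select_queue function, and the use of crc32 as the hash are assumptions for the example, not the RocketMQ client API.

```python
import zlib

QUEUE_COUNT = 4  # assumed number of queues in the topic


def select_queue(sharding_key: str) -> int:
    """Map a key to a queue deterministically (crc32 is stable across runs)."""
    return zlib.crc32(sharding_key.encode("utf-8")) % QUEUE_COUNT


queues = {i: [] for i in range(QUEUE_COUNT)}


def send_ordered(sharding_key, body):
    queues[select_queue(sharding_key)].append((sharding_key, body))


send_ordered("id=1", "INSERT")
send_ordered("id=2", "INSERT")
send_ordered("id=1", "DELETE")

# One consumer reads a queue in FIFO order, so events for id=1 stay ordered.
events_for_id_1 = [
    body for key, body in queues[select_queue("id=1")] if key == "id=1"
]
```

Messages for different IDs may land in different queues and be consumed in parallel, but the insert and delete for the same ID always share a queue.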
Scheduled and delayed messages
A scheduled message is not delivered to consumers immediately after it is sent to the server. Instead, it is delivered for consumption only once the specified time has been reached.
Delayed messages are a form of scheduled messages. A scheduled message is delivered at a specific point in time, for example 2020-11-11 12:00:00, while a delayed message is delivered after a specified delay counted from the time it is sent.
For example, if the current time is 2020-09-10 12:00:00 and the delay is 10 minutes, a message that is sent successfully is delivered to consumers at 2020-09-10 12:10:00.
Scheduled messages fit scenarios such as automatically canceling an order when it times out.
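The delay example above can be reproduced with a small in-memory delay queue. The DelayQueue class is a hypothetical illustration built on a heap; it only shows the delivery-time arithmetic, not how RocketMQ stores scheduled messages.

```python
import heapq
from datetime import datetime, timedelta


class DelayQueue:
    """Toy delay queue: messages become visible once their time is reached."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so equal times keep send order

    def send(self, body, sent_at, delay):
        deliver_at = sent_at + delay
        heapq.heappush(self._heap, (deliver_at, self._seq, body))
        self._seq += 1
        return deliver_at

    def poll(self, now):
        """Return every message whose delivery time is not after `now`."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


q = DelayQueue()
sent_at = datetime(2020, 9, 10, 12, 0, 0)
deliver_at = q.send("cancel order-1 on timeout", sent_at, timedelta(minutes=10))
early = q.poll(datetime(2020, 9, 10, 12, 5, 0))
on_time = q.poll(datetime(2020, 9, 10, 12, 10, 0))
```

Polling five minutes after sending returns nothing; polling at 12:10:00 returns the message.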
Transaction message
RocketMQ provides distributed transaction functionality similar to X/Open XA. Transaction messages make it possible to achieve eventual consistency for distributed transactions.
Interaction flow:
[Figure: transaction message interaction flow. Source: Aliyun official documentation]
- Step 1: The sender sends a half message (a transaction message that is not yet deliverable) to the RocketMQ server.
- Step 2: After the RocketMQ server receives the message and persists it successfully, it returns an Ack to the sender to confirm that the message was sent. At this point the message is a half message and is not delivered to consumers.
- Step 3: After receiving the Ack for the half message, the sender starts to execute the local transaction logic.
- Step 4: The sender submits a second confirmation based on the local transaction result: Commit if the local transaction succeeded, Rollback if it failed. When the server receives Commit, it marks the half message as deliverable and consumers eventually receive it. When the server receives Rollback, it deletes the half message and consumers never receive it.
- Step 5: In exceptional cases where the second confirmation from Step 4 never reaches the server, the server initiates a check-back for the message after a fixed period.
- Step 6: After receiving the check-back, the sender checks the final result of the local transaction for that message. The sender then submits the second confirmation again based on the final status of the local transaction, and the server handles the half message as described in Step 4.
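The flow above can be walked through with a toy broker. This is a sketch under stated assumptions: TxBroker, send_in_transaction, and the state names are hypothetical and do not mirror the RocketMQ client API, and a lost confirmation is simulated with a flag.

```python
class TxBroker:
    """Toy broker that tracks half messages; not the RocketMQ API."""

    def __init__(self):
        self.half = {}  # message id -> body, invisible to consumers
        self.deliverable = []  # bodies that consumers may receive

    def send_half(self, msg_id, body):
        self.half[msg_id] = body
        return "ACK"

    def end_transaction(self, msg_id, state):
        if state == "COMMIT":
            self.deliverable.append(self.half.pop(msg_id))
        elif state == "ROLLBACK":
            self.half.pop(msg_id)
        # Any other state leaves the half message for a later check-back.

    def check_back(self, check_local_transaction):
        for msg_id in list(self.half):
            self.end_transaction(msg_id, check_local_transaction(msg_id))


def send_in_transaction(broker, msg_id, body, local_tx, confirmation_lost=False):
    broker.send_half(msg_id, body)  # half message is stored and acknowledged
    state = "COMMIT" if local_tx() else "ROLLBACK"  # run the local transaction
    if not confirmation_lost:  # second confirmation may never arrive
        broker.end_transaction(msg_id, state)


committed = {"tx-1", "tx-3"}  # hypothetical record of local transaction results
broker = TxBroker()
send_in_transaction(broker, "tx-1", "order 1 paid", lambda: True)
send_in_transaction(broker, "tx-2", "order 2 paid", lambda: False)
send_in_transaction(broker, "tx-3", "order 3 paid", lambda: True, confirmation_lost=True)
pending_before_check = sorted(broker.half)
broker.check_back(lambda msg_id: "COMMIT" if msg_id in committed else "ROLLBACK")
```

The third message stays invisible until the check-back asks the sender for the local transaction result and commits it.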
Best practices
Message retry
When a consumer fails to consume a message, the RocketMQ server redelivers it until the consumer consumes it successfully or the retry limit is reached (16 retries by default).
Message retry ensures, to a certain extent, that messages are not lost, because a failed message gets further chances to be consumed. Note that the consumer must wait until the local business logic has succeeded before sending the ACK (consumption confirmation). Otherwise, if the business logic fails after the ACK has been sent, the message will not be redelivered.
If you process messages asynchronously, you need to turn the asynchronous call into a synchronous one, that is, wait for the asynchronous operation to finish before sending the ACK. For details, see an article I wrote earlier: mp.weixin.qq.com/s/Bbh1GDpmk… .
Finally, make sure monitoring is in place. If a message still fails after 4 or 5 retries, further retries will most likely fail too. At that point the developers need to be notified so that they can intervene manually. Alternatively, monitor the dead letter queue directly.
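A rough sketch of the retry loop, seen from the outside: the deliver function below is hypothetical and counts one initial delivery plus up to 16 redeliveries before giving up, which is my reading of the limit; the real broker also waits between retries, which is omitted here.

```python
MAX_RETRIES = 16  # default retry limit mentioned above


def deliver(message, handler, max_retries=MAX_RETRIES):
    """Return ("CONSUMED" or "DEAD_LETTER", number of delivery attempts)."""
    attempts = 0
    while attempts <= max_retries:  # first delivery plus the retries
        attempts += 1
        try:
            handler(message)
            return "CONSUMED", attempts  # ACK only after the handler succeeded
        except Exception:
            continue  # no ACK was sent, so the message is redelivered
    return "DEAD_LETTER", attempts


calls = {"n": 0}


def flaky(message):
    """Fails on the first two deliveries, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")


def always_fails(message):
    raise RuntimeError("permanent failure")


flaky_result = deliver("m1", flaky)
dead_result = deliver("m2", always_fails)
```

A handler that recovers after a couple of attempts is consumed normally, while one that never succeeds ends up in the dead letter state, which is the case the monitoring should catch.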
Message filtering
A Topic is generally used to classify one category of messages, for example an order Topic. The messages under the order Topic can still be divided into many kinds, such as order creation, order cancellation, and so on.
Different kinds of messages need different business handling. One option is to define a unified message format and use a field to distinguish the message type, then run different business logic for each type. The downside is that all messages are pushed to the consumer, which cannot consume only what it needs.
In RocketMQ, messages can be assigned tags to distinguish message types. Consumers can have messages filtered on the RocketMQ server based on tags, so that they end up consuming only the message types they care about.
I once came across an incorrect use of tags: there was only one MQ instance, and tags were used to distinguish environments. All messages went into one Topic; the test environment consumed messages with the test tag, and the production environment consumed messages with the production tag.
The first problem with this approach is that messages are not isolated: production and non-production messages are mixed together. The second is that tags are tied up as environment markers and can no longer be used to distinguish message types, so multiple Topics have to be created to carry the different business message types.
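Here is a small sketch of tag-based filtering as it would look if done on the server side. The matches and filter_on_broker functions are hypothetical; the expression syntax follows the usual tag subscription form, where "*" matches everything and "||" separates alternatives.

```python
def matches(subscription: str, tag: str) -> bool:
    """Evaluate a tag expression such as "*" or "CREATE || CANCEL"."""
    if subscription.strip() == "*":
        return True
    wanted = {part.strip() for part in subscription.split("||")}
    return tag in wanted


messages = [
    {"topic": "order", "tag": "CREATE", "body": "order-1 created"},
    {"topic": "order", "tag": "CANCEL", "body": "order-1 cancelled"},
    {"topic": "order", "tag": "PAY", "body": "order-2 paid"},
]


def filter_on_broker(messages, topic, subscription):
    """Return only the bodies that the consumer subscribed to."""
    return [
        m["body"]
        for m in messages
        if m["topic"] == topic and matches(subscription, m["tag"])
    ]


create_only = filter_on_broker(messages, "order", "CREATE")
create_or_cancel = filter_on_broker(messages, "order", "CREATE || CANCEL")
everything = filter_on_broker(messages, "order", "*")
```

Because the filtering happens before delivery, a consumer that subscribes to CREATE never receives the PAY or CANCEL messages at all.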
Consumption patterns
RocketMQ has two consumption modes: cluster consumption and broadcast consumption.
Cluster consumption:
A consumer deployed as multiple instances forms what we call a cluster. In cluster consumption, each message is consumed by only one of those instances.
This mode suits most business scenarios, where a message should be consumed once, by a single consumer. Take the payment callback scenario: if one message were consumed by several instances at the same time, the order status would be updated and the inventory deducted more than once.
Broadcast consumption:
In broadcast consumption, each message is consumed once by every instance in the cluster.
For example, when we use a local cache and the data changes, the local cache on every node has to be refreshed, so every node needs to receive the message.
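The two modes can be contrasted with a toy dispatcher. The dispatch function and mode names are assumptions for the example, and the round-robin assignment in cluster mode is a simplification of how messages are actually spread across instances.

```python
from itertools import cycle


def dispatch(messages, instances, mode):
    """Return which messages each consumer instance receives."""
    received = {name: [] for name in instances}
    if mode == "CLUSTERING":
        # Each message goes to exactly one instance of the group.
        for msg, name in zip(messages, cycle(instances)):
            received[name].append(msg)
    elif mode == "BROADCASTING":
        # Every instance receives every message.
        for name in instances:
            received[name].extend(messages)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return received


msgs = ["m1", "m2", "m3"]
cluster = dispatch(msgs, ["node-a", "node-b"], "CLUSTERING")
broadcast = dispatch(msgs, ["node-a", "node-b"], "BROADCASTING")
```

In cluster mode the three messages are split between the two nodes; in broadcast mode both nodes see all three, which is what a local cache refresh needs.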
Idempotence
Idempotence problems come up in both API request scenarios and message consumption scenarios. The business effect of a message must be applied only once, no matter how many times the message arrives. The consumer has to guarantee this, because there is no guarantee that a message will not be sent more than once or delivered more than once.
RocketMQ's Exactly-Once delivery semantics are meant to solve the idempotence problem. Exactly-Once means that a message sent to the messaging system is processed by the consumer only once. Even if the producer re-sends the message, it is consumed only once on the consumer side.
The best way to handle idempotence still relies on a unique business identifier. Although every message has a MessageId, using the MessageId for idempotence checks is not recommended. When sending a message, you can set a MessageKey on it, and this MessageKey can serve as the unique business identifier.
I will not go into the details of idempotence here. For general implementation approaches, see an article I wrote earlier: mp.weixin.qq.com/s/9fhqnbeXP… .
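As a minimal sketch of an idempotence check keyed on the MessageKey: the IdempotentConsumer class is hypothetical, and the in-memory set is an assumption that would be replaced by a durable store, such as a table with a unique index, in a real service.

```python
class IdempotentConsumer:
    """Applies each business key at most once, however often it is delivered."""

    def __init__(self):
        self.processed_keys = set()  # stand-in for a durable store
        self.credits = 0

    def on_message(self, message_key, amount):
        if message_key in self.processed_keys:
            return "DUPLICATE_IGNORED"
        self.credits += amount
        self.processed_keys.add(message_key)
        return "PROCESSED"


consumer = IdempotentConsumer()
first = consumer.on_message("order-1:add-credits", 100)
second = consumer.on_message("order-1:add-credits", 100)  # redelivery
third = consumer.on_message("order-2:add-credits", 50)
```

The redelivered message is acknowledged but changes nothing, so the credits end up at 150 rather than 250.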
Local transaction message encapsulation
As described above, RocketMQ transaction messages use a two-phase commit approach, combined with the message check-back mechanism, to guarantee eventual consistency.
In practice, every business scenario has to implement its own check-back logic, which is a bit annoying.
Another commonly used approach is the local message table. This scheme, originally proposed by eBay, creates a message table in the service's own database. Instead of sending the message to MQ right away, the service inserts a message row into that table.
The insert runs in the same transaction as the local business logic. If the local transaction succeeds, the message row is committed to the table and is later sent to MQ. If the local transaction fails, the message row is rolled back with it.
A dedicated program then pulls unsent messages from the message table and delivers them to MQ. If delivery fails, it retries until it succeeds or someone intervenes manually.
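The core of the local message table can be sketched with sqlite3. The table layout, the pay_order and relay_unsent functions, and the status values are assumptions for illustration; a real service would use its own database and a real MQ producer in place of the send_to_mq callable.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT)")
db.execute(
    "CREATE TABLE message (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "body TEXT, status TEXT DEFAULT 'UNSENT')"
)
db.commit()


def pay_order(order_id, fail=False):
    """Write the business row and the message row in one local transaction."""
    with db:  # commits on success, rolls back if an exception escapes
        db.execute("INSERT INTO orders VALUES (?, 'PAID')", (order_id,))
        db.execute("INSERT INTO message (body) VALUES (?)", (f"ORDER_PAID:{order_id}",))
        if fail:
            raise RuntimeError("business logic failed")


def relay_unsent(send_to_mq):
    """Dedicated job: deliver unsent rows, mark them SENT only on success."""
    rows = db.execute(
        "SELECT id, body FROM message WHERE status = 'UNSENT' ORDER BY id"
    ).fetchall()
    for msg_id, body in rows:
        if send_to_mq(body):
            with db:
                db.execute("UPDATE message SET status = 'SENT' WHERE id = ?", (msg_id,))


sent = []
pay_order("order-1")
try:
    pay_order("order-2", fail=True)  # both inserts are rolled back together
except RuntimeError:
    pass
relay_unsent(lambda body: sent.append(body) or True)
```

Only the message for the committed order reaches MQ; the failed order leaves neither a business row nor a message row behind.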
The path from writing a message to the message table to sending it to MQ is reliable. However, if the Broker goes down after MQ has received the message but while the message is still only in the PageCache, the message is lost. You can of course avoid the loss with synchronous disk flushing. But if we use asynchronous disk flushing, is there a way to make sure messages are not lost?
As mentioned earlier, RocketMQ transaction messages have a check-back mechanism. The message table approach needs a similar mechanism to confirm that a message has been consumed; if it has not, the message must be re-sent until it is consumed.
The message table needs a field that records the current status of the message, such as unsent, sent, or consumed. A message in the unsent status is sent to MQ, and once it is sent successfully its status becomes sent. If the status is still sent a few minutes later, it is time to do something.
There are two possibilities in this situation. One is that consumers cannot keep up with production and messages have accumulated, so this one has not been consumed yet. The other is that the message has been lost.
You can query the message accumulation data for the Topic to determine whether messages have accumulated. If they have not, resend the message to MQ, and repeat until it has been consumed.
That leaves one question: how do we know that a message has been consumed?
The cloud service I use has an Open API for querying the message trace directly. The open source version should have something similar; I have not studied it carefully, but it is probably about the same as the commercial version.
The message trace tells us whether the message has been consumed, and at that point the process ends. A message that fails to be sent to MQ is retried, and a message that has not been consumed for a long time is re-sent. Even if a message finally lands in the dead letter queue, monitoring the dead letter queue allows manual intervention, which guarantees eventual consistency.
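The decision logic described here can be summarized in one function. This is only a sketch: next_action, its parameters, and the 300-second timeout standing in for "a few minutes" are assumptions, and the backlog and consumed status would come from whatever monitoring or trace API is available.

```python
def next_action(status, seconds_since_sent, topic_backlog, timeout_seconds=300):
    """Decide what the checker job should do with one message-table row."""
    if status == "UNSENT":
        return "SEND"
    if status == "CONSUMED":
        return "DONE"
    # From here on the status is SENT.
    if seconds_since_sent < timeout_seconds:
        return "WAIT"  # still within the normal consumption window
    if topic_backlog > 0:
        return "WAIT"  # consumers are behind; the message is probably queued
    return "RESEND"  # no backlog but still not consumed: treat it as lost


actions = [
    next_action("UNSENT", 0, 0),
    next_action("SENT", 60, 0),
    next_action("SENT", 600, 1200),
    next_action("SENT", 600, 0),
    next_action("CONSUMED", 600, 0),
]
```

Because a resend can produce a duplicate when the original message was merely slow, the consumer side still needs the idempotence check discussed earlier.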
Compared with RocketMQ's built-in transaction messages, the local message table does not require check-back logic, but it adds a message table and needs supporting logic for sending, checking, and so on, which is also quite troublesome. When the message volume is large, sending the messages in the table quickly enough also takes considerable work; simply polling the table does not scale to large volumes.
Either approach achieves the goal.
Reference code
The code for local transaction messages can be obtained by searching for “Ape World” on WeChat and replying with the keyword “Kitty”.
About the author: Yin Jihuan, a simple technology lover, is the author of Spring Cloud Micro-service – Full stack Technology and Case Analysis and Spring Cloud Micro-service Introduction combat and Progress, and the founder of the public account Monkey World.