[Microservice Asynchronous Architecture – RocketMQ]

“We all know that to turn a microservice architecture into an asynchronous architecture you only need to add MQ, and there are many open source frameworks for MQ. Which open source framework for MQ is appropriate?”

What is MQ? What does MQ work on?

MQ is a Message Queue, short for Message Queue. Message queues are a form of communication. The essence of a message is a data structure. Because MQ centrally processes and stores messages in a project, MQ has major decoupling, concurrency, and peak clipping capabilities.

1. Decoupling:

MQ message producers and consumers do not care about the existence of each other, and the whole system is decoupled by the presence of MQ middleware.

If services communicate with each other through RPC, when a service communicates with hundreds of services, if the communication interface of that service changes, then the communication interface of hundreds of services will change, which is a very troublesome thing.

But with MQ, either the producer or the consumer can change themselves individually. Their changes will not affect other services. Thus achieving the purpose of decoupling. Why decouple? In plain English, it is convenient and reduces unnecessary workload.

2, the concurrent

MQ has clusters of producers and consumers, so when clients are hundreds of millions of users, they are all in parallel. Thus greatly improve the response speed.

3, the peak clipping

Because MQ can store a large number of messages, it can store a large number of message requests and process them concurrently.

If THE USE of RPC communication, each request to call the RPC interface, when the request volume is huge, because the RPC request is very resource consumption, so the huge request will certainly overwhelm the server.

The goal of peak clipping is to make the user experience better and make the whole system stable. Ability to handle large number of request messages.

Second, what MQ is on the market now,

Highlights RocketMQ

There are many MQ products on the market, including RabbitMQ, ActiveMQ, ZeroMQ, RocketMQ, Kafka, etc. These are all open source MQ products. Many people have recommended using RabbitMQ in the past and it is a very useful MQ product that I won’t cover here. Kafka is also the leader in high throughput, which we won’t cover here.

We highlight RocketMQ, an open-source distributed messaging middleware developed by Alibaba in 2012. RocketMQ has been donated to the Apache Software Foundation and became a top-level project of Apache on September 25, 2017.

As a domestic middleware that has experienced the baptism of “super project” of Alibaba Double 11 for many times and has stable and excellent performance, it has been used by more and more domestic enterprises in recent years with its characteristics of high performance, low latency and high reliability.

Feature overview diagram

As you can see, RocketMQ supports timed and delayed messages, which RabbitMQ does not.

* Physical structure of RocketMQ *

It can be seen that RocketMQ involves four clusters: Producer, Name Server, Consumer, and Broker.

Producer cluster:

RocketMQ provides three ways to send messages: synchronous, asynchronous, and one-way.

One, general messages

1. Synchronize schematic diagram

* Synchronize message key code *

try { SendResult sendResult = producer.send(msg); If (sendResult! = null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } catch (Exception e) { System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); }}Copy the code

*2, asynchronous schematic *

* Asynchronous message key code *

producer.sendAsync(msg, New SendCallback() {@overridePublic void onSuccess(final SendResult SendResult) {system.out.println ("send ") message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); }@Overridepublic void onException(OnExceptionContext context) { System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); }});Copy the code

*3, Oneway sending schematic *

One-way only send, do not wait for return, so the fastest, generally in the microsecond level, but may be lost

Oneway send message key code

producer.sendOneway(msg);
Copy the code

For details about the codes for sending messages, see help.aliyun.com/document_de… ]

Two, timing message and delay message

Key code for sending periodic messages

Try {// A timed message, in milliseconds (ms), to be delivered at a specified timestamp (after the current time), such as 2016-03-07 16:21:00. If set to a time before the current timestamp, the message will be delivered to the consumer immediately. long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime(); msg.setStartDeliverTime(timeStamp); SendResult = producer. Send (MSG); System.out.println("MessageId:"+sendResult.getMessageId()); }catch (Exception e) {// The message failed to be sent and retry is required. Println (new Date() + "Send MQ message failed. Topic is:" + msg.gettopic ()); e.printStackTrace(); }Copy the code

* Send delay message key code *

Long delayTime = System.currentTimemillis () + 3000; long delayTime = System.currentTimemillis () + 3000; MSG. SetStartDeliverTime (delayTime); SendResult sendResult = producer.send(msg); If (sendResult! = null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); }} catch (Exception e) {// The message failed to be sent and retry is required. Println (new Date() + "Send MQ message failed. Topic is:" + msg.gettopic ()); e.printStackTrace(); }Copy the code

Matters needing attention

1. The msg.setStartDeliverTime parameter of timed and delayed messages needs to be set to a moment (in milliseconds) after the current timestamp. If set to a time before the current timestamp, the message will be delivered to the consumer immediately.

2. The msg.setStartDelivertime parameter of timed and delayed messages can be set at any time within 40 days (in milliseconds). After 40 days, message delivery will fail.

3. StartDeliverTime is the time when the service end starts to deliver goods to the consumer end. If the consumer currently has a message backlog, timed and delayed messages will come after the backlog and will not be delivered exactly at the configured time.

4. Due to the time difference between the client and the server, the actual delivery time of the message may differ from the delivery time set by the client.

5. After setting the delivery time of timed and delayed messages, it is still limited by the message saving time of 3 days. For example, if a timed message is set to be consumed after 5 days, it will be deleted on 8 days if it is not consumed after 5 days.

6. No language supports delayed messages except the Java language.

Publish message schematics

Third, transaction messages

RocketMQ provides X/Open XA-like distributed transaction capabilities to ensure ultimate consistency between business senders and MQ messages, essentially placing distributed transactions on the MQ side in a semi-message manner.

Schematic diagram

Among them:

1. Sends a message to the RocketMQ server of the message queue.

2. After persisting the message successfully, the server confirms to the sender ACK that the message has been successfully sent. In this case, the message is a half-message.

3, the sender starts to execute the local transaction logic.

4. According to the local transaction execution result, the sender submits a secondary confirmation (Commit or Rollback) to the server. When the server receives the Commit status, the semi-message is marked as deliverable and the subscriber will finally receive the message. The server receives the Rollback status and deletes the half-message, which will not be accepted by the subscriber.

5. In the case of network disconnection or application restart, if the second confirmation submitted in Step 4 does not reach the server, the server will check the message back after a fixed period of time.

6. After receiving the message, the sender needs to check the final result of the local transaction execution of the corresponding message.

7. The sender submits a second confirmation according to the final status of the local transaction obtained from the check, and the server still performs operations on the half-message according to Step 4.

The caveats to RocketMQ’s half-message mechanism are as follows

1. According to step 6, it can be seen that he requires the sender to provide the service lookup interface.

2. The sender’s message is not guaranteed to be idempotent, and duplicate messages may exist in the absence of an ACK return

3. The consumer should do idempotent processing.

The core code

final BusinessService businessService = new BusinessService(); // Local business

TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl()); producer.start(); Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes()); try { SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, String msgId = msg.getmsgid (); String msgId = msg.getmsgid (); MD5 long crc32Id = hashutil.crc32code (msg.getBody())); // Message ID and crc32ID are mainly used to prevent message duplication // If the business itself is idempotent, you can ignore it, otherwise you need to use msgId or crc32ID to do idempotent // If the message is absolutely not repeated, you can use msgId or crc32ID to do idempotent. The recommended practice is to use CRc32 or MD5 for the message body to prevent duplicate messages Object businessServiceArgs = New Object(); TransactionStatus transactionStatus =TransactionStatus.Unknow; try { boolean isCommit = businessService.execbusinessService(businessServiceArgs); If (isCommit) {/ / local transaction success submit news transactionStatus = TransactionStatus.Com mitTransaction; } else {/ / local transaction is rolled back failure message transactionStatus = transactionStatus. RollbackTransaction; } } catch (Exception e) {log.error("Message Id:{}", msgId, e); } System.out.println(msg.getMsgID()); log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }, null); }catch (Exception e) {// The message failed to be sent and retry is required. Println (new Date() + "Send MQ message failed. Topic is:" + msg.gettopic ()); e.printStackTrace(); }Copy the code

Specific code reference document: help.aliyun.com/document_de… ]

* All message publishing schematics *

Producer is stateless and can be deployed in clusters.

Name Server cluster:

NameServer is a nearly stateless node that can be clustered and no information is synchronized between nodes. NameServer functions much like a registry.

I heard that Ali’s NameServer was made by ZooKeeper. Maybe Because ZooKeeper could not meet the requirements of large-scale concurrency, Ali developed NameServer by himself.

A NameServer is a routing table that manages discovery and registration between Producer and Comsumer.

The Broker cluster:

Broker deployment is relatively complex. Brokers are divided into Master and Slave brokers. A Master can correspond to multiple Slavers, but a Slaver can correspond to only one Master.

A different BrokerId is defined, with a BrokerId 0 for Master and a non-0 for Slaver. Multiple Master can be deployed. Each Broker establishes long connections to all nodes in the NameServer cluster and periodically registers Topic information to all Nameservers.

Consumer clusters:

Subscribe to the way

Message queue RocketMQ supports two subscription methods:

** Cluster subscription: ** All consumers identified by the same Group ID share consumption messages equally. For example, if a Topic has nine messages and a Group ID has three Consumer instances, then only three of the messages are consumed in a clustered consumption mode, with each instance split equally.

/ / a subscription form cluster set (not set, the default subscription form) for the cluster properties. The put (PropertyKeyConst. MessageModel, PropertyValueConst. CLUSTERING);Copy the code

** Broadcast subscription: ** All consumers identified by the same Group ID each consume a message once. For example, if a Topic has nine messages and a Group ID has three Consumer instances, each instance will consume nine messages in broadcast consumption mode.

A subscription form / / radio set properties. The put (PropertyKeyConst. MessageModel, PropertyValueConst. BRCopy the code

“`OADCASTING); `

Subscribe message key code: *

Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() {// Consume (Message Message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; }}); // Subscribe to another Topicconsumer. Subscribe (" topicTestmq-other ", "*", **new** MessageListener() {// Consume all Tagpublic actions (Message Message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; }}); consumer.start();Copy the code

Matters needing attention:

Midempotent processing is done on the consumer side, and almost all MQ does not do midempotent processing, requiring business processing because midempotent processing on the MQ side can bring MQ complexity and severely affect MQ performance.

Message sending and receiving model

A primary and sub-account is created

The primary and sub-accounts are created because of permission problems. The following is a master account creation flowchart

Detailed operation address: help.aliyun.com/document_de… ]

Sub-account Flow chart

Detailed operation address: help.aliyun.com/document_de… ]

MQ is a microservices architecture

Very important part


The birth of MQ provides a way to change the original synchronous architecture thinking to asynchronous architecture thinking, and provides a good solution for the stability of large-scale, highly concurrent business scenarios.

Martin Fowler emphasizes that the first rule of distributed invocation is not to be distributed. This may seem philosophical, but in the case of enterprise application systems, this principle is forced to break as long as the entire system is constantly evolving and multiple subsystems coexist.

This principle proposed by Martin Fowler, on the one hand, is intended for designers to treat distributed invocation with caution, on the other hand, it is also due to the defects of distributed systems themselves.

Therefore, microservices are not a panacea, and the appropriate architecture is the best architecture.