Previous series review:

Learning microservices series (1): Learn about microservices

Learning microservices series (2): Build services based on Spring Boot

Learning microservices series (3): Spring Boot + front-end and back-end separation and RESTful-style interface writing

Learning microservices series (4): Spring Boot service gateway (Gateway)

Learning microservices series (5): Spring Boot microservices use Nacos as the registry

Learning microservices series (6): Spring Boot microservices use Nacos as the configuration center

Learning microservices series (7): Nacos principle analysis

Learning microservices series (8): Spring Boot services, distributed transactions and solutions

Learning microservices series (9): Spring Boot service interface security authentication design

In a distributed system we often need asynchronous communication. Take e-commerce as an example: when the order service calls the commodity service synchronously, it invokes the commodity service and waits while the inventory is updated. If the order service has to call several services, such as the payment service and the commodity service, the order request does not complete until every one of them has responded. The user is left waiting on the web page while the application finishes processing, which hurts the user experience considerably. With asynchronous communication, the client request does not block and the server does not have to respond immediately, so the user experience is much better. We also used MQ messaging middleware for distributed transaction processing in a previous article in this series.

Message queue products

The mainstream asynchronous messaging middleware on the Internet today is the message queue (MQ). The most commonly used products are:

  • RabbitMQ
  • Kafka
  • RocketMQ

The core capabilities of asynchronous messaging middleware are application decoupling, traffic peak shaving and valley filling, and message communication. Below we first look at how MQ is used in a system as a whole, and then discuss RocketMQ separately.

The asynchronous processing capabilities of MQ

Take the example of a user who must receive an email and an SMS after registering successfully. Once registration succeeds, the system has to send both notifications. There are two ways to handle this, synchronous (serial) and asynchronous through MQ, described below:

  1. Synchronous mode: after the registration information is written to the database, the system sends the registration email and then the registration SMS. Only when all three tasks are complete does it return to the client. Assuming each of the three steps takes 50 ms and ignoring other overhead such as the network, the serial time is 3 × 50 ms = 150 ms.

  2. Asynchronous mode: after the registration information is written to the database, a registration-success message is sent to MQ. The email service and the SMS service each subscribe to that message and send the email and the SMS on their own. Under the same assumptions, the user's response time is the time to write to the database plus the time to send to the message queue, about 50 ms + 5 ms = 55 ms.

By comparison, writing to the message queue is fast enough to be almost negligible, so the user's response time is close to 50 ms, roughly a third of the serial time.
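The asynchronous flow can be sketched in plain Java. This is only an illustration under stated assumptions: an in-memory `LinkedBlockingQueue` stands in for the MQ, a single consumer loop plays both the email and the SMS service, and `RegistrationSketch`, `register`, and `deliverPending` are hypothetical names, not part of any MQ client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: an in-memory queue stands in for the message queue.
class RegistrationSketch {
    final List<String> database = new ArrayList<>();                  // stand-in for the user table
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();  // stand-in for MQ
    final List<String> emailsSent = new ArrayList<>();
    final List<String> smsSent = new ArrayList<>();

    // Request path: persist the user, publish one event, return immediately.
    boolean register(String user) {
        database.add(user);
        return queue.offer(user); // the queue is unbounded, so offer succeeds
    }

    // Consumer side: runs later, outside the user's request.
    // Returns the number of registration events handled.
    int deliverPending() {
        int delivered = 0;
        String user;
        while ((user = queue.poll()) != null) {
            emailsSent.add(user); // what the email service would do
            smsSent.add(user);    // what the SMS service would do
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        RegistrationSketch app = new RegistrationSketch();
        app.register("alice");
        System.out.println("pending events: " + app.queue.size());
        app.deliverPending();
        System.out.println("emails sent: " + app.emailsSent);
    }
}
```

The point of the sketch is that `register` returns before any notification is sent; the notifications are handled by whoever drains the queue.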

The application decoupling capability of MQ

The decoupling capability of MQ can also be illustrated with an example. In an e-commerce scenario, a user places an order to buy goods, and the order system then notifies the inventory system. Traditionally, the order system calls the inventory system's interface directly, as in the diagram below:

Disadvantages of the traditional model:

  1. If the inventory system is unreachable, the inventory deduction fails, and so the order fails.
  2. The order system is tightly coupled to the inventory system.

How can these problems be solved? The design after introducing a message queue is shown below:

After decoupling, as shown in the figure above:

  1. Order system: after the user places an order, the order system persists it, writes a message to the message queue, and tells the user that the order was placed successfully.
  2. Inventory system: it subscribes to the order messages, obtains the order information by pull or push, and updates the inventory accordingly.

With this design, ordering still works even if the inventory system is down at the moment the order is placed. Once the order message is written to the message queue, the order system no longer cares about the follow-up operations. This decouples the order system from the inventory system.
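A minimal sketch of this decoupling, again with an in-memory queue standing in for the MQ. The class `DecouplingSketch`, the `inventoryOnline` flag, and the product ID used in `main` are hypothetical and exist only to simulate an inventory outage.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative only: an in-memory queue stands in for the message queue.
class DecouplingSketch {
    final Queue<String> orderQueue = new ArrayDeque<>();  // each message is an ordered product ID
    final Map<String, Integer> stock = new HashMap<>();
    boolean inventoryOnline = true;                       // simulates inventory availability

    // Order system: persist the order (omitted), publish a message, report success.
    // It never calls the inventory system directly.
    boolean placeOrder(String productId) {
        orderQueue.add(productId);
        return true;
    }

    // Inventory system: consumes order messages only while it is online.
    // Returns the number of orders it processed.
    int consumeOrders() {
        if (!inventoryOnline) {
            return 0;
        }
        int processed = 0;
        while (!orderQueue.isEmpty()) {
            stock.merge(orderQueue.poll(), -1, Integer::sum); // deduct one unit
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        DecouplingSketch shop = new DecouplingSketch();
        shop.stock.put("sku-1", 10);
        shop.inventoryOnline = false;                      // inventory system is down
        System.out.println("order accepted: " + shop.placeOrder("sku-1"));
        shop.inventoryOnline = true;                       // inventory system recovers
        shop.consumeOrders();
        System.out.println("stock after catch-up: " + shop.stock.get("sku-1"));
    }
}
```

The order is accepted while the inventory system is offline, and the stock is deducted once the inventory system comes back and drains the queue.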

Traffic peak shaving and valley filling

Traffic peak shaving is another common use of message queues, widely applied in flash sales (seckill) and group-buying promotions. In such events a sudden burst of traffic can bring the application down. To solve this, a message queue is placed in front of the application. This has two benefits:

  1. It can limit the number of participants in the promotion.
  2. It relieves the application of being overwhelmed by high traffic in a short period of time.

The flow is as follows:

  • The server receives the user's request and writes it to the message queue.
  • The flash-sale service performs the subsequent processing according to the request information in the message queue.
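The idea can be sketched with a bounded in-memory queue. This is an assumption-laden illustration rather than how a real MQ is configured: the capacity limit, the class `PeakShavingSketch`, and its methods are hypothetical, and rejecting requests when the queue is full is just one possible policy for capping participation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: a bounded in-memory queue absorbs a traffic burst.
class PeakShavingSketch {
    private final BlockingQueue<String> queue;

    PeakShavingSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Front end: accept the request only if the queue still has room.
    boolean submit(String requestId) {
        return queue.offer(requestId);
    }

    // Flash-sale service: handles at most batchSize requests per call,
    // so it works at its own pace regardless of the incoming rate.
    int processBatch(int batchSize) {
        int processed = 0;
        while (processed < batchSize && queue.poll() != null) {
            processed++;
        }
        return processed;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        PeakShavingSketch sale = new PeakShavingSketch(100);
        int accepted = 0;
        for (int i = 0; i < 1000; i++) {          // burst of 1000 requests
            if (sale.submit("req-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted: " + accepted);               // prints "accepted: 100"
        System.out.println("processed: " + sale.processBatch(30)); // prints "processed: 30"
        System.out.println("pending: " + sale.pending());          // prints "pending: 70"
    }
}
```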

Message queue middleware comparison (diagram borrowed from the web)

RocketMQ

Having looked at the capabilities and characteristics of messaging middleware, we now focus on one product, RocketMQ. RocketMQ is messaging middleware open-sourced by Alibaba and donated to the Apache Software Foundation, where it is now a top-level project. It is written in pure Java and offers high throughput and high availability, which makes it suitable for large-scale distributed systems. RocketMQ drew on Kafka's design, but it is not a simple copy: it optimizes reliable delivery and transactional messaging. Within Alibaba Group it is widely used for trading, recharge, stream computing, message push, log stream processing, binlog distribution, and other scenarios, and it has supported several of Alibaba's Double Eleven events.

As shown in the figure above, the architecture can be divided into four roles: Producer, Consumer, Broker, and NameServer.

  • NameServer

The NameServer can be understood as the registry of the message queue: every Broker registers itself with the NameServer.

  • Broker

The Broker is the core of RocketMQ. It receives and stores messages and serves them to consumers that pull them. Brokers usually need to be highly available, so each Master is typically given one or more Slaves; when the Master fails, consumers can still consume from a Slave. Brokers are therefore divided into Master and Slave roles, and one Master can have multiple Slaves. A Master and its Slaves are tied together by sharing the same BrokerName while using different BrokerId values (a BrokerId of 0 marks the Master, a non-zero value a Slave).

  • Producer

A message producer establishes a connection with the NameServer, obtains the routing information for a Topic from it, and then connects to the Broker Master that serves that Topic.

  • Consumer

A message consumer also establishes a connection with the NameServer, obtains the routing information for a Topic from it, and then connects to the Broker Masters and Slaves that serve that Topic.

  • Topic and Message Group

Besides the four roles above, two further concepts deserve attention: Topic and Message Group. A Topic distinguishes different types of messages; before sending or receiving messages, we first create the Topic they belong to. Message Group is introduced to improve performance and throughput.
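To make the division of roles concrete, here is a toy registry in plain Java. It is not RocketMQ's NameServer implementation: `NameServerSketch`, `BrokerInfo`, `producerTargets`, `consumerTargets`, and the addresses in `main` are all hypothetical. The sketch only mirrors what the text describes: Brokers register, producers are routed to Masters, and consumers may be routed to Masters and Slaves.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a toy registry, not RocketMQ's NameServer.
class NameServerSketch {

    static final class BrokerInfo {
        final String brokerName; // shared by a Master and its Slaves
        final long brokerId;     // 0 = Master, non-zero = Slave
        final String address;

        BrokerInfo(String brokerName, long brokerId, String address) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
            this.address = address;
        }

        boolean isMaster() {
            return brokerId == 0;
        }
    }

    // Topic name -> brokers that serve the topic.
    private final Map<String, List<BrokerInfo>> routes = new HashMap<>();

    // Called by a Broker to announce that it serves a topic.
    void register(String topic, BrokerInfo broker) {
        routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(broker);
    }

    // Producers connect to Masters only.
    List<String> producerTargets(String topic) {
        List<String> result = new ArrayList<>();
        for (BrokerInfo b : routes.getOrDefault(topic, Collections.emptyList())) {
            if (b.isMaster()) {
                result.add(b.address);
            }
        }
        return result;
    }

    // Consumers may connect to Masters and Slaves.
    List<String> consumerTargets(String topic) {
        List<String> result = new ArrayList<>();
        for (BrokerInfo b : routes.getOrDefault(topic, Collections.emptyList())) {
            result.add(b.address);
        }
        return result;
    }

    public static void main(String[] args) {
        NameServerSketch ns = new NameServerSketch();
        ns.register("Topic1", new BrokerInfo("broker-a", 0, "10.0.0.1:10911"));
        ns.register("Topic1", new BrokerInfo("broker-a", 1, "10.0.0.2:10911"));
        System.out.println("producer -> " + ns.producerTargets("Topic1"));
        System.out.println("consumer -> " + ns.consumerTargets("Topic1"));
    }
}
```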

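Test code

  • Producer
// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq packages)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class DemoProducer {

    public static void main(String[] args) throws Exception {
        // Create a producer that belongs to the producer group "GroupName"
        DefaultMQProducer producer = new DefaultMQProducer("GroupName");
        producer.setNamesrvAddr("192.168.0.12:9876");
        // Initialize the producer. It only needs to be started once during the application lifecycle
        producer.start();
        for (int i = 0; i < 10; i++) {
            // Topic, tag, and message body
            Message msg = new Message("Topic1", "Tag1",
                    ("Hello World" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // Release resources once the producer is no longer needed
        producer.shutdown();
    }
}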
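  • Consumer
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class DemoConsumer {

    public static void main(String[] args) throws MQClientException {
        // The consumer group must not be empty; "ConsumerGroupName" is only a placeholder
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("192.168.0.12:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to every tag of Topic1
        consumer.subscribe("Topic1", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}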

Ordered message consumption

RocketMQ does not guarantee message order across a whole topic, because a topic's messages are spread over several queues. So how can we make consumption ordered? We only need to route messages that belong together (for example, all messages of the same order) into the same queue, since messages within one queue keep their order. Hashing a fixed key achieves this: messages with the same key always land in the same queue.

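// Sends an ordered message. The calls send(msg, shardingKey) and getMessageId() appear to
// match the ordered-producer API of the Alibaba Cloud RocketMQ SDK: messages that share
// a shardingKey go to the same queue and are therefore consumed in order.
// Requires java.nio.charset.StandardCharsets.
public Boolean aliSent(Object body, String shardingKey) {
        String topic = TOPIC;
        // Serialize once and reuse the payload for both the message body and the log line
        String payload = JSON.toJSONString(body);
        Message msg = new Message(topic, TAG, payload.getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = orderProducer.send(msg, shardingKey);
        log.info("AliRocketMQ message sent, message id: {} || shardingKey: {} || message body: {}", sendResult.getMessageId(), shardingKey, payload);
        return true;
}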
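The hashing idea itself needs no MQ client to demonstrate. The following is a minimal sketch in plain Java; `QueueSelectorSketch` and `selectQueue` are hypothetical names, and the queue count of 4 is an assumption. In the open-source RocketMQ client, the same decision is made in a `MessageQueueSelector` passed to `producer.send`.

```java
// Illustrative only: maps a sharding key to one of a fixed number of queues.
class QueueSelectorSketch {

    // Returns a queue index in [0, queueCount).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int selectQueue(String shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        String[] events = {"created", "paid", "shipped"};
        for (String event : events) {
            // Every event of order-1001 uses the same key, hence the same queue
            int queue = selectQueue("order-1001", queueCount);
            System.out.println("order-1001 " + event + " -> queue " + queue);
        }
    }
}
```

Because the index depends only on the key and the queue count, all messages of one order stay in one queue as long as the number of queues does not change.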

Duplicate message consumption

The fundamental answer to duplicate consumption is idempotence. The consumer must be idempotent: processing the same message produces the same result no matter how many times it is executed. One way is to record processing in Redis, because setting the same key to the same value repeatedly leaves the same state. Another is to rely on a database insert with a unique key, so that the same data cannot be inserted more than once.
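A minimal sketch of an idempotent consumer follows. It makes several assumptions: an in-memory set stands in for the Redis key or the database unique key, each message carries a unique ID, and `IdempotentConsumerSketch`, `consume`, and the message IDs are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: deduplicates by message ID before applying the business effect.
class IdempotentConsumerSketch {
    // Stand-in for a Redis key per message ID or a database unique key
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger balance = new AtomicInteger();

    // Returns true if the message was applied, false if it was a duplicate.
    boolean consume(String messageId, int amount) {
        // add() returns false when the ID is already present, so a redelivery is skipped
        if (!processedIds.add(messageId)) {
            return false;
        }
        balance.addAndGet(amount);
        return true;
    }

    int balance() {
        return balance.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.consume("msg-1", 100);
        consumer.consume("msg-1", 100); // redelivery of the same message
        System.out.println("balance: " + consumer.balance()); // prints "balance: 100"
    }
}
```

In a real system the ID record and the business update should succeed or fail together (for example, in one database transaction); otherwise a crash between the two steps could mark a message as processed without applying it.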
