Preface

Before introducing a technology, be clear about which problems it can solve for the project. I used to think a message queue (MQ) was only for sending user-facing messages such as SMS or email, with decoupling and asynchrony as the benefit. That understanding turned out to be too narrow: MQ is not limited to messages that a specific user receives. It can also carry data between applications and drive general business processing. Literally, message queuing means putting messages into a queue and consuming them in FIFO (first in, first out) order. In practice, it is a cross-process communication mechanism for passing messages between applications.

Before introducing MQ, understand its pros, cons, and application scenarios

The main advantages of MQ are decoupling, asynchronous processing, and peak clipping, all of which show up in a simple scenario.

In microservice projects, the system is generally split vertically by core business, and each part is deployed separately. In the figure above, each system has the following responsibilities in the order business:

  • Order system: creates orders and sends order messages (such as order IDs and user data) to MQ
  • MQ: limits the number of order requests processed per second (e.g. if 2000 requests arrive per second but the database can only handle about 1000, the requests that cannot be processed yet pile up in the message queue)
  • Logistics system: creates order logistics information
  • Points system: updates the user’s shopping points

Imagine the order creation process in the above scenario without MQ:

  • After the order system creates the order, it has to call the business interfaces of the logistics system and the points system directly, so the systems are tightly coupled (MQ decouples them).
  • Unless the order system calls the other systems’ interfaces from separate threads, it wastes a lot of time waiting synchronously for them to return (MQ makes the calls asynchronous and spares you from managing the threads yourself).
  • At peak times the database cannot handle the volume of requests, which can crash the application (MQ clips the peak).
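To make decoupling and peak clipping more concrete, here is a minimal Java sketch that uses an in-memory LinkedBlockingQueue as a stand-in for MQ. It is not a RocketMQ client. The names OrderQueueSketch, publish, and consumeBatch are hypothetical, and the 2000/1000 figures are simply borrowed from the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for a message queue; an illustration only, not a real MQ client. */
class OrderQueueSketch {

    /** Hypothetical order message that carries only an order ID. */
    static final class OrderMessage {
        final String orderId;

        OrderMessage(String orderId) {
            this.orderId = orderId;
        }
    }

    private final BlockingQueue<OrderMessage> queue = new LinkedBlockingQueue<>();

    /** Order system: publish and return immediately, without calling downstream systems. */
    void publish(String orderId) {
        queue.offer(new OrderMessage(orderId));
    }

    /** Downstream system: take at most maxPerBatch messages, in FIFO order. */
    List<String> consumeBatch(int maxPerBatch) {
        List<OrderMessage> batch = new ArrayList<>();
        queue.drainTo(batch, maxPerBatch);
        List<String> orderIds = new ArrayList<>();
        for (OrderMessage message : batch) {
            orderIds.add(message.orderId);
        }
        return orderIds;
    }

    /** Messages still waiting in the queue. */
    int backlog() {
        return queue.size();
    }

    public static void main(String[] args) {
        OrderQueueSketch mq = new OrderQueueSketch();
        for (int i = 0; i < 2000; i++) {
            mq.publish("order-" + i);
        }
        List<String> processed = mq.consumeBatch(1000);
        // prints "processed=1000, backlog=1000"
        System.out.println("processed=" + processed.size() + ", backlog=" + mq.backlog());
    }
}
```

The order system never references the logistics or points systems here, and the consumer decides how much it takes per batch, so a burst only lengthens the backlog instead of overloading the database.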

Every coin has two sides. While MQ solves many problems for the system, it also introduces some, such as:

  • System complexity increases: repeated message consumption and message loss have to be considered
  • Data consistency: in the example above, if a downstream system such as logistics or inventory fails when writing to its database, how is the order rolled back or compensated?

Now that you know some of the features of MQ, let’s discuss a few scenarios where MQ can be used:

  • The upstream system does not care about the execution result of the downstream system (for example, after a user registers successfully, the user system sends the user an email through MQ, and it does not care whether that email is sent successfully)
  • Timed tasks that depend on data (for example, an order that is not paid within 24 hours of being placed is cancelled, and a refund that the merchant does not process within 72 hours is issued automatically)
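The timed-task scenario can be pictured as a delayed message: the message is sent when the order is placed but only becomes visible to the consumer after the deadline. The sketch below uses the JDK DelayQueue as a stand-in for an MQ delayed-message feature, which is an assumption made for illustration. CancelOrderTask is a hypothetical name, and the 200 ms delay replaces the 24 hours from the example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OrderTimeoutSketch {

    /** Hypothetical delayed message: it can be taken from the queue only after its deadline. */
    static final class CancelOrderTask implements Delayed {
        final String orderId;
        private final long deadlineNanos;

        CancelOrderTask(String orderId, long delay, TimeUnit unit) {
            this.orderId = orderId;
            this.deadlineNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<CancelOrderTask> queue = new DelayQueue<>();
        queue.put(new CancelOrderTask("order-1", 200, TimeUnit.MILLISECONDS));

        // poll() returns null while no task has reached its deadline
        System.out.println("due before deadline: " + queue.poll());

        // take() blocks until the task is due; a real consumer would first check
        // whether the order has been paid in the meantime
        CancelOrderTask due = queue.take();
        System.out.println("cancel unpaid order " + due.orderId);
    }
}
```

In a real project the consumer should re-check the order status when the delayed message arrives, since the user may have paid before the deadline.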

Some problem-solving ideas after introducing MQ

  • Repeated message consumption (keeping message consumption idempotent)

    Idempotent: requesting the same operation any number of times produces the same result. In MQ terms, the same message is consumed only once no matter how many times it is delivered.

    Because of network jitter (delay), repeated message delivery is unavoidable. If the consumer does not guarantee idempotency, the same message may be consumed several times and written to the database several times. A common practice is to attach a unique identifier (ID) to each message and, at consumption time, query the database for an existing record with that ID. If a record exists, the message is skipped; otherwise it is consumed and a record is inserted. When the interval between production and consumption is short, Redis can make the idempotency check cheaper, for example:

    1. Before consuming, the consumer checks whether Redis already holds the message ID
    2. If the ID does not exist, the message is consumed and the ID is written to Redis. If the ID exists, the consumer returns without consuming the message
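The two steps above can be sketched as follows. This is a minimal illustration under stated assumptions: an in-memory concurrent set stands in for Redis, and the check and the write are merged into a single atomic "add if absent" step (a small change from the steps as listed) so that two concurrent deliveries of the same ID cannot both pass the check. IdempotentConsumerSketch and consumeOnce are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class IdempotentConsumerSketch {

    // Stand-in for Redis. A real deployment would more likely use an atomic
    // "set if absent" command with an expiry time on the key.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time a message ID is seen.
     *
     * @return true if the handler ran, false if the delivery was a duplicate
     */
    boolean consumeOnce(String msgId, String body, Consumer<String> handler) {
        if (!seenIds.add(msgId)) {
            return false; // duplicate delivery: skip it
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            seenIds.remove(msgId); // forget the ID so that a redelivery can retry
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        Consumer<String> handler = body -> System.out.println("consumed: " + body);
        System.out.println(consumer.consumeOnce("msg-1", "hello", handler)); // handler runs
        System.out.println(consumer.consumeOnce("msg-1", "hello", handler)); // duplicate, skipped
    }
}
```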

    About message IDs:

    1. RocketMQ gives each message a globally unique ID
    2. If the messaging middleware does not generate IDs, consider an ID service that generates globally unique IDs (for example, one based on the Snowflake algorithm)
    3. It is recommended that the ID not be tied to actual business data
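For the case where the middleware does not generate IDs, the following is a rough sketch of a Snowflake-style generator. It assumes the commonly described layout of a millisecond timestamp, a 10-bit worker ID, and a 12-bit sequence; the class name, the epoch, and the bit widths are illustrative choices, not taken from any particular ID service.

```java
class SnowflakeIdSketch {

    private static final long EPOCH_MILLIS = 1577836800000L; // 2020-01-01T00:00:00Z, an arbitrary choice
    private static final int WORKER_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;

    private final long workerId;
    private long lastMillis = -1L;
    private long sequence = 0L;

    SnowflakeIdSketch(long workerId) {
        if (workerId < 0 || workerId >= (1L << WORKER_BITS)) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        this.workerId = workerId;
    }

    /** Returns an ID that is unique for this worker and increases over time. */
    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastMillis) {
            throw new IllegalStateException("Clock moved backwards");
        }
        if (now == lastMillis) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // Sequence exhausted within this millisecond: wait for the next one
                while ((now = System.currentTimeMillis()) <= lastMillis) {
                    // busy-wait
                }
            }
        } else {
            sequence = 0;
        }
        lastMillis = now;
        return ((now - EPOCH_MILLIS) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }

    public static void main(String[] args) {
        SnowflakeIdSketch generator = new SnowflakeIdSketch(1);
        System.out.println(generator.nextId());
        System.out.println(generator.nextId());
    }
}
```

An ID built this way contains no business fields, which matches the recommendation in point 3.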

For example, the message center application in my current work is built on MongoDB+RocketMQ. MongoDB stores the messages sent by each application (mainly SMS, email, and so on). Before each consumption, Mongo is queried by the RocketMQ message ID to guarantee idempotency and avoid repeated consumption. After successful consumption, the message status in the database is updated.

  • Message loss (message reliability)

    What message loss means differs between MQ components, and so do the solutions. Take the message delivery model shared by Kafka and RocketMQ (Producer->Broker->Consumer) as an example:

    • Producer: the message was not successfully persisted to the Broker. In Kafka this can be addressed by changing the acks configuration; in RocketMQ, sending a message returns a send status that the producer can check.
    • Broker: the message reached me successfully, but I lost it for some reason (the causes differ between MQ products because their mechanisms differ). For hardware problems (such as downtime or disk corruption), keeping replicas of me is recommended.
    • Consumer: I received the message but failed or died without telling the Broker. This can be solved with the ACK mechanism of each MQ middleware.
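The consumer-side ACK idea can be illustrated with a small simulation. This is not the API of RocketMQ or Kafka; it is a sketch of the principle only: a message counts as consumed when the consumer acknowledges it, and a failure or crash before the acknowledgement leads to redelivery. The name deliverAll and the maxAttempts limit are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Predicate;

class AckRedeliverySketch {

    /**
     * Delivers each pending message until the consumer acknowledges it
     * (returns true) or maxAttempts deliveries have been made.
     *
     * @return the number of messages that were never acknowledged
     */
    static int deliverAll(Deque<String> pending, Predicate<String> consumer, int maxAttempts) {
        int unacknowledged = 0;
        while (!pending.isEmpty()) {
            String message = pending.poll();
            boolean acked = false;
            for (int attempt = 1; attempt <= maxAttempts && !acked; attempt++) {
                try {
                    acked = consumer.test(message);
                } catch (RuntimeException e) {
                    acked = false; // an exception before the ACK counts as "not consumed"
                }
            }
            if (!acked) {
                unacknowledged++; // a real broker might move this to a dead-letter queue
            }
        }
        return unacknowledged;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(Arrays.asList("a", "b"));
        int[] calls = {0};
        // This consumer fails on its first call and acknowledges every later one
        int lost = deliverAll(pending, message -> ++calls[0] > 1, 3);
        System.out.println("unacknowledged=" + lost + ", deliveries=" + calls[0]);
    }
}
```

Because a message may be delivered more than once under this scheme, the consumer still needs the idempotency check described earlier.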

A simple example: technical framework and business model based on RocketMQ

The following builds a simple message center project on MongoDB+Spring Boot RocketMQ+Eureka+Spring Cloud Config and applies the MQ problem-solving ideas discussed above. The main roles of the components in the project are as follows:

  • Spring Cloud Config: message configuration center (e.g. Topic, ConsumerGroup, ProducerGroup).
  • Eureka: application service registry, responsible for service registration and discovery in the project.
  • Mongo: since messages have no strong transactional requirements and MongoDB documents have a flexible format (JSON storage, fields can be added and removed freely), MongoDB is used to store the messages sent by the various applications (mainly SMS, email, and so on). Before each consumption, Mongo is queried by the RocketMQ message ID to guarantee idempotency and avoid repeated consumption, and the message is saved after it has been consumed.
  • RocketMQ: receives, stores, and delivers messages.

The following figure shows the application relationship model of the project:

  • Message center application (message-center): a unified, general-purpose message processing application that handles SMS sending, email sending, employee service-account pushes, and similar messages
  • Questionnaire application (question-app): responsible for distributing employee questionnaires; in this example it is only a simple application for testing message sending
  • common: the shared module (described below)

This project mainly demonstrates the functions of MQ and the solutions to problems encountered in use, so the coding part is relatively simple.

Application example coding

  • Common module coding (common)

    The common module holds the classes shared by the applications (such as entities, constants, configuration, and utility functions).

    MessageConstant: maintains message constants

    public interface MessageConstant {
    
        interface System {
            String QUESTION = "QUESTION";
        }
    
        interface Topic {
            String SMS_TOPIC = "rocketmq.topic.sms";
            String SMS_TOPIC_TEMPLATE = "${rocketmq.topic.sms}";
            String MAIL_TOPIC = "rocketmq.topic.mail";
            String MAIL_TOPIC_TEMPLATE = "${rocketmq.topic.mail}";
        }
    
        interface Producer {
            String SMS_GROUP_TEMPLATE = "${rocketmq.producer.group.sms}";
            String MAIL_GROUP_TEMPLATE = "${rocketmq.producer.group.mail}";
        }
    
        interface Consumer {
            String SMS_GROUP_TEMPLATE = "${rocketmq.consumer.group.sms}";
            String MAIL_GROUP_TEMPLATE = "${rocketmq.consumer.group.mail}";
        }
    }

    BaseMessage: the base message class that all common messages inherit from, so that shared information is managed in one place

    @Data
    @Accessors(chain = true)
    public abstract class BaseMessage implements Serializable {
        /**
         * Message source system: {@link io.wilson.common.message.constant.MessageConstant.System}
         */
        private String system;
    }

    SmsMessage: a general-purpose SMS message carrier

    @EqualsAndHashCode(callSuper = true)
    @Data
    @Accessors(chain = true)
    @ToString(callSuper = true)
    public class SmsMessage extends BaseMessage {
        /** ID of the user who created the message */
        private String createUserId;
        /** ID of the user the SMS is sent to */
        private String toUserId;
        /** Mobile phone number */
        private String mobile;
        /** SMS content */
        private String content;
    }
  • Message center application (message-center)

    Before coding the message center, we need to settle how it should process messages. In this project’s business environment, every application may need to send SMS messages, emails, service-account messages, and so on, and the business processing for a given message type is always the same. The main flow of receiving and consuming a message in the message center is therefore:

    • Guarantee message idempotency (query the database for an existing message record to avoid repeated consumption)
    • Perform the message’s business processing
    • Store the message log

    In this project, different message types are stored in different MongoDB collections (a collection corresponds to a table in MySQL), but they share the same MessageLog message log class:

    @Data
    @Accessors(chain = true)
    public class MessageLog implements Serializable {
        private String msgId;
        /**
         * Name of the sending system: {@link io.wilson.common.message.constant.MessageConstant}
         */
        private String system;
        /**
         * Message object as a JSON string
         */
        private String msgContent;
        /**
         * Whether the business operation succeeded
         */
        private Boolean success;
        private LocalDateTime createTime;
        private LocalDateTime updateTime;

        /**
         * Initializes a message record from a message
         *
         * @param message the message to record
         * @return a new record, initially marked as not successful
         */
        public static <T extends BaseMessage> MessageLog convertFromMessage(T message) {
            LocalDateTime now = LocalDateTime.now();
            return new MessageLog()
                    .setSystem(message.getSystem())
                    .setSuccess(false)
                    .setCreateTime(now)
                    .setUpdateTime(now);
        }
    }

    The main points I considered when designing and coding the consumption flow are as follows:

    1. Using a common message class (such as SmsMessage) as the object mapped to the database would pollute it with attributes it does not need (such as createTime, updateTime, success). As a general message data carrier, the common message class works better as a VO than as a DO. Additional information about a message, such as the processing result and the creation and update times, is better kept in a separate database-mapped object, so MessageLog is defined as the entity class for message records
    2. Since these are general messages that every application can send, the data volume will be significant. Although the mapped entity is the same, storing each message type in its own collection makes operations more convenient and performs better, and the system field makes it easy to filter messages by source system
    3. During message consumption, the idempotency check and the log storage differ only in the collection name. A parent listener can therefore abstract those steps, and the business processing of each message type can be delegated to its own message service. Consumption of one message type may be further subdivided into different business methods (such as sending a single SMS or sending SMS in batches), so each service gets an abstract consume() method that invokes the specific business method passed in as a parameter
    • Message center class diagram and consumption flowchart

      To better illustrate the relationships between the classes in the message center, the following class diagram was drawn:

      When an SMS message is sent to the message center, it is consumed as follows:

    • Message business processing code

      BaseMessageService: abstract interface for message business consumption; it abstracts the business consumption method invoked by each consumer (listener)

      public interface BaseMessageService<T extends BaseMessage> {
          /**
           * Consumes a message
           *
           * @param message         the message to consume
           * @param consumeFunction the business method that consumes the message
           * @return the result of the business method
           */
          default boolean consume(T message, Function<T, Boolean> consumeFunction) {
              return consumeFunction.apply(message);
          }
      }

      SmsMessageService: abstract interface for the SMS service

      @Service
      public interface SmsMessageService extends BaseMessageService<SmsMessage> {
          /**
           * Sends a single SMS message
           *
           * @param smsMessage the SMS message to send
           * @return whether the message was sent successfully
           */
          boolean sendSingle(SmsMessage smsMessage);
      }

      SmsMessageServiceImpl: SMS service implementation class

      @Service
      @Slf4j
      public class SmsMessageServiceImpl implements SmsMessageService {

          @Override
          public boolean sendSingle(SmsMessage smsMessage) {
              // Result of the SMS business operation
              boolean isSuccess = true;
              /*
               * Perform the SMS operation here and set its result on isSuccess
               */
              if (Objects.equals(smsMessage.getToUserId(), "Wilson")) {
                  isSuccess = false;
                  log.info("Failed to send SMS message :{}", smsMessage);
              }
              return isSuccess;
          }
      }
    • Message listener (consumer) code

      MessageLogConstant: maintains MessageLog-related constants (such as the collection names of the different message types)

      public interface MessageLogConstant {
          /**
           * Collection names of the message logs
           */
          interface CollectionName {
              String SMS = "sms_message_log";
              String MAIL = "mail_message_log";
          }
      }

      AbstractMQStoreListener: an abstract listener class that provides the idempotency check and the message log storage

      @Slf4j
      public abstract class AbstractMQStoreListener {
          @Resource
          protected MongoTemplate mongoTemplate;

          /**
           * Determines whether the message has already been consumed
           *
           * @param msgId message ID
           * @return true if a log record for this message already exists
           */
          protected boolean isConsumed(String msgId) {
              // Assumes the message ID is stored under the field name "msg_id"
              long count = mongoTemplate.count(new Query(Criteria.where("msg_id").is(msgId)), collection());
              if (count > 0) {
                  log.info("Message {} has been successfully consumed, please do not repost!", msgId);
                  return true;
              }
              return false;
          }

          /**
           * Mongo collection name of the current message type:
           * {@link io.wilson.message.domain.constant.MessageLogConstant.CollectionName}
           *
           * @return the collection name of the current message store
           */
          protected abstract String collection();

          /**
           * Stores the message log
           *
           * @param success whether the business operation succeeded
           * @param msgId   message ID
           * @param message the consumed message
           */
          void store(boolean success, String msgId, BaseMessage message) {
              MessageLog messageLog = MessageLog.convertFromMessage(message)
                      .setMsgId(msgId)
                      .setMsgContent(JSONObject.toJSONString(message))
                      .setSuccess(success);
              mongoTemplate.insert(messageLog, collection());
          }
      }

      SmsMessageListener: the SMS message listener (consumer). If an exception is thrown during consumption, RocketMQ redelivers the message at intervals

      @Slf4j
      @Service
      @ConditionalOnProperty(MessageConstant.Topic.SMS_TOPIC)
      @RocketMQMessageListener(topic = MessageConstant.Topic.SMS_TOPIC_TEMPLATE, consumerGroup = MessageConstant.Consumer.SMS_GROUP_TEMPLATE)
      public class SmsMessageListener extends AbstractMQStoreListener implements RocketMQListener<MessageExt> {
          @Resource
          private SmsMessageService smsMessageService;
          private static final String EXCEPTION_FORMAT = "SMS message consumption failed, message content: %s";

          @Override
          public void onMessage(MessageExt message) {
              String msgId = message.getMsgId();
              if (isConsumed(msgId)) {
                  return;
              }
              SmsMessage smsMessage = JSONObject.parseObject(message.getBody(), SmsMessage.class);
              log.info("Received SMS message {} : {}", msgId, smsMessage);
              /*if (Objects.equals(smsMessage.getToUserId(), "2020")) {
                  log.error("Message {} consumption failed", message.getMsgId());
                  // Throw an exception so that RocketMQ redelivers the message for another attempt
                  throw new MQConsumeException(String.format(EXCEPTION_FORMAT, smsMessage));
              }*/
              boolean isSuccess = smsMessageService.consume(smsMessage, smsMessageService::sendSingle);
              if (!isSuccess) {
                  log.info("SMS service operation failed. Message ID: {}", msgId);
              }
              // Store the message log
              store(isSuccess, msgId, smsMessage);
          }

          @Override
          protected String collection() {
              return MessageLogConstant.CollectionName.SMS;
          }
      }

      MessageCenterApplication: the main program

      @SpringBootApplication
      @EnableDiscoveryClient
      public class MessageCenterApplication {
          public static void main(String[] args) {
              SpringApplication.run(MessageCenterApplication.class, args);
          }
      }

      The Spring Cloud configuration file bootstrap.yml

      eureka:
        client:
          service-url:
            defaultZone: http://localhost:8000/eureka
      spring:
        cloud:
          config:
            discovery:
              enabled: true
              service-id: config-center
            # resource file name
            profile: dev
            name: rocketmq

      SmsSendTest: Unit test class

      @SpringBootTest(classes = MessageCenterApplication.class)
      @RunWith(SpringJUnit4ClassRunner.class)
      public class SmsSendTest {
          @Resource
          private RocketMQTemplate rocketMQTemplate;
          @Value(MessageConstant.Topic.SMS_TOPIC_TEMPLATE)
          private String smsTopic;
      
          @Test
          public void sendSms() {
              SmsMessage smsMessage = new SmsMessage();
              smsMessage.setToUserId("13211")
                      .setMobile("173333222")
                      .setContent("Test SMS message")
                      .setSystem(MessageConstant.System.QUESTION);
              rocketMQTemplate.send(smsTopic, MessageBuilder.withPayload(smsMessage).build());
          }
      }
  • Configuration Center (config-server)

    The main program ConfigServerApplication

    @SpringBootApplication
    @EnableDiscoveryClient
    @EnableConfigServer
    public class ConfigServerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConfigServerApplication.class, args);
        }
    }

    Spring Cloud configuration file bootstrap.yml:

    spring:
      cloud:
        config:
          server:
            git:
              uri: https://gitee.com/Wilson-He/rocketmq-message-center-demo.git
              username: Wilson-He
              force-pull: true
              password:
              # The directory in the repository where the configuration files are located
              search-paths: /config-server-properties
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:8000/eureka

    config-server-properties/rocketmq-dev.properties:

    rocketmq.name-server=127.0.0.1:9876
    rocketmq.topic.sms=sms-topic
    rocketmq.producer.group.sms=sms-group
    rocketmq.consumer.group.sms=sms-group
    rocketmq.topic.mail=mail-topic
    rocketmq.producer.group.mail=mail-group
    rocketmq.consumer.group.mail=mail-group

Running the project

  1. Run the RocketMQ name server and broker, for example: mqnamesrv -n 127.0.0.1:9876 and mqbroker -n 127.0.0.1:9876
  2. Run the Eureka application
  3. Run the configuration center config-server
  4. Run the message center message-center
  5. Run the message-center unit test class (SmsSendTest), or run question-app and access localhost:8080/question/toUser?userId=xxx, to test consumption. If the message center console prints the log output and a record is added to the sms_message_log collection in Mongo, the project is working

Points to extend:

  1. The RocketMQ sender application can set the rocketmq.producer.retry-times-when-send-failed and rocketmq.producer.retry-times-when-send-async-failed properties in its configuration file to configure the number of retries after a synchronous or asynchronous send fails. If not set, the default is 2
  2. The message log is still stored when the business operation fails because the business may call third-party operations. A third-party error makes the business operation fail, and third-party behaviour is outside our control, so saving the result makes errors easier to locate. If the business requires it, a scheduled task can later read the stored records and run the business again
  3. This example uses a single message configuration file. In real development, message configuration should go into the configuration file of the project that needs it. For example, the message configuration of question-app (e.g. Topic, producerGroup) should live in its own project configuration (e.g. application.yml or a namespace in Apollo)
  4. In this project, NameServer and Broker are not deployed as a cluster. Once the Broker is deployed as a cluster, synchronous dual write can be configured to avoid losing messages when the master fails before they have been synchronized to the slave.
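Point 2 can be sketched as a compensation pass over the stored logs. The code below is a rough illustration under stated assumptions: an in-memory list stands in for the Mongo collection, LogRecord is a simplified hypothetical version of MessageLog, and retryFailed would in practice be triggered by a scheduled task rather than called by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FailedMessageRetrySketch {

    /** Hypothetical, simplified log record with the msgId, msgContent and success fields. */
    static final class LogRecord {
        final String msgId;
        final String msgContent;
        boolean success;

        LogRecord(String msgId, String msgContent, boolean success) {
            this.msgId = msgId;
            this.msgContent = msgContent;
            this.success = success;
        }
    }

    /**
     * Re-runs the business operation for every failed record.
     *
     * @return the number of records that succeeded on this pass
     */
    static int retryFailed(List<LogRecord> logs, Predicate<String> businessOperation) {
        int repaired = 0;
        for (LogRecord record : logs) {
            if (!record.success && businessOperation.test(record.msgContent)) {
                record.success = true; // a real job would also update the stored record
                repaired++;
            }
        }
        return repaired;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = new ArrayList<>();
        logs.add(new LogRecord("1", "{\"content\":\"first\"}", true));
        logs.add(new LogRecord("2", "{\"content\":\"second\"}", false));
        // Pretend the third party has recovered, so the retry succeeds
        int repaired = retryFailed(logs, content -> true);
        System.out.println("repaired=" + repaired);
    }
}
```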

Conclusion

This article used a simple project example built on Spring Boot RocketMQ to demonstrate some ways of handling common MQ problems:

  • Repeated message consumption can be prevented by making consumption idempotent through records stored in the database
  • If the message consumption operation fails, the listener can throw an exception so that RocketMQ redelivers the message for consumption

Program source code