1. Introduction to RocketMQ

RocketMQ is an open-source distributed messaging middleware developed by Alibaba. It draws on the JMS specification but does not follow it. As for distribution, if you have worked with other MQs before looking at RocketMQ, you will notice that RocketMQ is distributed by nature: brokers, producers, consumers, and so on.

2. RocketMQ advantages

  • No dependency on ZooKeeper (ZK)

  • Supports both asynchronous and synchronous disk flushing

  • A single server supports up to 50,000 queues or topics

  • Supports message retry

  • Supports sending messages in strict order

  • Supports scheduled (timed) message delivery

  • Supports querying messages by message ID

  • Supports rewinding consumption to a point in time

  • Supports message filtering on the server side

  • Consumption parallelism: for ordered consumption it depends on the number of queues; for unordered consumption it depends on the number of consumers
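To make the link between strict ordering and the number of queues concrete: order is kept per queue, so messages that must stay in order are usually routed to the same queue by a shared key. The sketch below shows only that routing idea in plain Java. The class name, the `selectQueue` helper, and the queue count are hypothetical; in the real client this logic would typically live in a `MessageQueueSelector` passed to the producer's `send` call.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: route messages that share a key to the same queue so they stay in order. */
class OrderedQueueSketch {

    /** Picks a queue index for a sharding key; the same key always maps to the same index. */
    static int selectQueue(String shardingKey, int queueCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // hypothetical number of queues in the topic
        List<String> orderIds = Arrays.asList("order-1", "order-2", "order-1", "order-3");
        for (String orderId : orderIds) {
            // Both "order-1" messages land in the same queue, so their relative order is preserved.
            System.out.println(orderId + " -> queue " + selectQueue(orderId, queueCount));
        }
    }
}
```

Because each queue is consumed sequentially in ordered mode, adding queues is what raises parallelism there, which matches the last bullet above.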

3. Sending and consuming messages with RocketMQ

(1) Create the parent project

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.james</groupId>
    <artifactId>rocketmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>pom</packaging>
    <modules>
        <module>MQProducer-demo</module>
        <module>MQConsume-demo</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.james</groupId>
                <artifactId>common_utils</artifactId>
                <version>0.0.3-SNAPSHOT</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.9.2</version>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- This setting is required -->
                    <fork>true</fork>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

    </build>

</project>
(2) Create message producers

Create the project MQProducer-demo.

Its pom.xml is shown below:


      
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>rocketmq-demo</artifactId>
        <groupId>com.james</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <packaging>jar</packaging>

    <artifactId>MQProducer-demo</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.james</groupId>
            <artifactId>common_utils</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>


Modify the application.properties file

server.port=8082

spring.application.name=producer-demo

spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10

swagger.enable=true

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8


rocketmq.name-server=localhost:9876
rocketmq.producer.group=2021-11
# Whether to enable automatic configuration
rocketmq.producer.isOnOff=on
# RocketMQ uses ip@pid (pid is the JVM process ID) as the unique client identifier
rocketmq.producer.groupName=${spring.application.name}
# nameserver address for mq
rocketmq.producer.namesrvAddr=localhost:9876
# Maximum message size in bytes; 4096 is 1024 * 4 (4 KB). The client default is 1024 * 1024 * 4 (4 MB)
rocketmq.producer.maxMessageSize=4096
# Send message timeout in milliseconds, default 3000
rocketmq.producer.sendMsgTimeOut=3000
# Number of retries when sending fails (2 by default)
rocketmq.producer.retryTimesWhenSendFailed=2

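As a rough illustration of what the retry setting above means, the sketch below simulates a send that fails twice and then succeeds. It assumes the setting is read as one initial attempt plus that many retries for a synchronous send. `SendRetrySketch` and `sendWithRetry` are hypothetical names used only here, and nothing in the block calls the RocketMQ client.

```java
import java.util.function.Supplier;

/** Sketch: "retryTimesWhenSendFailed = 2" read as 1 initial attempt + 2 retries. */
class SendRetrySketch {

    /** Attempts made by the most recent call; kept only to make the behavior visible. */
    static int attemptsMade;

    static <T> T sendWithRetry(Supplier<T> sendOnce, int retryTimesWhenSendFailed) {
        if (retryTimesWhenSendFailed < 0) {
            throw new IllegalArgumentException("retry count must not be negative");
        }
        int maxAttempts = 1 + retryTimesWhenSendFailed;
        RuntimeException lastFailure = null;
        attemptsMade = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade = attempt;
            try {
                return sendOnce.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw lastFailure; // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send: fails on the first two calls, succeeds on the third.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 2);
        System.out.println(result + " after " + attemptsMade + " attempts");
    }
}
```

With the value 2 from the properties file, a message that keeps failing would therefore be tried three times in total under this reading before the error reaches the caller.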

Create a new message producer configuration class

package com.james.mq.producer.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author james
 * @version 1.0
 * @description Message producer configuration
 * @date 2021/10/30
 */
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {


    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    // Maximum message size, in bytes
    private Integer maxMessageSize;
    // Message sending timeout
    private Integer sendMsgTimeOut;
    // Number of failed retries
    private Integer retryTimesWhenSendFailed;

    /**
     * MQ producer configuration.
     *
     * @return the started producer
     * @throws MQClientException if the producer fails to start
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        LOGGER.info("defaultProducer is being created");
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeOut);
        // The controller sends synchronously, so set the synchronous retry count
        producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
        producer.start();
        LOGGER.info("RocketMQ producer started successfully");
        return producer;
    }
}

Create the message producer endpoint (controller)

package com.james.mq.producer.controller;

import com.james.common.result.Result;
import com.james.common.utils.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author james
 * @version 1.0
 * @description Message producer
 * @date 2021/10/30
 */
@RestController
@RequestMapping("/api")
public class ProducerController {

    public static final Logger LOGGER = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    DefaultMQProducer defaultMQProducer;

    /**
     * Sends a simple MQ message.
     *
     * @param msg the message content
     * @return the send result
     */
    @GetMapping("/send")
    public Result send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (StringUtils.isEmpty(msg)) {
            return Result.OK();
        }
        LOGGER.info("Sending MQ message, content: {}", msg);
        // Encode explicitly as UTF-8, since the consumer decodes the body as UTF-8
        Message sendMsg = new Message("HelloTopic", "HelloTag", msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // The default send timeout is 3 seconds
        SendResult sendResult = defaultMQProducer.send(sendMsg);
        LOGGER.info("Message send response: {}", sendResult);
        return Result.OK(sendResult);
    }
}

Test

Calling the /api/send endpoint makes the producer console log the message being sent.

(3) Create message consumers

The pom.xml is the same as for the message producer module, apart from the artifactId:


      
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>rocketmq-demo</artifactId>
        <groupId>com.james</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>


    <artifactId>MQConsume-demo</artifactId>

    <packaging>jar</packaging>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.james</groupId>
            <artifactId>common_utils</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

</project>


Modify the configuration file

spring.application.name=consumers-demo
server.port=8801

spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10

swagger.enable=true

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
# Whether to enable automatic configuration
rocketmq.consumer.isOnOff=on
# RocketMQ uses ip@pid (pid is the JVM process ID) as the unique client identifier
rocketmq.consumer.groupName=${spring.application.name}
# nameserver address for mq
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# Topics and tags the consumer subscribes to (* subscribes to all tags under the topic); format: topic~tag1||tag2||tag3;
rocketmq.consumer.topics=TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*
# Minimum and maximum number of consumer threads
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# Maximum number of messages consumed per batch, default 1
rocketmq.consumer.consumeMessageBatchMaxSize=1
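The subscription string in rocketmq.consumer.topics packs several topic~tags entries into one value, and the same topic can appear more than once. The sketch below shows one possible way to parse it into a topic-to-tag-expression map in plain Java. `SubscriptionParserSketch` and `parse` are hypothetical names. Merging repeated topics with `||` is an assumption made for illustration, on the understanding that a later subscribe call for the same topic would otherwise replace the earlier one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: parse "topic~tag1||tag2;topic2~*" into a topic -> tag expression map. */
class SubscriptionParserSketch {

    static Map<String, String> parse(String topics) {
        Map<String, String> subscriptions = new LinkedHashMap<>();
        for (String entry : topics.split(";")) {
            String trimmed = entry.trim(); // tolerate spaces after ';'
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] parts = trimmed.split("~", 2);
            String topic = parts[0].trim();
            // A missing or blank tag part means "all tags".
            String tags = (parts.length > 1 && !parts[1].trim().isEmpty()) ? parts[1].trim() : "*";
            // If a topic is listed twice, combine its tag expressions; "*" wins over specific tags.
            subscriptions.merge(topic, tags,
                    (a, b) -> (a.equals("*") || b.equals("*")) ? "*" : a + "||" + b);
        }
        return subscriptions;
    }

    public static void main(String[] args) {
        String topics = "TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*";
        parse(topics).forEach((topic, tags) -> System.out.println(topic + " -> " + tags));
    }
}
```

Each map entry could then be passed to a single subscribe call per topic, with the topic as the first argument and the tag expression as the second.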

MQ consumer configuration

package com.james.mq.consume.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author james
 * @version 1.0
 * @description MQ consumer configuration
 * @date 2021/10/30 17:34
 */
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    private String topics;
    // Minimum and maximum number of consumer threads
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private Integer consumeMessageBatchMaxSize;

    @Autowired
    private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
    /**
     * MQ consumer configuration.
     *
     * @return the configured consumer
     * @throws MQClientException declared for the RocketMQ client calls
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
    public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
        LOGGER.info("defaultConsumer is being created");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        // Register the listener
        consumer.registerMessageListener(consumeMsgListenerProcessor);

        // Whether a consumer starting for the first time consumes from the head or the tail of the queue
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Consumption model, clustering or broadcasting; the default is clustering
        // consumer.setMessageModel(MessageModel.CLUSTERING);

        try {
            // Subscribe to each configured topic. The tag part of each entry is ignored here:
            // "*" subscribes to all tags under the topic.
            String[] topicArr = topics.split(";");
            for (String topic : topicArr) {
                String[] tagArr = topic.trim().split("~");
                consumer.subscribe(tagArr[0], "*");
            }
            consumer.start();
            LOGGER.info("Consumer started: groupName={}, topics={}, namesrvAddr={}", groupName, topics, namesrvAddr);
        } catch (MQClientException e) {
            LOGGER.error("Consumer creation failed!", e);
        }
        return consumer;
    }
}

The class above only initializes the consumer. The actual message handling lives in the listener registered via consumer.registerMessageListener(consumeMsgListenerProcessor); receiving and processing messages both happen in that listener class:

package com.james.mq.consume.config;

import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author james
 * @version 1.0
 * @description Consumer message listener
 * @date 2021/10/30
 */
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);


    /**
     * By default msgList contains a single message; set consumeMessageBatchMaxSize
     * to receive messages in batches.
     * Do not throw exceptions. Unless CONSUME_SUCCESS is returned, the message is
     * consumed again until CONSUME_SUCCESS is returned.
     *
     * @param msgList the received messages
     * @param consumeConcurrentlyContext the consume context
     * @return the consume status
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(msgList)) {
            LOGGER.info("MQ received an empty message list, returning success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Only the first message is handled, which matches consumeMessageBatchMaxSize=1
        MessageExt messageExt = msgList.get(0);
        LOGGER.info("MQ received the message: {}", messageExt);
        try {
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String body = new String(messageExt.getBody(), "utf-8");

            LOGGER.info("MQ message topic={}, tags={}, content={}", topic, tags, body);
        } catch (Exception e) {
            LOGGER.error("Failed to read MQ message content", e);
        }
        // TODO: handle business logic
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
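The listener above reads only the first message and always reports success. If consumeMessageBatchMaxSize were raised above 1, every message in the list would need handling, and a failure would need to be reported rather than swallowed. The sketch below illustrates that control flow in plain Java. The `Status` enum is a stand-in for RocketMQ's ConsumeConcurrentlyStatus so the example runs without the client library, and `BatchConsumeSketch` and `consumeAll` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: handle every message in a batch and report failure instead of throwing. */
class BatchConsumeSketch {

    /** Stand-in for RocketMQ's ConsumeConcurrentlyStatus (assumption: same two outcomes). */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static Status consumeAll(List<String> bodies, Consumer<String> handler) {
        for (String body : bodies) {
            try {
                handler.accept(body);
            } catch (RuntimeException e) {
                // Ask for redelivery rather than letting the exception escape the listener.
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello", "world");
        Status status = consumeAll(batch, body -> System.out.println("handled " + body));
        System.out.println(status);
    }
}
```

Since a redelivered batch may contain messages that were already handled, the business logic in the handler should be idempotent.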

Start the test

Start both applications and send a message from the producer; the consumer console logs the message it receives.