1. Introduction to RocketMQ
RocketMQ is an open-source distributed messaging middleware developed by Alibaba. It draws on the JMS specification but does not follow it. If you have worked with other message queues, you will notice that RocketMQ is distributed by design: brokers, producers, and consumers can all be deployed as clusters.
2. RocketMQ advantages
- No dependency on ZooKeeper (ZK)
- Supports both asynchronous and synchronous disk flushing
- A single server supports up to 50,000 queues or topics
- Supports message retry
- Supports sending messages in strict order
- Supports scheduled (delayed) message delivery
- Supports querying messages by message ID
- Supports rewinding consumption to a point in time
- Supports message filtering on the server (broker) side
- Consumption parallelism: for ordered consumption it depends on the number of queues; for unordered consumption it depends on the number of consumers
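Ordering in RocketMQ is guaranteed per queue, so sending in strict order comes down to routing every message that shares a business key to the same queue. The following is a minimal sketch of that routing rule in plain Java, assuming a hypothetical order ID as the key and a fixed queue count. The class and method names are illustrative and not part of the RocketMQ API; with the real client, this logic would sit inside a MessageQueueSelector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: routes messages with the same key to the same queue index. */
final class OrderedQueueRouting {

    /** Maps a business key (hypothetically an order ID) to a queue index in [0, queueCount). */
    static int selectQueue(String orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    /** Groups {key, body} pairs by target queue, preserving send order inside each queue. */
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (String[] keyed : keyedMessages) {
            int index = selectQueue(keyed[0], queueCount);
            queues.computeIfAbsent(index, k -> new ArrayList<>()).add(keyed[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        messages.add(new String[] {"order-1", "created"});
        messages.add(new String[] {"order-2", "created"});
        messages.add(new String[] {"order-1", "paid"});
        messages.add(new String[] {"order-1", "shipped"});
        System.out.println(route(messages, 4));
    }
}
```

Because all messages of one order land in one queue, the number of queues caps how many orders can be consumed in parallel while keeping their order, which matches the parallelism point in the list above.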
3. Sending and consuming messages with RocketMQ
(1) Create the parent project
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.james</groupId>
    <artifactId>rocketmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <modules>
        <module>MQProducer-demo</module>
        <module>MQConsume-demo</module>
    </modules>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.james</groupId>
                <artifactId>common_utils</artifactId>
                <version>0.0.3-SNAPSHOT</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- This setting is required -->
                    <fork>true</fork>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(2) Create the message producer
Create the project MQProducer-demo.
Its pom.xml is shown below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>rocketmq-demo</artifactId>
        <groupId>com.james</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <packaging>jar</packaging>
    <artifactId>MQProducer-demo</artifactId>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.james</groupId>
            <artifactId>common_utils</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
Modify the application.properties file
server.port=8082
spring.application.name=producer-demo
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10
swagger.enable=true
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
rocketmq.name-server=localhost:9876
rocketmq.producer.group=The 2021-11
# Whether to enable automatic configuration
rocketmq.producer.isOnOff=on
# Producer group name. RocketMQ uses ip@pid (pid stands for the JVM name) as the unique identifier of a client instance
rocketmq.producer.groupName=${spring.application.name}
# NameServer address for MQ
rocketmq.producer.namesrvAddr=localhost:9876
# Maximum message size in bytes: 1024 * 4 = 4096 (4 KB); the client default is 4 MB (1024 * 1024 * 4)
rocketmq.producer.maxMessageSize=4096
# Send timeout in milliseconds, default 3000
rocketmq.producer.sendMsgTimeOut=3000
# Number of retries when sending fails, default 2
rocketmq.producer.retryTimesWhenSendFailed=2
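The rocketmq.producer.maxMessageSize value above is 4096, and the limit is counted in bytes of the encoded message body, so the character count alone does not tell you whether a message fits. The sketch below is a hypothetical pre-check in plain Java (the class and method names are made up for illustration and are not part of the RocketMQ client), assuming bodies are encoded as UTF-8:

```java
import java.nio.charset.StandardCharsets;

/** Illustrative pre-check mirroring the rocketmq.producer.maxMessageSize setting. */
final class MessageSizeCheck {

    /** Same value as rocketmq.producer.maxMessageSize in the properties file, in bytes. */
    static final int MAX_MESSAGE_SIZE = 4096;

    /** Returns the encoded size of the payload, which is what counts against the limit. */
    static int encodedSize(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True when the UTF-8 encoded payload is within the configured limit. */
    static boolean fits(String payload) {
        return encodedSize(payload) <= MAX_MESSAGE_SIZE;
    }

    public static void main(String[] args) {
        StringBuilder ascii = new StringBuilder();
        StringBuilder cjk = new StringBuilder();
        for (int i = 0; i < 2000; i++) {
            ascii.append('a');      // 1 byte per character in UTF-8
            cjk.append('\u4e2d');   // 3 bytes per character in UTF-8
        }
        System.out.println(encodedSize(ascii.toString()) + " bytes, fits=" + fits(ascii.toString()));
        System.out.println(encodedSize(cjk.toString()) + " bytes, fits=" + fits(cjk.toString()));
    }
}
```

With this limit, 2000 ASCII characters (2000 bytes) pass, while 2000 Chinese characters (6000 bytes) do not, so a larger limit may be needed if messages carry more than short text.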
Create a new message producer configuration class
package com.james.mq.producer.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author james
 * @version 1.0
 * @description Message producer configuration
 * @date 2021/10/30
 */
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    // Maximum message size, in bytes
    private Integer maxMessageSize;
    // Send timeout, in milliseconds
    private Integer sendMsgTimeOut;
    // Number of retries when a send fails
    private Integer retryTimesWhenSendFailed;

    /**
     * Creates and starts the MQ producer.
     *
     * @return the started producer
     * @throws MQClientException if the producer fails to start
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        LOGGER.info("Creating DefaultMQProducer ----------");
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeOut);
        // The controller sends synchronously, so the synchronous retry count is the one to set
        producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
        producer.start();
        LOGGER.info("RocketMQ producer started successfully ----------");
        return producer;
    }
}
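For synchronous sends, the client treats retryTimesWhenSendFailed as extra attempts on top of the first one, so the configured value of 2 allows up to three attempts in total. The following plain-Java sketch only illustrates that counting; RetrySketch and sendWithRetry are hypothetical names, and the real client additionally switches brokers and respects the send timeout, which is not modeled here.

```java
import java.util.concurrent.Callable;

/** Illustrative only: shows how a retry count translates into total send attempts. */
final class RetrySketch {

    /**
     * Runs the attempt once, then retries up to retryTimesWhenSendFailed more times.
     * Rethrows the last failure when every attempt fails.
     */
    static <T> T sendWithRetry(Callable<T> attempt, int retryTimesWhenSendFailed) throws Exception {
        if (retryTimesWhenSendFailed < 0) {
            throw new IllegalArgumentException("retry count must not be negative");
        }
        int totalAttempts = 1 + retryTimesWhenSendFailed;
        Exception lastFailure = null;
        for (int i = 0; i < totalAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                lastFailure = e; // remember the failure and try again if attempts remain
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that fails twice before succeeding
        String result = sendWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 2);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```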
Create the message producer endpoint (controller)
package com.james.mq.producer.controller;

import com.james.common.result.Result;
import com.james.common.utils.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

/**
 * @author james
 * @version 1.0
 * @description Message producer
 * @date 2021/10/30
 */
@RestController
@RequestMapping("/api")
public class ProducerController {
    public static final Logger LOGGER = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    DefaultMQProducer defaultMQProducer;

    /**
     * Sends a simple MQ message.
     *
     * @param msg the message content
     * @return the send result
     */
    @GetMapping("/send")
    public Result send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (StringUtils.isEmpty(msg)) {
            return Result.OK();
        }
        LOGGER.info("Sending MQ message, content: {}", msg);
        // Topic "HelloTopic", tag "HelloTag"; the body is encoded as UTF-8
        Message sendMsg = new Message("HelloTopic", "HelloTag", msg.getBytes(StandardCharsets.UTF_8));
        // Synchronous send; the timeout comes from the producer configuration (3 seconds by default)
        SendResult sendResult = defaultMQProducer.send(sendMsg);
        LOGGER.info("Message send response: {}", sendResult);
        return Result.OK(sendResult);
    }
}
Test
Start the producer and request /api/send with a msg parameter on port 8082. The producer console logs the message that was sent.
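When testing with messages that contain spaces or non-ASCII characters, the msg query parameter needs to be URL-encoded. A small sketch of building the request URL, assuming the producer runs on localhost with server.port=8082 as configured above (SendUrlBuilder is a hypothetical helper, not part of the project):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Illustrative helper that builds the test URL for the /api/send endpoint. */
final class SendUrlBuilder {

    /** Builds http://host:port/api/send?msg=... with the message URL-encoded as UTF-8. */
    static String buildSendUrl(String host, int port, String msg) {
        try {
            // The (String, String) overload is used because the project targets Java 8
            return "http://" + host + ":" + port + "/api/send?msg=" + URLEncoder.encode(msg, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildSendUrl("localhost", 8082, "hello rocketmq"));
    }
}
```

The resulting URL can be opened in a browser or passed to any HTTP client.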
(3) Create the message consumer
The pom.xml is the same as the producer module's, apart from the artifactId:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>rocketmq-demo</artifactId>
        <groupId>com.james</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>MQConsume-demo</artifactId>
    <packaging>jar</packaging>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.james</groupId>
            <artifactId>common_utils</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
Modify the application.properties file
spring.application.name=consumers-demo
server.port=8801
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10
swagger.enable=true
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
# Whether to enable automatic configuration
rocketmq.consumer.isOnOff=on
# Consumer group name. RocketMQ uses ip@pid (pid stands for the JVM name) as the unique identifier of a client instance
rocketmq.consumer.groupName=${spring.application.name}
# NameServer address for MQ
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# Topics and tags the consumer subscribes to (* subscribes to all tags of the topic). Format: topic~tag1||tag2||tag3; entries are separated by ';'
rocketmq.consumer.topics=TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*
# Minimum and maximum number of consumer threads
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# Maximum number of messages consumed per batch, default 1
rocketmq.consumer.consumeMessageBatchMaxSize=1
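The rocketmq.consumer.topics value packs several subscriptions into one string, using ';' between entries, '~' between topic and tags, and '||' between tags. As a rough sketch of how such a value could be parsed, the plain-Java helper below (TopicSubscriptionParser is a hypothetical name, not part of the project) turns it into a topic-to-tag-expression map. It merges repeated topics into one expression, on the assumption that a consumer keeps a single subscription per topic, so a later subscribe call for the same topic would otherwise replace the earlier one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for values such as "TopicA~tag1||tag2;TopicB~*". */
final class TopicSubscriptionParser {

    /** Returns topic -> tag expression, keeping the order in which topics first appear. */
    static Map<String, String> parse(String topics) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : topics.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing or doubled separators
            }
            String[] parts = trimmed.split("~", 2);
            String topic = parts[0].trim();
            // A missing or blank tag part means "all tags"
            String tags = parts.length > 1 && !parts[1].trim().isEmpty() ? parts[1].trim() : "*";
            result.merge(topic, tags, TopicSubscriptionParser::mergeTags);
        }
        return result;
    }

    /** Combines two tag expressions for the same topic; "*" already covers everything. */
    static String mergeTags(String existing, String added) {
        if ("*".equals(existing) || "*".equals(added)) {
            return "*";
        }
        return existing + "||" + added;
    }

    public static void main(String[] args) {
        System.out.println(parse("TestTopic~TestTag; TestTopic~HelloTag; HelloTopic~HelloTag; MyTopic~*"));
    }
}
```

Each map entry could then be handed to the consumer as a topic plus tag expression, instead of subscribing to every tag of every topic.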
MQ consumer configuration
package com.james.mq.consume.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author james
 * @version 1.0
 * @description MQ consumer configuration
 * @date 2021/10/30 17:34
 */
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    private String topics;
    // Minimum and maximum number of consumer threads
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private Integer consumeMessageBatchMaxSize;

    @Autowired
    private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;

    /**
     * Creates and starts the MQ consumer.
     *
     * @return the consumer
     * @throws MQClientException if the consumer cannot be created
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
    public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
        LOGGER.info("Creating DefaultMQPushConsumer ----------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        // Register the listener that handles incoming messages
        consumer.registerMessageListener(consumeMsgListenerProcessor);
        // Whether a consumer group starting for the first time consumes from the head or the tail of the queue
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Consumption model, clustering or broadcasting; the default is clustering
        // consumer.setMessageModel(MessageModel.CLUSTERING);
        try {
            // Subscribe to each configured topic. Entries look like topic~tags and are separated by ';'.
            String[] topicArr = topics.split(";");
            for (String topic : topicArr) {
                String[] tagArr = topic.trim().split("~");
                // The tag part of the entry is not used here: "*" subscribes to all tags of the topic
                consumer.subscribe(tagArr[0], "*");
            }
            consumer.start();
            LOGGER.info("Consumer started: groupName={}, topics={}, namesrvAddr={}", groupName, topics, namesrvAddr);
        } catch (MQClientException e) {
            LOGGER.error("Consumer creation failed!", e);
        }
        return consumer;
    }
}
This class only initializes the consumer. The actual message handling is delegated to the listener registered with consumer.registerMessageListener(consumeMsgListenerProcessor); receiving and processing messages both happen in that listener class:
package com.james.mq.consume.config;

import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author james
 * @version 1.0
 * @description Consumer message listener
 * @date 2021/10/30
 */
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);

    /**
     * By default msgList contains a single message; set consumeMessageBatchMaxSize to receive messages in batches.
     * Do not throw exceptions. Unless CONSUME_SUCCESS is returned, the message is consumed again
     * until CONSUME_SUCCESS is returned.
     *
     * @param msgList the messages to consume
     * @param consumeConcurrentlyContext the consume context
     * @return the consume status
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(msgList)) {
            LOGGER.info("MQ received an empty message list, returning success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Only the first message is read, which matches consumeMessageBatchMaxSize=1
        MessageExt messageExt = msgList.get(0);
        LOGGER.info("MQ received the message: {}", messageExt);
        try {
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            LOGGER.info("MQ message topic={}, tags={}, content={}", topic, tags, body);
        } catch (Exception e) {
            LOGGER.error("Failed to read MQ message content", e);
        }
        // TODO: handle business logic
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
Start the test
Start the consumer and send a message through the producer. The consumer console logs the message received from the producer.
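The listener above reads only the first message of the list, which is fine while consumeMessageBatchMaxSize is 1. If the batch size is raised, every message in the list needs handling, and the returned status applies to the batch as a whole. The sketch below illustrates that pattern in plain Java so it runs without the client library: Status is a local stand-in for ConsumeConcurrentlyStatus, message bodies are plain strings, and BatchConsumeSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: handles a whole batch and reports one status for it. */
final class BatchConsumeSketch {

    /** Local stand-in for RocketMQ's ConsumeConcurrentlyStatus. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Applies the handler to every body; any failure asks for the batch to be redelivered. */
    static Status consumeAll(List<String> bodies, Consumer<String> handler) {
        if (bodies == null || bodies.isEmpty()) {
            return Status.CONSUME_SUCCESS;
        }
        try {
            for (String body : bodies) {
                handler.accept(body);
            }
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // Never let the exception escape the listener; report a retry instead
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        Status ok = consumeAll(Arrays.asList("a", "b"), body -> System.out.println("handled " + body));
        Status retry = consumeAll(Arrays.asList("a", "bad"), body -> {
            if ("bad".equals(body)) {
                throw new IllegalStateException("simulated business failure");
            }
        });
        System.out.println(ok + " / " + retry);
    }
}
```

Since a retry can redeliver messages that were already handled earlier in the same batch, the business logic should be idempotent when batches are larger than one.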