Author: Xiao Xian
Source code repository: github.com/zhshuixian/…
The previous section, "Actual SQL Database (MyBatis)", covered how to connect to a database with MyBatis and perform create, read, update, and delete operations. This section integrates RocketMQ with Spring Boot. Message middleware is an important component of modern distributed systems. RocketMQ is an open-source distributed message middleware that provides low-latency, high-performance, highly available, and scalable message publishing and subscription, with capacity for trillions of messages.
RocketMQ was developed in Java by the Alibaba team and donated to the Apache Software Foundation in 2016; it is now an Apache top-level project.
1) Installation and operation of RocketMQ
Prerequisites for installing and running RocketMQ:
- 64-bit operating system; Linux/Unix/Mac is recommended, and Windows also works
- 64-bit JDK 1.8+
- At least 4 GB of free disk space
Download RocketMQ 4.6.1: open the RocketMQ website (rocketmq.apache.org/release_not…) and select the binary release.
Once the download is complete, unzip it to the installation directory, open a terminal, change to the installation directory ROCKETMQ_HOME, and run the following commands:
Set the minimum and maximum memory for the JVM
# Open runbroker.sh or runbroker.cmd (Windows)
# Set the JVM's minimum and maximum memory according to the memory available on the machine
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
Run the Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
# If successful, you'll see something like this
The Name Server boot success. serializeType=JSON
Run the Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[..., ...] boot success. serializeType=JSON and name server is localhost:9876
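Besides reading the logs, a quick way to confirm that something is listening on the Name Server port is to attempt a TCP connection. The following is a minimal sketch; `PortCheck` and `isListening` are hypothetical names introduced here, not part of RocketMQ, and a successful connection only shows that a process accepts connections on that port, not that RocketMQ itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration; not part of RocketMQ. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }
}
```

For the setup above, `PortCheck.isListening("localhost", 9876, 1000)` would be the call to try once the Name Server has started.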
Shut down the Server
> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv
Windows system
Windows requires setting the environment variable %ROCKETMQ_HOME%
> cd %ROCKETMQ_HOME%\bin
> .\mqnamesrv
You will see this output on the terminal after success
The Name Server boot success. serializeType=JSON
Open a new terminal
> cd %ROCKETMQ_HOME%\bin
> .\mqbroker.cmd -n localhost:9876
The broker[..., ...] boot success. serializeType=JSON and name server is localhost:9876
Stop the Server on Windows by closing the terminal or pressing Ctrl + C
2) Start using
rocketmq-spring-boot-starter is a Spring Boot starter for quickly integrating RocketMQ. It requires Spring Boot 2.0 or later.
This section uses it to integrate RocketMQ with Spring Boot and to produce and consume messages.
2.1) New project and common configuration
Create two new projects, 04-RocketMQ-Producer and 04-RocketMQ-Consumer, which produce and consume messages respectively. Both use Spring Boot 2.1.13 with the Spring Web dependency. Apart from the project name, their configuration is essentially the same.
Add the rocketmq-spring-boot-starter dependency:
// Gradle
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.0'
<!-- Maven -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
Configure application.properties:
# 04-RocketMQ-Producer does not need to set spring.main.web-application-type
# none means the Web container is not started
spring.main.web-application-type=none
spring.application.name=rocketmq-consumer
# RocketMQ Name Server (replace with your RocketMQ IP address and port number)
rocketmq.name-server=192.168.128.10:9876
# Custom properties used by the application
# Name Server (replace with your RocketMQ IP address and port number)
boot.rocketmq.NameServer=192.168.128.10:9876
boot.rocketmq.topic=string-topic
boot.rocketmq.topic.user=user-topic
boot.rocketmq.tag=tagA
Create a new User class in both projects:
public class User {
    private String userName;
    private Byte userAge;

    // Getters and setters omitted

    @Override
    public String toString() {
        return "User{" +
                "userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}
2.2) Producer implements message writing
The project is named 04-RocketMQ-Producer. It writes messages received through a RESTful API to RocketMQ.
Create the ProducerService class:
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

@Service
public class ProducerService {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private RocketMQTemplate mqTemplate;

    @Value(value = "${boot.rocketmq.topic}")
    private String springTopic;

    @Value(value = "${boot.rocketmq.topic.user}")
    private String userTopic;

    @Value(value = "${boot.rocketmq.tag}")
    private String tag;

    public SendResult sendString(String message) {
        // Send a String message synchronously with RocketMQTemplate.syncSend.
        // The destination has the form "topic:tag".
        SendResult sendResult = mqTemplate.syncSend(springTopic + ":" + tag, message);
        logger.info("syncSend String to topic {} sendResult={} \n", springTopic, sendResult);
        return sendResult;
    }

    public SendResult sendUser(User user) {
        // Send a User object (no tag is attached)
        SendResult sendResult = mqTemplate.syncSend(userTopic, user);
        logger.info("syncSend User to topic {} sendResult= {} \n", userTopic, sendResult);
        return sendResult;
    }
}
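The destination passed to syncSend in sendString is built as `springTopic + ":" + tag`; rocketmq-spring documents this destination format as `topicName:tags`. The following is a minimal sketch of that convention. `DestinationSketch`, `join`, and `parse` are hypothetical names used only for illustration, and the code is not taken from the library.

```java
/** Hypothetical helper for illustration; not a class from rocketmq-spring. */
final class DestinationSketch {
    final String topic;
    final String tag; // empty when the destination carries no tag

    private DestinationSketch(String topic, String tag) {
        this.topic = topic;
        this.tag = tag;
    }

    /** Joins topic and tag the way sendString does: topic + ":" + tag. */
    static String join(String topic, String tag) {
        return (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;
    }

    /** Splits at the first colon only; everything after it is the tag part. */
    static DestinationSketch parse(String destination) {
        String[] parts = destination.split(":", 2);
        return new DestinationSketch(parts[0], parts.length > 1 ? parts[1] : "");
    }
}
```

With the properties used in this section, `join("string-topic", "tagA")` yields `string-topic:tagA`, while sendUser passes the bare topic `user-topic`, which parses to an empty tag.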
Code parsing:
@Value(value = "${boot.rocketmq.topic}"): automatically injects the value of boot.rocketmq.topic, defined in the application.properties file, into the springTopic variable.
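To make the `${...}` placeholder idea concrete, here is a minimal sketch that resolves placeholders against a java.util.Properties object. It illustrates the concept only and is not how Spring implements @Value; `PlaceholderSketch`, `load`, and `resolve` are hypothetical names, and failing with an exception on a missing key is a choice made for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of ${key} resolution; not Spring's implementation. */
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses text in .properties syntax into a Properties object. */
    static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces every ${key} in text with the matching property value. */
    static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No property defined for key: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Resolving `${boot.rocketmq.topic}` against the properties from section 2.1 gives `string-topic`, which is the value that ends up in springTopic.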
Create the RESTful API in the ProducerController class:
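import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Resource
    private ProducerService producerService;

    @PostMapping("/string")
    public SendResult sendString(@RequestBody String message) {
        return producerService.sendString(message);
    }

    @PostMapping("/user")
    public SendResult sendUser(@RequestBody User user) {
        return producerService.sendUser(user);
    }
}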
2.3) Consumer implements message consumption
The project is named 04-RocketMQ-Consumer. It reads and consumes messages from RocketMQ. Note that this project does not need to start the Web container.
The StringConsumer class consumes String messages:
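import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "${boot.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${boot.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(String message) {
        // Called for every message received from the topic
        logger.info("------- StringConsumer received:{} \n", message);
        // TODO process the message, for example write it to a database
    }
}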
The UserConsumer class consumes messages of type User:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(nameServer = "${boot.rocketmq.NameServer}", topic = "${boot.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(User user) {
        logger.info("######## user_consumer received: {} ; age: {} ; name: {} \n", user, user.getUserAge(), user.getUserName());
        // TODO process the User message
    }
}
Code parsing:
@RocketMQMessageListener: specifies the topic, consumerGroup, selectorExpression, and so on for the messages to listen to;
topic: the topic to consume, for example topic = "string-topic";
consumerGroup: the consumer group; consumers in the same group normally consume the same kind of messages;
selectorExpression: selects messages by tag; selectorExpression = "tagA" consumes only messages whose tag is tagA; the default "*" matches all tags;
RocketMQListener: to implement RocketMQListener, we only need to override the message-handling method onMessage.
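The tag selection described above can be sketched as a small predicate. RocketMQ tag expressions are either "*" for all tags or tags separated by "||", such as "tagA || tagB". The code below is only an illustration of that rule; `TagSelectorSketch` and `accepts` are hypothetical names, and the real filtering is done by RocketMQ, not by code like this.

```java
import java.util.Arrays;

/** Hypothetical illustration of tag-based selection; not RocketMQ's implementation. */
final class TagSelectorSketch {

    /** Returns true if a message with messageTag would pass selectorExpression. */
    static boolean accepts(String selectorExpression, String messageTag) {
        // A missing, empty, or "*" expression subscribes to every tag.
        if (selectorExpression == null
                || selectorExpression.trim().isEmpty()
                || "*".equals(selectorExpression.trim())) {
            return true;
        }
        // A message without a tag cannot match a specific tag expression.
        if (messageTag == null) {
            return false;
        }
        // "tagA || tagB" accepts a message tagged with either tagA or tagB.
        return Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }
}
```

Under this sketch, StringConsumer (selectorExpression "tagA") accepts the messages sent by sendString, and UserConsumer (default "*") accepts the untagged User messages.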
3) Run the project
Start RocketMQ, then start 04-RocketMQ-Producer and 04-RocketMQ-Consumer.
The Producer's Web server listens on port 8080; the Consumer does not start a Web container.
Start the Consumer and you'll see log output like the following:
DefaultRocketMQListenerContainer{consumerGroup='user_consumer', nameServer='192.168.128.10:9876', topic='user-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
2020-03-16 23:11:19.636  INFO 16092 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:userConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2020-03-16 23:11:19.924  INFO 16092 --- [           main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='string_consumer', nameServer='192.168.128.10:9876', topic='string-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='tagA', messageModel=CLUSTERING}
2020-03-16 23:11:19.924  INFO 16092 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:stringConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2
Open Postman and test a String message by sending a POST request to http://localhost:8080/producer/string
Producer log output:
2020-03-16 23:14:21.681  INFO 16776 --- [nio-8080-exec-2] org.xian.producer.ProducerService : syncSend String to topic string-topic sendResult=SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523BF6E30000, offsetMsgId=C0A8800A00002A9F000000000003ADF2, messageQueue=MessageQueue [topic=string-topic, brokerName=master, queueId=2], queueOffset=4]
Consumer log output:
2020-03-16 23:14:21.983  INFO 16092 --- [MessageThread_1] org.xian.consumer.StringConsumer : ------- StringConsumer received:Hello RocketMQ By Spring Boot 0
Test a User message by sending a POST request to http://localhost:8080/producer/user
Producer log output:
2020-03-16 23:18:11.548  INFO 16776 --- [nio-8080-exec-5] org.xian.producer.ProducerService : syncSend User to topic user-topic sendResult= SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523F79590003, offsetMsgId=C0A8800A00002A9F000000000003B11F, messageQueue=MessageQueue [topic=user-topic, brokerName=master, queueId=3], queueOffset=2]
Consumer log output:
2020-03-16 23:18:11.591  INFO 16092 --- [MessageThread_1] org.xian.consumer.UserConsumer : ######## user_consumer received: User{userName='RocketMQ With Spring Boot', userAge=4} ; age: 4 ; name: RocketMQ With Spring Boot
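The two test requests can also be built in code rather than in a GUI client. The sketch below assumes Java 11 or later for java.net.http, which is newer than the JDK 1.8 minimum mentioned earlier. `ProducerRequests` and its methods are hypothetical names, and the JSON body is assembled by naive string concatenation, so it assumes userName contains no quotes or backslashes.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

/** Hypothetical request builders for the two producer endpoints; requires Java 11+. */
final class ProducerRequests {
    // Base URL of the producer project started in section 3 (port 8080).
    static final String BASE_URL = "http://localhost:8080/producer";

    /** POST /producer/string with the message as a plain-text body. */
    static HttpRequest stringMessage(String message) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/string"))
                .header("Content-Type", "text/plain")
                .POST(BodyPublishers.ofString(message))
                .build();
    }

    /** POST /producer/user with a JSON body matching the User class fields. */
    static HttpRequest userMessage(String userName, int userAge) {
        // Naive JSON: assumes userName needs no escaping.
        String json = "{\"userName\":\"" + userName + "\",\"userAge\":" + userAge + "}";
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/user"))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(json))
                .build();
    }
}
```

To actually send a request while the producer is running, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; the response body would then contain the serialized SendResult returned by the controller.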
Reference and extended reading:
- Message queue literacy github.com/Snailclimb/…
- RocketMQ-Spring github.com/apache/rock…
This chapter described how to run RocketMQ on a single machine, how to integrate RocketMQ with Spring Boot, and how to produce and consume messages. Running RocketMQ on a cluster and its more advanced usage are not covered here. The next chapter starts on security frameworks for Spring, which will include:
- Spring Security
- Spring Security integrates JJWT for Token login and authentication
- Integrate Shiro (Token)
- Implement WeChat login (Token)
You are welcome to follow "Advanced Programming Technology" or Xiao Xian's blog.