Author: Xiao Xian

Source code repository: github.com/zhshuixian/…

In the previous section “Actual SQL Database (MyBatis)”, it mainly introduces how to connect MyBatis database and realize data addition, deletion, change, check and other operations. In this section, we integrate RocketMQ with the actual Spring Boot. Message middleware is an important component of modern distributed systems. RocketMQ is an open source distributed message middleware with low latency, high performance, high availability, scalable message publishing and subscription services, supporting trillions of capacity.

RocketMQ was developed by the Alibaba team in the Java language and contributed to the Apache Foundation in 2016 as a top-level project of Apache.

1) Installation and operation of RocketMQ

Prerequisites for installing and running RocketMQ:

  • 64-bit operating system, Linux/Unix/Mac recommended, Windows can also run
  • 64-bit JDK 1.8+
  • Four gigabytes of free disk

Download RocketMQ 4.6.1, open rocketmq.apache.org/release_not RocketMQ website… , select binary file:

Once the download is complete, unzip to the installation directory, open the terminal to go to the installation directory ROCKETMQ_HOME, and run the following command:

Set the minimum and maximum memory for the JVM

# Open runbroker.sh or runbroker.cmd(Windows)
Set the maximum and minimum MEMORY size of the JVM based on the memory size of the computer
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
Copy the code

Run the Name Server

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
# If successful, you'll see something like this
The Name Server boot success. serializeType=JSON
Copy the code

Run the Broker

> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f~/logs/rocketmqlogs/broker.log The broker[..., ...]  boot success. serializeType=JSON and name server is localhost:9876Copy the code

Shut down the Server

 > sh bin/mqshutdown broker
 > sh bin/mqshutdown namesrv
Copy the code

Windows system

Windows requires setting the environment variable %ROCKETMQ_HOME%
> cd %ROCKETMQ_HOME%\bin
> .\mqnamesrv
You will see this output on the terminal after success
The Name Server boot success. serializeType=JSON
Re-open a terminal
> cd%ROCKETMQ_HOME%\bin > .\mqbroker.cmd -n localhost:9876 The broker[..., ...]  boot success. serializeType=JSON and name server is localhost:9876Stop the Server on Windows by closing the terminal or Ctrl + C
Copy the code

2) Start using

Rocketmq-spring-boot-starter is a spring Boot starter that quickly integrates with RocketMQ. Spring Boot 2.0 or later is required.

Spring Boot integrates RocketMQ, producing and consuming messages.

2.1) New project and common configuration

There will be two new projects, 04-RocketMQ-Producer and 04-RocketMQ-Consumer, respectively producing information and consuming information. Spring Boot is 2.1.13, which depends on Spring Web. Except for the project name, other configurations are basically the same.

Add rocketmq – spring – the boot – starter:

// Gradle // https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter compile group: Rocketmq ', name: 'Rocketmq-spring-boot-starter ', version: '2.1.0'Copy the code
<! -- Maven -->
<! -- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
Copy the code

Configure the application. The properties

# 04-Rocketmq-producer does not need to set spring.main.web-application-type
# None indicates that the Web container is not started
spring.main.web-application-type=none
spring.application.name=rocketmq-consumer
# RocketMQ Name Server (replace RocketMQ IP address and port number)Rocketmq. Name - server = 192.168.128.10:9876# Name ServerThe boot. Rocketmq. NameServer = 192.168.128.10:9876# The application is configured with the properties used (replace the RocketMQ IP address and port number)
boot.rocketmq.topic=string-topic
boot.rocketmq.topic.user=user-topic
boot.rocketmq.tag=tagA
Copy the code

Create a new User class in both projects:

public class User {
    private String userName;
    private Byte userAge;
   	// Omit the Getter Setter
    @Override
    public String toString(a) {
        return "User{" +
                "userName='" + userName + '\' ' +
                ", userAge=" + userAge +
                '} '; }}Copy the code

2.2) Producer implements message writing

Project name 04-RocketMQ-producer. Implement messages received from RESTful apis written to RocketMQ.

New ProducerService. Class:

@Service
public class ProducerService {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private RocketMQTemplate mqTemplate;

    @Value(value = "${boot.rocketmq.topic}")
    private String springTopic;

    @Value(value = "${boot.rocketmq.topic.user}")
    private String userTopic;

    @Value(value = "${boot.rocketmq.tag}")
    private String tag;

    public SendResult sendString(String message) {
        // Send a String message
        // Call the RocketMQTemplate syncSend method
        SendResult sendResult = mqTemplate.syncSend(springTopic + ":" + tag, message);
        logger.info("syncSend String to topic {} sendResult={} \n", springTopic, sendResult);
        return sendResult;
    }

    public SendResult sendUser(User user) {
        / / send the User
        SendResult sendResult = mqTemplate.syncSend(userTopic, user);
        logger.info("syncSend User to topic {} sendResult= {} \n", userTopic, sendResult);
        returnsendResult; }}Copy the code

Code parsing:

@value (Value = “${boot.rocketMq. topic}”) : Automatically inject the boot.RocketMq. topic Value defined in the application.properties file into the springTopic variable.

Create a RESTful API, producerController.class

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Resource ProducerService producerService;

    @PostMapping("/string")
    public SendResult sendString(@RequestBody String message){
        return producerService.sendString(message);
    }

    @PostMapping("/user")
    public SendResult sendUser(@RequestBody User user){
        returnproducerService.sendUser(user); }}Copy the code

2.2) Consumer information

Project name 04-RocketMq-Consumer, which can read and consume messages in RocketMQ. Note that this project does not need to start the Web container.

Stringconsumer.class consumes String messages.

@Service
@RocketMQMessageListener(topic = "${boot.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${boot.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(String message) {
        // Override message handling methods
        logger.info("------- StringConsumer received:{} \n", message);
        // TODO processes messages, such as writing data}}Copy the code

Userconsumer.class consumes messages of type User

@Service
@RocketMQMessageListener(nameServer = "${boot.rocketmq.NameServer}", topic = "${boot.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(User user) {
        logger.info("######## user_consumer received: {} ; age: {} ; name: {} \n", user,user.getUserAge(),user.getUserName());
        // TODO handles the message User}}Copy the code

Code parsing:

RocketMQMessageListener: Specifies the topic, consumerGroup, selectorExpression, etc.

Topic = “string-topic”; topic = “string-topic”; topic = “string-topic”;

ConsumerGroup: a consumerGroup that normally consumes the same message;

SelectorExpression: Selects tag, selectorExpression=”tagA”, which only consumes messages with tag as tag; Default “*”, all tags;

RocketMQListener: To implement RocketMQListener, we just need to rewrite the message handling method;

3) Run the project

Start RocketMQ. Start 04-RocketMQ-Producer and 04-RocketMQ-Consumer, respectively.

The Web port being run by Producer is 8080, and the Consumer does not start the Web container.

Start Consumer and you’ll see the following log output:

DefaultRocketMQListenerContainer{consumerGroup='user_consumer', nameServer='192.168.128.10:9876', topic='user-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression=The '*', messageModel = 23:11:19 CLUSTERING} 2020-03-16. 16092-636 the INFO [main] O.A.R.S.A.L istenerContainerConfiguration: Register the listener to container, listenerBeanName:userConsumer, ContainerBeanName: org. Apache. Rocketmq. Spring. Support. DefaultRocketMQListenerContainer_1 23:11:19 2020-03-16. 924 INFO 16092 --- [ main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='string_consumer', nameServer='192.168.128.10:9876', topic='string-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='tagA', messageModel = 23:11:19 CLUSTERING} 2020-03-16. 16092-924 the INFO [main] O.A.R.S.A.L istenerContainerConfiguration: Register the listener to container, listenerBeanName:stringConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2Copy the code

Open the Postname, testing String types of messages, go to http://localhost:8080/producer/string

Producer Output logs:

The 2020-03-16 23:14:21. 16776-681 the INFO [nio - 8080 - exec - 2] org. Xian. Producer. ProducerService: syncSend String to topic string-topic sendResult=SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523BF6E30000, offsetMsgId=C0A8800A00002A9F000000000003ADF2, messageQueue=MessageQueue [topic=string-topic, brokerName=master, queueId=2], queueOffset=4]Copy the code

Consumer log output:

The 2020-03-16 23:14:21. 16092-983 the INFO [MessageThread_1] org. Xian. Consumer. StringConsumer: ------- StringConsumer received:Hello RocketMQ By Spring Boot 0Copy the code

Test the User types of messages, go to http://localhost:8080/producer/user

Producer Output logs:

The 2020-03-16 23:18:11. 16776-548 the INFO [nio - 8080 - exec - 5] org. Xian. Producer. ProducerService: syncSend User to topic user-topic sendResult= SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523F79590003, offsetMsgId=C0A8800A00002A9F000000000003B11F, messageQueue=MessageQueue [topic=user-topic, brokerName=master, queueId=3], queueOffset=2]Copy the code

Consumer log output:

The 2020-03-16 23:18:11. 16092-591 the INFO [MessageThread_1] org. Xian. Consumer. UserConsumer:######## user_consumer received: User{userName='RocketMQ With Spring Boot', userAge=4} ; age: 4 ; name: RocketMQ With Spring Boot 
Copy the code

Reference and extended reading:

  • Message queue literacy github.com/Snailclimb/…
  • RocketMQ – Spring github.com/apache/rock…

This chapter mainly describes how to run RocketMQ on a single machine, how to integrate RocketMQ with Spring Boot, and how to generate and consume messages. For RocketMQ on a cluster, the advanced use of RocketMQ is not described here. In the next chapter, you’ll start working on Spring’s security framework, which includes:

  • Spring Security
  • Spring Security integrates JJWT for Token login and authentication
  • Integrate Shiro (Token)
  • Achieve wechat Login (Token)

Welcome to “Advanced Programming Technology” or Xiao Xian’s blog.