The code for this SpringBoot and RabbitMQ integration example is hosted on Gitee (Code Cloud): gitee.com/qinstudy/sp…

1. RabbitMQ messaging middleware

Big Bang: What's wrong, Xiao Wang? You don't look well.

Xiao Wang: Our team has recently started using a message queue to decouple our business modules. I stayed up all night reading about it yesterday and I'm still in a fog. I've practically got panda eyes from it.

Big Bang: I see. A message queue decouples modules and lets them communicate asynchronously, which shortens the system's overall response time. Your team has the right idea. Which message queue are you using?
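To make the decoupling idea concrete, here is a minimal sketch that uses an in-process java.util.concurrent.BlockingQueue as a stand-in for a message queue. It is not RabbitMQ, and the class and method names (AsyncDecouplingSketch, placeOrder, drainOnWorker) are hypothetical. The point is only that the sending module returns as soon as the message is queued, while another thread handles it later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process stand-in for a message queue; not RabbitMQ itself.
final class AsyncDecouplingSketch {

    // The "order" module only enqueues a message and returns immediately.
    static void placeOrder(BlockingQueue<String> queue, String orderId) {
        // offer() on an unbounded LinkedBlockingQueue does not block the caller
        queue.offer("order-created:" + orderId);
    }

    // The "notification" module takes messages off the queue on its own thread.
    static List<String> drainOnWorker(BlockingQueue<String> queue, int expected) {
        List<String> handled = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    handled.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(); // wait here only so the sketch can return the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        placeOrder(queue, "1001");
        placeOrder(queue, "1002");
        System.out.println(drainOnWorker(queue, 2));
    }
}
```

The two modules share only the queue, so either side can be changed or slowed down without the other noticing, which is the property the team is after.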

Xiao Wang: I searched online for a long time yesterday. There are ActiveMQ, RabbitMQ, RocketMQ, and Kafka. I found a chart comparing them and sent it to you.

Big Bang: I have seen that comparison of the four message queues before. Your company has a little over 100 people, so it is probably a small-to-medium-sized company, and RabbitMQ is a good choice for it. The RabbitMQ community is also very active, so problems you run into can usually be resolved with the community's help.

Xiao Wang: OK, Brother Bang, teach me. Right now I only have a few concepts in mind: producer, consumer, message model, and so on.

2. RabbitMQ Core Concepts (analogies to real life scenarios)

Big Bang: Yes, you are absolutely right. The core concepts of RabbitMQ are the producer, the consumer, and the message model, and the message model is made up of exchanges (rendered as "switches" in some translations), routing keys, and queues.

Xiao Wang: Now that you mention it, I remember that the book I read last night introduced exchanges and routing, but I still find them too abstract to grasp.

Big Bang: They really are hard to grasp. I didn't understand them either when I started, but then I thought of an everyday scenario, sending and picking up a parcel, that explains exchanges, routing, and queues. Take Qin, who lives in Wuhan and wants to send a care package to you (Xiao Wang) in Shenzhen.

Xiao Wang: Qin sending me a care package, that warms my heart. Well, what actually happened is that I sent a package to Qin, boo-hoo.

Big Bang: Xiao Wang, there will be bread and there will be milk, as long as we keep growing toward the sun. Anyway, let's say Qin sends you a care package. The process goes like this: Qin hands the package to the courier, the courier ships it from Wuhan to Shenzhen by some means of transport, and Xiao Wang happily picks it up at the Shenzhen delivery point.

By analogy, the following relationship can be obtained:

Qin: producer

Care package: message

Courier: exchange

Means of transport: routing key

Wuhan to Shenzhen: queue name

Xiao Wang: consumer

In short, Qin hands the package to a courier, who ships it by some means of transport from Wuhan to Shenzhen, where Xiao Wang happily picks it up. We can read this as: the producer creates a message, the message arrives at the exchange, the exchange uses the routing key to deliver it to the bound queue, and finally the consumer listens on that queue and processes the message.

Xiao Wang: When you put it that way, I think I'm starting to get it. The everyday scene of sending and receiving a parcel maps onto the following message flow: the producer hands the message to the exchange, the exchange routes it to the "Wuhan to Shenzhen" queue, and the consumer takes the message out for processing. The relationship between producers, messages, exchanges, routing keys, queues, and consumers in RabbitMQ can be drawn as follows:

Big Bang: Exactly, you've got it!
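The parcel analogy can also be written down as a toy model. The sketch below is an in-memory imitation of a direct exchange, written only to illustrate the flow; DirectExchangeSketch and its methods are hypothetical names, not RabbitMQ or Spring AMQP APIs, and it simplifies by allowing one queue per routing key (a real direct exchange can deliver to several queues bound with the same key).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; all names here are hypothetical.
final class DirectExchangeSketch {
    // routing key -> queue name (the binding)
    private final Map<String, String> bindings = new HashMap<>();
    // queue name -> messages waiting to be consumed
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Bind a queue to this exchange under an exact-match routing key.
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.put(routingKey, queueName);
    }

    // Producer side: returns false when no binding matches (the message is unroutable).
    boolean publish(String routingKey, String message) {
        String queueName = bindings.get(routingKey);
        if (queueName == null) {
            return false;
        }
        queues.get(queueName).add(message);
        return true;
    }

    // Consumer side: returns the next message, or null when the queue is empty or unknown.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch courier = new DirectExchangeSketch();
        courier.bind("wuhan-to-shenzhen", "express");
        courier.publish("express", "care package from Qin");
        System.out.println(courier.consume("wuhan-to-shenzhen"));
    }
}
```

Here the producer (Qin) only knows the exchange and the routing key, and the consumer (Xiao Wang) only knows the queue name, which is why the two sides stay decoupled.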

Xiao Wang: You flatter me. It's all thanks to last night's panda eyes, and after talking it through with you I understand the RabbitMQ concepts much better. I now get the general principles of producers, the message model, and consumers, but the team also needs a framework that integrates RabbitMQ with SpringBoot. I spent half the night on it without finishing, and there's a weekly meeting this afternoon where the boss is going to chew me out.

3. Sending and receiving messages with RabbitMQ

Big Bang: Integrating RabbitMQ with SpringBoot is not too difficult, because SpringBoot is more tightly integrated and more flexible than the plain Spring framework.

3.1 Introduce RabbitMQ starter dependencies in POM files

All we need to do is add the RabbitMQ starter dependency to the POM file:

<!-- RabbitMQ starter dependency: Spring Boot's AMQP integration packaged as a single JAR -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.6.RELEASE</version>
</dependency>

3.2. Configure host and port information for RabbitMQ

In the application.properties configuration file, add the host address, port number, user name, and password:

spring.rabbitmq.virtual-host=/
# Configure the host, user name, and password
# (when spring.rabbitmq.port is omitted, Spring Boot uses the default port 5672)
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Next, declare the names of the RabbitMQ queue, exchange, and routing key in the configuration file:

# Set up the exchange, routing key, and queue using the DirectExchange message model
# "local" denotes the local development environment
mq.env=local

# Set the queue, exchange, and routing key of the direct message model
mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue.demo1
mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange.demo1
mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key.demo1
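The ${mq.env} placeholder in these property values is expanded when the properties are read, so each name ends up prefixed with the environment. The sketch below is a simplified stand-in for that substitution, written in plain Java to show the effect; it is not Spring's actual resolver, PlaceholderSketch is a hypothetical name, and nested placeholders and default values are not handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of ${...} placeholder expansion; not Spring's implementation.
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace every ${key} in value with props.get(key); fail loudly on unknown keys.
    static String resolve(String value, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String replacement = props.get(matcher.group(1));
            if (replacement == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + matcher.group(1));
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mq.env", "local");
        // prints "local.middleware.mq.basic.info.exchange.demo1"
        System.out.println(resolve("${mq.env}.middleware.mq.basic.info.exchange.demo1", props));
    }
}
```

Changing mq.env to another value would give every queue, exchange, and routing key a different prefix, which keeps environments from sharing names on the same broker.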

Xiao Wang: So the RabbitMQ server address is set to 127.0.0.1 (localhost), and the user name and password are both guest. You set the names of the queue, exchange, and routing key, and then you inject the queue, exchange, and binding into the Spring container so it manages them, right?

Big Bang: Yes, you catch on quickly. If RabbitMQ is configured or used incorrectly in a real project, it causes all sorts of headaches, and interviewers often ask about it: how do you prevent message loss? How do you make sure a message is not consumed twice? I ran into message loss myself once, and it cost me several nights of panda eyes to fix.

Xiao Wang: Brother Bang, don't keep me in suspense. Tell me everything and I'll buy you an ice-cold Coke!

3.3. Define and inject custom Bean components

Big Bang: I went through RabbitMQ tutorials and videos, and the official RabbitMQ guidance gives three guidelines for ensuring high availability and acknowledged consumption of messages: a producer send-confirmation mechanism; persistent mode when creating queues, exchanges, and messages; and a consumer acknowledgement (ACK) mechanism.

Xiao Wang: Brother Bang, you're awesome. That's a whole summary of RabbitMQ experience!

Big Bang: Don't flatter me, I only started learning RabbitMQ six months before you did. The code for these three guidelines is already configured and can be used directly. The project is in the Gitee (Code Cloud) repository: gitee.com/qinstudy/sp…

3.3.1 Producer send-confirmation mechanism

The first guideline is for the producer to receive a "send confirmation" after publishing a message: once the broker confirms the message, the producer knows it was sent successfully. The code looks like this:

    /**
     * Build the RabbitTemplate used to send messages,
     * with producer send confirmation enabled.
     */
    @Bean(name = "rabbitMQTemplate")
    public RabbitTemplate rabbitTemplate() {
        // Enable publisher confirms: the broker confirms each message it receives
        connectionFactory.setPublisherConfirms(true);

        // Enable publisher returns: unroutable messages are handed back to the producer
        connectionFactory.setPublisherReturns(true);

        // Build the RabbitTemplate; mandatory=true is needed for the return callback to fire
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);

        // Called for every confirm: ack=true means the broker accepted the message, ack=false means it did not
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("Message send confirmation: correlationData({}), ack({}), cause({})", correlationData, ack, cause);
            }
        });

        // Called when a message could not be routed to any queue
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("Message lost: exchange({}), routingKey({}), replyCode({}), replyText({}), message: {}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }
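The callbacks above only log the outcome. One possible follow-up, sketched here in plain Java, is to keep track of messages that have been published but not yet confirmed, so that negatively acknowledged ones can be resent. ConfirmTrackerSketch and its methods are hypothetical names and are not part of Spring AMQP; in a real project the sequence number would be replaced by whatever identifier is attached to each message (for example through CorrelationData), and that wiring is an assumption left out of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bookkeeping for publisher confirms; not a Spring AMQP class.
final class ConfirmTrackerSketch {
    private final AtomicLong nextSeq = new AtomicLong(1);
    // sequence number -> message body still waiting for a confirm
    private final Map<Long, String> outstanding = new ConcurrentHashMap<>();
    // bodies of messages the broker refused (ack=false)
    private final List<String> toResend = new CopyOnWriteArrayList<>();

    // Call right after publishing; returns the sequence number to correlate the confirm with.
    long published(String body) {
        long seq = nextSeq.getAndIncrement();
        outstanding.put(seq, body);
        return seq;
    }

    // Call from the confirm callback with the correlated sequence number.
    void onConfirm(long seq, boolean ack) {
        String body = outstanding.remove(seq);
        if (body != null && !ack) {
            toResend.add(body);
        }
    }

    int unconfirmedCount() {
        return outstanding.size();
    }

    List<String> pendingResend() {
        return new ArrayList<>(toResend);
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        long first = tracker.published("m1");
        long second = tracker.published("m2");
        tracker.onConfirm(first, true);   // accepted by the broker
        tracker.onConfirm(second, false); // refused, so it is queued for resend
        System.out.println(tracker.unconfirmedCount() + " unconfirmed, resend " + tracker.pendingResend());
    }
}
```

Anything still counted as unconfirmed after a timeout could be treated like a refused message, although that policy is a design choice and not something the source project prescribes.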

3.3.2. Create queues, exchanges, and messages in persistent mode

The second guideline ensures that messages in RabbitMQ queues are "not lost". RabbitMQ recommends creating queues and exchanges as durable, that is, with the durable parameter set to true. It is also strongly recommended to set the delivery mode to "persistent" when creating a message. This way the queue and exchange survive a RabbitMQ server crash and restart, and the messages in the queue are not lost.

When creating the queue and the exchange, set the durable parameter to true. The code is as follows:

    /**
     * Create the direct message model: queue, exchange, and binding.
     */
    // 1.1 Create the queue (durable = true)
    @Bean(name = "basicQueue")
    public Queue basicQueue() {
        return new Queue(env.getProperty("mq.basic.info.queue.name"), true);
    }

    // 1.2 Create the exchange (durable = true, autoDelete = false)
    @Bean
    public DirectExchange basicExchange() {
        return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"), true, false);
    }

    // 1.3 Bind the queue to the exchange with the routing key
    @Bean
    public Binding basicBinding() {
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
    }

When creating a message, set the message’s persistence mode to “Persistent” as follows:

@Component
@Slf4j
public class BasicPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    Environment env;

    // Send a string message
    public void sendMsg(String messageStr) {
        if (!Strings.isNullOrEmpty(messageStr)) {
            try {
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));

                // Guideline 2: create queues, exchanges, and messages in persistent mode.
                // Here the message's delivery mode is set to PERSISTENT.
                Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8"))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                rabbitTemplate.convertAndSend(message);
                log.info("Basic message model - producer - sent message: {}", messageStr);

            } catch (UnsupportedEncodingException e) {
                log.error("Basic message model - producer - exception while sending message: {}", messageStr, e.fillInStackTrace());
            }
        }
    }
}

3.3.3 Consumer acknowledgement (ACK) mechanism

There are three acknowledgement modes for consumers: None, Auto, and Manual.

None: no acknowledgement is sent, that is, the consumer sends no feedback to the MQ server.

Auto: automatic acknowledgement. After the consumer finishes processing the message, an ACK is sent to the MQ server automatically, and the message is then removed from the queue. Under the hood, the acknowledgement is sent by a built-in component (in Spring AMQP, the listener container) rather than by your own code.

Manual: manual acknowledgement. After the consumer finishes processing the message, it must send the ACK to the MQ server itself, "in code".
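The practical difference between the three modes is when the broker is allowed to forget a message. The toy model below tries to capture that in plain Java. AckModeSketch is a hypothetical class and not the Spring AMQP API, and it assumes that in Auto mode a message whose handler throws is put back on the queue, which is the usual default behaviour but is configurable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model of a broker queue under the three acknowledgement modes; hypothetical names.
final class AckModeSketch {
    enum Mode { NONE, AUTO, MANUAL }

    private final Deque<String> ready = new ArrayDeque<>();
    private String unacked; // delivered in MANUAL mode but not yet acknowledged

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Messages the broker still holds (ready plus delivered-but-unacknowledged).
    int pendingCount() {
        return ready.size() + (unacked == null ? 0 : 1);
    }

    // Deliver the head message to the handler under the given mode.
    void deliver(Mode mode, Consumer<String> handler) {
        String message = ready.pollFirst();
        if (message == null) {
            return;
        }
        switch (mode) {
            case NONE:
                // Forgotten as soon as it is delivered: a failing handler loses the message.
                try { handler.accept(message); } catch (RuntimeException ignored) { }
                break;
            case AUTO:
                // Acknowledged on normal return; requeued here when the handler throws.
                try { handler.accept(message); } catch (RuntimeException e) { ready.addFirst(message); }
                break;
            case MANUAL:
                // Held until ack() is called explicitly by the consumer code.
                unacked = message;
                handler.accept(message);
                break;
        }
    }

    // Manual acknowledgement: only now may the broker drop the message.
    void ack() {
        unacked = null;
    }

    public static void main(String[] args) {
        AckModeSketch broker = new AckModeSketch();
        broker.enqueue("hello");
        broker.deliver(Mode.MANUAL, m -> System.out.println("processing " + m));
        System.out.println("pending before ack: " + broker.pendingCount());
        broker.ack();
        System.out.println("pending after ack: " + broker.pendingCount());
    }
}
```

In this model a failing handler loses the message under None, keeps it under Auto, and under Manual the message stays with the broker until the consumer code acknowledges it.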

The acknowledgement mode is configured in RabbitmqConfig#listenerContainerFactory(), with the following code:

    /**
     * Listener container factory for a single consumer,
     * with the consumer acknowledgement mode set to AUTO.
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());

        // Set the initial number of concurrent consumers
        containerFactory.setConcurrentConsumers(1);
        // Set the maximum number of concurrent consumers
        containerFactory.setMaxConcurrentConsumers(1);
        // Set the prefetch count: how many unacknowledged messages a consumer may hold at a time
        containerFactory.setPrefetchCount(1);

        // Use AUTO acknowledgement so that a message is only removed after it has been processed
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return containerFactory;
    }

Xiao Wang: Oh, I see. To ensure high availability and acknowledged consumption of messages, we follow these three guidelines: 3.3.1 is the producer's send-confirmation mechanism; 3.3.2 sets persistent mode when creating queues, exchanges, and messages; and 3.3.3 is the consumer's acknowledgement (ACK) mechanism. So what about the code that actually sends and receives RabbitMQ messages?

Big Bang: We have configured the RabbitMQ queue, exchange, and routing key, and to ensure high availability and acknowledged consumption we follow the three guidelines: producer send confirmation; persistent mode when creating queues, exchanges, and messages; and consumer acknowledgement (ACK). With the hard part solved, the send and receive code is simple to implement.

3.4 RabbitMQ send and receive in practice

3.4.1. Define producers

@Component
@Slf4j
public class BasicPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    Environment env;

    // Send a string message
    public void sendMsg(String messageStr) {
        if (!Strings.isNullOrEmpty(messageStr)) {
            try {
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));

                // Guideline 2: create queues, exchanges, and messages in persistent mode.
                // Here the message's delivery mode is set to PERSISTENT.
                Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8"))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                rabbitTemplate.convertAndSend(message);
                log.info("Basic message model - producer - sent message: {}", messageStr);

            } catch (UnsupportedEncodingException e) {
                log.error("Basic message model - producer - exception while sending message: {}", messageStr, e.fillInStackTrace());
            }
        }
    }
}

3.4.2 Create consumers

@Component
@Slf4j
public class BasicConsumer {

    /**
     * Listens for and consumes messages in the queue.
     */
    @RabbitListener(queues = "${mq.basic.info.queue.name}", containerFactory = "singleListenerContainer")
    public void consumerMsg(@Payload byte[] msg) {
        try {
            String messageStr = new String(msg, "utf-8");
            log.info("Basic message model - consumer - received and consumed message: {}", messageStr);

        } catch (UnsupportedEncodingException e) {
            log.error("Basic message model - consumer - exception:", e.fillInStackTrace());
        }
    }
}
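Both the producer and the consumer convert between String and byte[] using the charset name "utf-8", which is why they have to catch UnsupportedEncodingException. As a small aside, the same round trip can be written with java.nio.charset.StandardCharsets.UTF_8, which throws no checked exception. The sketch below is standalone and the class name Utf8RoundTripSketch is hypothetical; it is shown as an option, not as a change to the project code.

```java
import java.nio.charset.StandardCharsets;

// Standalone illustration of the payload round trip; not part of the project code.
final class Utf8RoundTripSketch {

    // What the producer does before handing the body to the message builder.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer does with the byte[] payload it receives.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "~~~ this is a string message ~~~~";
        String received = decode(encode(original));
        System.out.println(received.equals(original)); // prints "true"
    }
}
```

As long as both sides agree on UTF-8, the string that arrives at the consumer is identical to the one the producer sent, including non-ASCII text.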

3.4.3 Write a unit test: send a string to the queue and let the consumer listen for and process it

@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private BasicPublisher basicPublisher;

    // Test the basic message model with a string as the message content
    @Test
    public void testBasicMessageModel() {
        String msgStr = "~~~ this is a string message ~~~~";
        basicPublisher.sendMsg(msgStr);
    }
}

3.4.4 Analysis of unit test results

Running the unit test produces the results shown below:

As the figure shows, the producer sends the message "~~~ this is a string message ~~~~" to the message queue; the consumer listens for the message, processes it, and prints its content.

Xiao Wang: I see. RabbitMQ is all new to me, Brother Bang, so let me make sure I've got this straight. First, we started from the team's need to decouple business modules, which led us to message-oriented middleware. Then we compared the four message queues and chose RabbitMQ based on the company's actual situation. Next, we mapped producers, messages, exchanges, routing keys, queues, and consumers onto the everyday scene of sending and receiving a parcel (Qin sending a package to Xiao Wang). Finally, we integrated RabbitMQ with the popular SpringBoot, followed the three guidelines to ensure high availability and acknowledged consumption, and implemented sending and receiving messages with RabbitMQ.

Big Bang: Yes, it looks like you've got the basics of RabbitMQ and of sending and receiving messages with it. This project is in the Gitee (Code Cloud) repository, and you can use it directly when you introduce RabbitMQ to your team. The link is gitee.com/qinstudy/sp…

Xiao Wang: While reading last night I also came across the concept of a dead letter queue in RabbitMQ. Can you clear that up for me?

Big Bang: It's lunchtime. Let's go eat first and chat over the meal.....