【MQ series 】RabbitListener consumption basic usage posture introduction

The rabbitMQ message sending position has been introduced in the past, so there must be a consumer, in the SpringBoot environment, consumption can be said to be relatively easy, with the @RabbitListener annotation, you can basically meet 90% of your business development needs

Let’s take a look at the most common positions @rabbitListener is used in

I. the configuration

Start by creating a SpringBoot project for subsequent demonstrations

  • Springboot version for2.2.1. RELEASE
  • The rabbitmq version for3.7.5(For installation tutorials, please refer to:【MQ series 】 Springboot + RabbitMQ)

Depend on the configuration file pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1. RELEASE</version>
    <relativePath/> <! -- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <! -- Note that the following is not necessary.
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
Copy the code

Add rabbitMQ properties to the application.yml configuration file

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0. 01.
Copy the code

II. Consumption posture

The aim of this article is to be practical and to demonstrate the @RabbitListener position in a specific scenario, so if you find that some of the properties in this annotation are still not understood by the end of this article, please don’t worry, the next article will cover them all

0. The mock data

Consumption consumption, how can you consume without data? So our first step is to create a message producer that can write data to Exchange for subsequent consumer testing

The consumption of this paper is mainly explained in topic mode (there is little difference in the usage of the other several modes, if there is demand, it will be made up later).

@RestController
public class PublishRest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(path = "publish")
    public boolean publish(String exchange, String routing, String data) {
        rabbitTemplate.convertAndSend(exchange, routing, data);
        return true; }}Copy the code

Provides a simple REST interface to specify which exchange to push data to and specify routing keys

1. Case1: The exchange queue exists

There is no need for the consumer to manage the creation/destruction of an exchange, it is defined by the sender; In general, consumers are more concerned with their queues, including defining them and binding them to exchange, which can be done directly from the RabbitMQ console

Therefore, it is highly likely that the exchange and queue and their corresponding bindings already exist during actual development and do not require additional processing in the code;

Consuming data in this scenario is very, very simple, as follows:

/** * consume ** by specifying the queue name when the queue already exists@param data
 */
@RabbitListener(queues = "topic.a")
public void consumerExistsQueue(String data) {
    System.out.println("consumerExistsQueue: " + data);
}
Copy the code

Just specify the queues argument in the annotation, with the value being the column name (queueName)

2. Case2: Queue does not exist

When the autoDelete property of the queue is false, the above scenario is appropriate; However, when this attribute is true, no consumer queues are automatically deleted, and the following exception may be obtained by using the above gesture

In this scenario, we are required to actively create a Queue and bind it to an Exchange. The following positions are recommended for @RabbitListener

/** * if the queue does not exist, create a queue and bind it to Exchange */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC),
        key = "r"))
public void consumerNoQueue(String data) {
    System.out.println("consumerNoQueue: " + data);
}
Copy the code

An annotation that internally declares the queue and establishes the binding relationship is amazing!!

Note the three attributes of the @QueueBinding annotation:

  • Value: @Queue, used to declare queues. Value: queueName, durable: whether queues are persistent, autoDelete: whether queues are automatically deleted when there are no consumers
  • Exchange: @exchange annotation, used to declare exchange, type specifies the message delivery policy, we use topic here
  • Key: In topic mode, this is known as a routingKey

The above, even when the queue does not exist, does not seem complicated

3. case3: ack

In rabbitMQ, there is a message confirmation mechanism to ensure data consistency.

The ACK here is mainly for the consumer. If we want to change the default ACK mode (noACK, Auto, manual), we can do as follows

/** * Requires manual ack, but does not ack **@param data
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerNoAck(String data) {
    // Request manual ACK, no ACK, what happens?
    System.out.println("consumerNoAck: " + data);
}
Copy the code

The above implementation is also relatively simple, set ackMode=MANUAL, MANUAL ACK

However, please note that there is no manual ACK anywhere in our implementation, which means that there is no ACK at all. In the subsequent test, it can be seen that the data is always in the column of UNacked. When the number of UNacked exceeds the limit, no new data will be consumed

4. case4: manual ack

Although the above choice of ACK mode, but there is still a step ack logic, next we look at how to fill

/** * Manual ack **@param data
 * @param deliveryTag
 * @param channel
 * @throws IOException
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
        throws IOException {
    System.out.println("consumerDoAck: " + data);

    if (data.contains("success")) {
        // In RabbitMQ's ack mechanism, the second argument returns true, indicating that the message needs to be sent to another consumer for re-consumption
        channel.basicAck(deliveryTag, false);
    } else {
        // The third argument, true, indicates that the message will be re-queued
        channel.basicNack(deliveryTag, false.true); }}Copy the code

Notice that the method has two extra parameters

  • deliveryTag: is a unique identifier for a message that MQ uses to determine which message was ack/ NAK
  • channel: pipe between MQ and consumer through which ack/ NAK is passed

When we consume correctly, we call the basicAck method

// In RabbitMQ's ack mechanism, the second argument returns true, indicating that the message needs to be sent to another consumer for re-consumption
channel.basicAck(deliveryTag, false);
Copy the code

We can use basicNack when we fail to consume and need to re-queue the message for re-consumption

// The third argument, true, indicates that the message will be re-queued
channel.basicNack(deliveryTag, false.true);
Copy the code

Case5: Concurrent consumption

When there’s a lot of news, and one consumer is huffing and puffing too slowly, but my machine is performing well, I want parallel consumption, which is the equivalent of having multiple consumers processing the data at the same time

To support parallel consumption, set up the following

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4")
public void multiConsumer(String data) {
    System.out.println("multiConsumer: " + data);
}
Copy the code

Note the concurrency = “4” property in the annotation, which means fixed four consumers;

In addition to the above assignment, there is an M-N format that represents m parallel consumers, up to N

(additional explanation: this parameter’s explanation was SimpleMessageListenerContainer scenarios, the next article will introduce the difference between it and DirectMessageListenerContainer)

Test 6.

Through the message sending reserved in front of the interface, we in the browser requests: http://localhost:8080/publish? exchange=topic.e&routing=r&data=wahaha

And then if you look at the output, all five consumers are receiving, especially the active NAK consumer, is receiving messages all the time;

(Since logs are always printed, restart the application to start the next test)

And then sends a message of success, verify the correct manually ack, whether there will be the above situation, request command: http://localhost:8080/publish? exchange=topic.e&routing=r&data=successMsg

And then let’s look at the queue that doesn’t have an ACK, there’s always an UNack message

II. The other

Series of blog posts

  • 【MQ series 】 Springboot + RabbitMQ
  • RabbitMq Series RabbitMq
  • 【MQ series 】SprigBoot + RabbitMq send message basic position
  • MQ Series The use of the RabbitMq message confirmation/transaction mechanism

Program source code

  • Project: github.com/liuyueyi/sp…
  • Source: github.com/liuyueyi/sp…

1. A gray Blog:liuyueyi.github.io/hexblog

A gray personal blog, recording all the study and work in the blog, welcome everyone to go to stroll

2. Statement

As far as the letter is not as good, the above content is purely one’s opinion, due to the limited personal ability, it is inevitable that there are omissions and mistakes, if you find bugs or have better suggestions, welcome criticism and correction, don’t hesitate to appreciate

  • Micro Blog address: Small Gray Blog
  • QQ: a gray /3302797840

3. Scan attention

A gray blog