【MQ Series】Basic usage of @RabbitListener for consuming messages
An earlier post covered how to send messages to RabbitMQ; where there is a producer, there must also be a consumer. In a SpringBoot environment, consuming messages is relatively easy: the @RabbitListener annotation can basically meet 90% of your business development needs
Let's take a look at the most common ways to use @RabbitListener
I. Configuration
Start by creating a SpringBoot project for the demonstrations that follow
- SpringBoot version: 2.2.1.RELEASE
- RabbitMQ version: 3.7.5
(For installation instructions, see 【MQ Series】Springboot + RabbitMQ)
Dependencies in the pom.xml configuration file
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Note: the following dependency is not strictly necessary (it is only needed for the REST test endpoint) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
Add the RabbitMQ properties to the application.yml configuration file
spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1
II. Consumption patterns
This article aims to be practical and shows how to use @RabbitListener in specific scenarios. If some attributes of the annotation are still unclear by the end, don't worry; the next article will cover them all
0. Mock data
You cannot consume without data. So our first step is to create a message producer that writes data to an exchange, which the consumers below can then be tested against
The examples in this article use topic mode (the other modes are used in much the same way; they can be covered later if there is demand)
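import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PublishRest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // Publish data to the given exchange using the given routing key
    @GetMapping(path = "publish")
    public boolean publish(String exchange, String routing, String data) {
        rabbitTemplate.convertAndSend(exchange, routing, data);
        return true;
    }
}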
This provides a simple REST endpoint that lets us specify which exchange to push data to and which routing key to use
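As a small sketch of how this endpoint might be called, the helper below builds the request URL with URL-encoded query parameters, which matters once the data contains spaces or characters such as &. Only the path publish and the parameter names exchange, routing, and data come from the controller above; the class name PublishUrl, the method buildUrl, and the base URL http://localhost:8080 are assumptions made for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper (not part of the original project): builds the URL
// for the publish endpoint shown above.
class PublishUrl {
    // Assumed base URL; adjust to wherever the application is running
    static final String BASE = "http://localhost:8080";

    static String buildUrl(String exchange, String routing, String data) {
        return BASE + "/publish?exchange=" + encode(exchange)
                + "&routing=" + encode(routing)
                + "&data=" + encode(data);
    }

    // URL-encode a single query parameter value as UTF-8
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("topic.e", "r", "wahaha"));
    }
}
```

The resulting string can be pasted into a browser or passed to any HTTP client.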
1. Case 1: The exchange and queue already exist
The consumer does not need to manage the creation or destruction of an exchange; that is defined by the sender. Consumers generally care more about their own queues, including defining them and binding them to an exchange, and all of this can be done directly in the RabbitMQ console
So in real-world development it is quite likely that the exchange, the queue, and their bindings already exist and need no extra handling in code
Consuming data in this scenario is very simple, as follows:
/**
 * Consume by specifying the queue name when the queue already exists
 *
 * @param data message body
 */
@RabbitListener(queues = "topic.a")
public void consumerExistsQueue(String data) {
    System.out.println("consumerExistsQueue: " + data);
}
Just specify the queues attribute in the annotation, with the queue name (queueName) as its value
2. Case 2: The queue does not exist
When the queue's autoDelete property is false, the approach above works. When it is true, however, the queue is deleted automatically once it has no consumers, and the approach above may then fail with an exception because the queue it listens on no longer exists
In this scenario we have to create the queue ourselves and bind it to an exchange. The following use of @RabbitListener is recommended
/**
 * If the queue does not exist, create it and bind it to the exchange
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC),
        key = "r"))
public void consumerNoQueue(String data) {
    System.out.println("consumerNoQueue: " + data);
}
A single annotation declares the queue and sets up the binding, which is remarkably convenient
Note the three attributes of the @QueueBinding annotation:
- value: an @Queue annotation, used to declare the queue. Its value attribute is the queue name, durable indicates whether the queue is persistent, and autoDelete indicates whether the queue is deleted automatically when it has no consumers
- exchange: an @Exchange annotation, used to declare the exchange; type specifies the message routing strategy, and we use topic here
- key: in topic mode, this is the routing key that the queue is bound with
As shown above, consuming is not complicated even when the queue does not exist
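To make the role of key in topic mode more concrete, here is a simplified sketch of how a topic exchange compares a binding key with a message's routing key: keys are split into words on dots, * stands for exactly one word, and # stands for zero or more words. This is a plain-Java illustration, not RabbitMQ's actual implementation, and the class name TopicMatcher is hypothetical.

```java
// Simplified illustration of topic-exchange matching (hypothetical class,
// not RabbitMQ's real implementation).
class TopicMatcher {
    // True if a queue bound with bindingKey would receive a message
    // published with routingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: match only if all words were consumed too
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words; try every possibility
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // '*' matches exactly one word; otherwise the word must be equal
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

With the binding used in this article (key = "r"), only messages published with the routing key r reach the queue; a binding key such as order.* or order.# would accept a whole family of routing keys.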
3. Case 3: ack
RabbitMQ provides a message acknowledgement mechanism to ensure data consistency
The ack discussed here is on the consumer side. To change the default ack mode (the available modes are NONE (no ack), AUTO, and MANUAL), do the following
/**
 * Manual ack is required, but this method never acks
 *
 * @param data message body
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerNoAck(String data) {
    // Manual ack is required but never sent; what happens?
    System.out.println("consumerNoAck: " + data);
}
The implementation above is also simple: set ackMode = "MANUAL" to switch to manual ack
Note, however, that nothing in this implementation actually acks, so the message is never acknowledged. In the test later on you can see that the message stays in the Unacked column of the console. Once the number of unacked messages reaches the consumer's prefetch limit, no new messages are delivered to it
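The effect of that limit can be pictured with a toy model: the broker keeps delivering only while the number of unacked messages is below the prefetch count, and each ack frees one slot. The sketch below is plain Java, not the RabbitMQ client; the class PrefetchWindow, its method names, and the use of -1 as a "nothing delivered" marker are all assumptions for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of a consumer's prefetch window (hypothetical class, not the
// RabbitMQ client API).
class PrefetchWindow {
    private final int prefetch;                          // max unacked messages allowed
    private final Set<Long> unacked = new LinkedHashSet<>();
    private long nextTag = 1;                            // delivery tags start at 1

    PrefetchWindow(int prefetch) {
        this.prefetch = prefetch;
    }

    // Try to deliver one more message. Returns its delivery tag, or -1 when
    // the window is full and the broker would hold the message back.
    long tryDeliver() {
        if (unacked.size() >= prefetch) {
            return -1;
        }
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Acknowledge one message, freeing a slot in the window
    void ack(long tag) {
        unacked.remove(tag);
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

A listener that never acks, like consumerNoAck above, corresponds to calling tryDeliver repeatedly without ever calling ack: after prefetch deliveries every further attempt returns -1.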
4. Case 4: Manual ack
The previous case selected the manual ack mode but left out the ack logic itself. Next, let's see how to fill it in
/**
 * Manual ack
 *
 * @param data        message body
 * @param deliveryTag delivery tag of the message on this channel
 * @param channel     channel the message was received on
 * @throws IOException if the ack/nack cannot be sent
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
        throws IOException {
    System.out.println("consumerDoAck: " + data);
    if (data.contains("success")) {
        // Second argument (multiple): false acks only this message;
        // true would also ack all earlier unacked messages on this channel
        channel.basicAck(deliveryTag, false);
    } else {
        // Third argument (requeue): true puts the message back on the queue so it is delivered again
        channel.basicNack(deliveryTag, false, true);
    }
}
Notice that the method has two extra parameters
- deliveryTag: the identifier of the delivery; MQ uses it to determine which message is being acked/nacked
- channel: the channel between MQ and the consumer, through which the ack/nack is sent
When the message is consumed successfully, we call the basicAck method
// Second argument (multiple): false acks only this message;
// true would also ack all earlier unacked messages on this channel
channel.basicAck(deliveryTag, false);
When consumption fails and the message needs to go back to the queue to be consumed again, we can use basicNack
// Third argument (requeue): true puts the message back on the queue so it is delivered again
channel.basicNack(deliveryTag, false, true);
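The second argument of basicAck and basicNack (multiple) is easy to misread, so here is a minimal plain-Java sketch of its effect on the set of outstanding delivery tags: with multiple = false only the given tag is settled, with multiple = true every outstanding tag up to and including it is settled. The class UnackedTags is hypothetical and only models the bookkeeping; it does not talk to RabbitMQ.

```java
import java.util.TreeSet;

// Hypothetical bookkeeping model of the 'multiple' flag; not the RabbitMQ
// client API.
class UnackedTags {
    // Delivery tags that have been delivered but not yet acknowledged
    private final TreeSet<Long> outstanding = new TreeSet<>();

    void delivered(long deliveryTag) {
        outstanding.add(deliveryTag);
    }

    // Models the effect of basicAck(deliveryTag, multiple)
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Settle every outstanding tag <= deliveryTag
            outstanding.headSet(deliveryTag, true).clear();
        } else {
            // Settle only this one delivery
            outstanding.remove(deliveryTag);
        }
    }

    TreeSet<Long> outstanding() {
        return outstanding;
    }
}
```

Acking each message individually with multiple = false, as the listener above does, is the simplest choice; multiple = true trades that precision for fewer ack frames when messages are processed in batches.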
5. Case 5: Concurrent consumption
When there are a lot of messages and a single consumer is too slow to keep up, but the machine has capacity to spare, we want to consume in parallel, which is equivalent to having several consumers process the data at the same time
To enable parallel consumption, configure the listener as follows
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4")
public void multiConsumer(String data) {
    System.out.println("multiConsumer: " + data);
}
Note the concurrency = "4" attribute in the annotation, which means a fixed number of four consumers
Besides a single number, the value can also be written in m-n format, which means starting with m parallel consumers and scaling up to at most n
(Additional note: this explanation applies to SimpleMessageListenerContainer; the next article will cover how it differs from DirectMessageListenerContainer)
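The two accepted formats can be illustrated with a small parser. This is only a sketch of how such a value might be interpreted, not Spring's own parsing code; the class ConcurrencySpec and its fields are hypothetical, and treating a single number as min = max is a simplification used here to express "fixed".

```java
// Hypothetical helper showing how a concurrency value such as "4" or "2-8"
// can be read; not Spring's actual implementation.
class ConcurrencySpec {
    final int min; // number of consumers to start with
    final int max; // upper bound on the number of consumers

    private ConcurrencySpec(int min, int max) {
        this.min = min;
        this.max = max;
    }

    static ConcurrencySpec parse(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        if (dash < 0) {
            // Single number: a fixed number of consumers
            int fixed = Integer.parseInt(trimmed);
            return new ConcurrencySpec(fixed, fixed);
        }
        // "m-n": start with m consumers, grow to at most n
        int min = Integer.parseInt(trimmed.substring(0, dash).trim());
        int max = Integer.parseInt(trimmed.substring(dash + 1).trim());
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max: " + value);
        }
        return new ConcurrencySpec(min, max);
    }
}
```

For example, concurrency = "2-8" would read as min = 2 and max = 8 in this model, while the "4" used above reads as a fixed 4.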
6. Test
Using the message-publishing endpoint prepared earlier, request the following URL in a browser: http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha
Looking at the output, all five consumers receive the message; the consumer that actively nacks keeps receiving it over and over, because each nack puts the message back on the queue
(Since logs keep being printed, restart the application before starting the next test)
Then send a message containing success to verify that, with a proper manual ack, the situation above no longer occurs: http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg
Finally, look at the queue whose consumer never acks (topic.n2): it always has an unacked message
III. Other
Related posts in this series
- 【MQ Series】Springboot + RabbitMQ
- RabbitMq Series RabbitMq
- 【MQ Series】SpringBoot + RabbitMq: basic usage for sending messages
- 【MQ Series】Using the RabbitMq message confirmation/transaction mechanism
Project source code
- Project: github.com/liuyueyi/sp…
- Source: github.com/liuyueyi/sp…
1. A gray Blog: liuyueyi.github.io/hexblog
A gray's personal blog, recording everything from study and work; everyone is welcome to stop by
2. Statement
Believing everything in books is worse than having no books at all. The content above is purely my personal opinion; given my limited ability, omissions and mistakes are inevitable. If you find bugs or have better suggestions, criticism and corrections are welcome and much appreciated
- Micro Blog address: Small Gray Blog
- QQ: a gray /3302797840