Today we introduce the advanced use of SpringBoot to implement RabbitMQ message queues.
- MQ installation
- Automatically created
- Message retry
- Message timeout
- Dead-letter queue
- Delay queue
Install RabbitMQ
RabbitMQ installation is a complex process that requires installing the Erlang server and rabbitMQ_management server for easy management. In the development test environment using Docker to install much more convenient, save the environment and configuration trouble.
1. Pull the official image
docker pull rabbitmq:management
Copy the code
2. Start the RabbitMQ
docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
Copy the code
rabbitmq:management: image:tag
–name: specifies the container name. -d: background running container. -t: specifies a dummy terminal or terminal in the new container. -i: allows you to interact with standard input (STDIN) inside the container. -p: specifies the port on which the service runs. (5672: indicates the application access port. 15672: Console Web port number); -e: Specifies the environment variable. RABBITMQ_DEFAULT_USER: default user name; RABBITMQ_DEFAULT_PASS: password for the default user name);
Now RabbitMQ has been installed and started, you can log in to the admin background via http://localhost:15672 with the same user name and password as admin and admin
Use SpringBoot to automatically create queues
1. Import the AMQP package
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
2. The MQ configuration
The bootstrap. Yml configuration
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
direct:
prefetch: 10
Copy the code
Concurrency: the number of concurrent consumers set per listener at initialization. Prefetch: the number of messages to be consumed at a time from the broker
The rabbitmq – spring XML configuration
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<! -- The name of the queue that receives the message -->
<rabbit:queue name="login-user-logined" />
<! Exchange name and type -->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<! -- Exchange and queue binding -->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
Copy the code
Rabbit :topic-exchange pattern=”login.user.logined” : this is an expression in which “*” can be used to indicate a word, and “#” can be used to indicate one or more words
3. The message production end
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(a){
LoginUser loginUser = getLoginUser("succeed");
// Send a message
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser);
return loginUser;
}
@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
String userName;
String realName;
String userToken;
Date loginTime;
String status;
}
Copy the code
Note that by default, SimpleMessageConverter can parse only string and byte. Therefore, the message object passed must be Serializable, implementing the Serializable interface
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser
Copy the code
4. Message consuming terminal
@Component
public class ReceiverMessage {
@RabbitListener(queues = "login-user-logined")
public void receiveLoginMessage(LoginUser loginUser) { System.err.println(loginUser); }}Copy the code
@rabbitListener (Queues = “login-user-logined”) : Used to listen to messages in the login-user-logined queue
5. Automatically create a Queue
@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {
public static void main(String[] args) { SpringApplication.run(MQApplication.class, args); }}Copy the code
When no XML is imported and there is no queue on the MQ server, an error occurs that the relevant queue cannot be found
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/'.class-id=50, method-id=10)
Copy the code
Exchanges and queues are created automatically after the import
Message retry
By default, if a message consumption error occurs, the message will be retried and blockedAs can be seen in the figureunackedandtotalIt’s always 1, butdeliver/getsoaring
Message congestion also affects the consumption of subsequent messages. As time goes by, more and more messages cannot be consumed and processed in time. If there is a problem with a single or very small number of messages, open concurrency for multiple nodes to block the normal message. If there are too many, all nodes will be blocked.
If you want to encounter message consumption error retry several times to discard, so as not to affect the subsequent message consumption, how to implement?
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
prefetch: 10
retry:
enabled: true Allow retry of failed message consumption
max-attempts: 3 # Messages can be consumed 3 times at most
initial-interval: 2000 # Message multiple consumption interval 2 seconds
Copy the code
The preceding configuration allows three retries after a message consumption failure, with an interval of two seconds. If the message still fails, the message is discarded. Retry can solve temporary faults caused by non-body processing problems, and discarding the failed message is just for the benefit of normal processing of other messages, reducing the impact of business operations to a relatively low level.
The message timed out
Message retry resolves problems caused by message processing errors. If the message processing is too slow, you can process the message in the processing logic or through the message timeout mechanism. After setting the timeout period, the message is discarded.
Modify the rabbitmq – spring. XML
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>
Copy the code
X-message-ttl: time spent in the message server (ms)
If a queue exists before the configuration, it cannot be modified. You need to delete the queue and create it automaticallyFeaturesThere areTTLlogo
5. Dead-letter queues
A dead-letter queue is a queue that forwards a message to another queue according to a routingKey when a business queue fails to process:
- Reject or basic.nack) with Requeue =false No rejoin parameter or retry Upper limit of rejoin times
- The TTL(Time To Live) of the message has expired
- Queue length limit exceeded (queue full, “x-max-length” parameter)
1. Modifyrabbitmq-spring.xml
<! -- The name of the queue that receives the message -->
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
<! -- Dead letter switch -->
<entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
<! -- Dead mail route -->
<entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>
<! Exchange name and type -->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<! -- Exchange and queue binding -->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
<rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
</rabbit:bindings>
</rabbit:topic-exchange>
Copy the code
Messages can be redirected to a queue by setting the switch and route from which they are sent. The switch may be different from the original service queue. If the login-user-logined message fails to be processed, the message is directly forwarded to the login-user-logined- DLQ queue. When the program logic is repaired, move messages can be moved back to the business queue
2. Install the plug-in
The figure prompts you to install the plug-in first
3. Mobile messages
After the installation is successful, you can enter the service queue name and then forward the service queue
Delay queue
In addition to general delay processing, a delay queue can also be treated as a scheduled task of a single job, which is more elegant than polling through timers.
1. Modify the rabbitmq – spring. XML
<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">
Copy the code
If the following error message is displayed during the initial configuration, the server does not support the command and you need to install a plug-in
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message'.class-id=40, method-id=10)
Copy the code
2. Install the plug-in
-
Download the plugin: github.com/rabbitmq/ra…
-
Upload plugins to docker containers /plugins
docker ps
Check out the rabbitmqCONTAINER ID
Docker cp rabbitmq_delayed_message_exchange - 3.8.0. Ez 2 c248563a2b0: / pluginsCopy the code
- Inside the Docker container
docker exec -it 2c248563a2b0 /bin/bash
Copy the code
- Installing a plug-in
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Copy the code
For details, see blog.csdn.net/magic_1024/…
After the installation is successful, restart the program and observe the MQ managerexchangeCan be found
3. Send a delayed message
@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage(a) {
LoginUser loginUser = getLoginUser("succeed");
MessagePostProcessor messagePostProcessor = message -> {
/ / delay 10 s
message.getMessageProperties().setHeader("x-delay".10000);
return message;
};
// Send a message
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
return loginUser;
}
Copy the code
Note that the message is sent in real time. After receiving the message, the message server forwards the message to the corresponding queue
Vii. Complete code
Gitee.com/hypier/barr…