What is a delay queue
A delay queue stores delayed messages. A "delayed message" is a message that consumers should not receive as soon as it is sent; instead, it becomes available for consumption only after a specified time has passed.
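To make the idea concrete, here is a minimal in-process analogy using the JDK's java.util.concurrent.DelayQueue. It is not RabbitMQ code; the DelayedMessage and DelayQueueDemo names are made up for illustration, and the 200 ms delay is an arbitrary choice.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical message type: it becomes available only after delayMillis.
final class DelayedMessage implements Delayed {
    final String body;
    private final long readyAtNanos;

    DelayedMessage(String body, long delayMillis) {
        this.body = body;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the message may be consumed.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    // Blocks until the head message's delay has elapsed, then returns its body.
    static String takeBody(DelayQueue<DelayedMessage> queue) {
        try {
            return queue.take().body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("hello", 200));

        // Right after sending, the consumer gets nothing: poll() does not wait.
        System.out.println("poll right away: " + queue.poll());

        // take() waits for the delay to elapse before handing over the message.
        System.out.println("after waiting: " + takeBody(queue));
    }
}
```

The broker-based approaches described in the rest of this article provide the same behaviour across processes.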
How does RabbitMQ implement delay queuing
Neither the AMQP protocol nor RabbitMQ supports delay queuing directly, but it can be simulated by combining DLX (dead-letter exchange) and TTL (time to live).
Implementing delay queuing
Two queues are needed: one that receives the messages and holds them until they expire, and one that the expired messages are forwarded to.
The producer sends a message to Queue1, which has a TTL set, for example 60s. The message waits in Queue1 for 60 seconds; because no consumer takes it, it expires and is forwarded through the dead-letter exchange to Queue2, whose consumers handle the delayed message.
Note: Queue1 has no consumers.
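The flow described above can be sketched as a toy in-memory model. This is not RabbitMQ code: the TtlDeadLetterModel class and its method names are hypothetical, and time is passed in explicitly so that the behaviour is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: Queue1 holds messages until their TTL runs out, then
// "dead-letters" them to Queue2, where consumers would read them.
final class TtlDeadLetterModel {
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> queue1 = new ArrayDeque<>(); // TTL queue, no consumers
    final Deque<String> queue2 = new ArrayDeque<>();        // dead-letter queue, has consumers

    TtlDeadLetterModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Producer side: publish into Queue1 at time nowMillis.
    void publish(String body, long nowMillis) {
        queue1.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    // Broker side: move every expired message at the head of Queue1 to Queue2.
    void expire(long nowMillis) {
        while (!queue1.isEmpty() && queue1.peekFirst().expiresAtMillis <= nowMillis) {
            queue2.addLast(queue1.pollFirst().body);
        }
    }

    public static void main(String[] args) {
        TtlDeadLetterModel model = new TtlDeadLetterModel(60_000); // 60 s TTL
        model.publish("order-1", 0);

        model.expire(59_999);
        System.out.println("after 59.999 s: " + model.queue2); // prints "after 59.999 s: []"

        model.expire(60_000);
        System.out.println("after 60 s: " + model.queue2);     // prints "after 60 s: [order-1]"
    }
}
```

In the real setup the broker performs the expire step itself; the application only declares the two queues and attaches consumers to the second one.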
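# Nesting and parent keys are assumed; only the mq.queueBinding.dlQueue path is confirmed by the consumer below
spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 47.105198.54.
    port: 5672
    virtual-host: /test-1
    username: 11
    password: 111
mq:
  queueBinding:
    queue: prod_queue_pay
    dlQueue: dl-queue
    exchange:
      name: exchang_prod_pay
      dlTopicExchange: dl-topic-exchange
      type: topic
    key: prod_pay
    dlRoutingKey: dl-routing-key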
Create a service queue and a dead letter queue
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    // Service queue configuration (property paths are assumed; match them to your configuration file)
    @Value("${mq.queueBinding.queue}") private String queueName;
    @Value("${mq.queueBinding.exchange.name}") private String exchangeName;
    @Value("${mq.queueBinding.key}") private String key;
    // Dead-letter queue configuration
    @Value("${mq.queueBinding.exchange.dlTopicExchange}") private String dlTopicExchange;
    @Value("${mq.queueBinding.dlRoutingKey}") private String dlRoutingKey;
    @Value("${mq.queueBinding.dlQueue}") private String dlQueue;

    private final String dle = "x-dead-letter-exchange";
    private final String dlk = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";

    // Create the dead-letter exchange
    @Bean
    public TopicExchange dlTopicExchange() {
        return new TopicExchange(dlTopicExchange, true, false);
    }
    // Create the dead-letter queue
    @Bean
    public Queue dlQueue() {
        return new Queue(dlQueue, true);
    }
    // Bind the dead-letter queue to the dead-letter exchange
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange) {
        return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
    }
    // Create the service queue
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        // Set the message TTL of the queue to 10s (the value is in milliseconds)
        params.put(ttl, 10000);
        // Declare the dead-letter exchange that expired messages are sent to
        params.put(dle, dlTopicExchange);
        // Declare the dead-letter routing key of the current queue
        params.put(dlk, dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }
    // Create the service exchange
    @Bean
    public TopicExchange payTopicExchange() {
        return new TopicExchange(exchangeName, true, false);
    }
    // Bind the service queue to the service exchange
    @Bean
    public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange) {
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }
}
Creating a producer
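import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Producer */
@Component
public class RabbitSender {
    private static final Logger log = LoggerFactory.getLogger(RabbitSender.class);
    // Property paths are assumed; match them to your configuration file
    @Value("${mq.queueBinding.exchange.name}") private String exchangeName;
    @Value("${mq.queueBinding.key}") private String key;
    @Autowired private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(exchangeName, key, msg);
    }
}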
Create consumers
The consumer consumes the messages in the dead-letter queue.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
public class RabbitReceiver {
    private static final Logger log = LoggerFactory.getLogger(RabbitReceiver.class);

    // Consume dead-letter queue messages
    @RabbitListener(queues = "${mq.queueBinding.dlQueue}")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message: {}", data);
        log.info("Then a series of logical processes are performed. Thanks (· ω ·)");
    }
}
External interface
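import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class TestController {
    @Autowired private RabbitSender rabbitSender;

    // The root mapping is assumed from the request URL used in the test
    @GetMapping("/")
    public void test(@RequestParam String msg) {
        rabbitSender.send(msg);
    }
}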
After starting the service, you can see that the exchanges and queues have been created.
Then call the interface: http://localhost:8080/?msg=hahahahahaha
2020-11-15 00:33:02.991 RabbitSender.send() msg = hahahahahaha
2020-11-15 00:33:12.060 Received message: hahahahahaha
2020-11-15 00:33:12.060 Then a series of logical processes are performed. Thanks (· ω ·)
We set the expiration time to 10s. The message expired, became a dead letter, and was forwarded from the prod_queue_pay queue to the dl-queue queue, where it was consumed. So far this works as intended.
There is a problem, though. The TTL is configured on the queue, so if different scenarios need different delays, such as 5s, 10s, and 15s, three queues are needed. Creating a new queue for every distinct delay is clearly not workable.
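Implementing delay queuing with a RabbitMQ plug-in
If there is a problem, solve it! Start by installing a plug-in: the x-delayed-message exchange type used in the code that follows is provided by the rabbitmq_delayed_message_exchange plug-in.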
Quick entry:…
Create queues and exchanges
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig2 {
    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTE_KEY = "delayed_key";

    /** Exchange */
    @Bean
    CustomExchange exchange() {
        // Choose fanout/direct/topic/headers routing with the x-delayed-type argument
        Map<String, Object> args = new HashMap<>();
        // "direct" is an assumption; the value used in the original was lost
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /** Queue */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    /** Bind the queue to the exchange */
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTE_KEY).noargs();
    }
}
Creating a producer
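Set the x-delay header on each message to control how long its delivery is delayed. The value is in milliseconds.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Producer */
@Component
public class RabbitSender {
    private static final Logger log = LoggerFactory.getLogger(RabbitSender.class);
    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";
    @Autowired private RabbitTemplate rabbitTemplate;

    /**
     * @param msg   message
     * @param delay delay time, in seconds
     */
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // Message persistence
            message.getMessageProperties().setDelay(delay * 1000); // x-delay is in milliseconds
            return message;
        });
    }
}

// setDelay method source implementation (in MessageProperties)
public void setDelay(Integer delay) {
    if (delay != null && delay >= 0) {
        this.headers.put("x-delay", delay);
    } else {
        this.headers.remove("x-delay");
    }
}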
Create consumers
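import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
public class RabbitReceiver {
    private static final Logger log = LoggerFactory.getLogger(RabbitReceiver.class);

    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message: {}", data);
        log.info("Then a series of logical processes are performed. Thanks (· ω ·)");
    }
}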
Provide external methods
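import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class TestController {
    @Autowired private RabbitSender rabbitSender;

    // The path template is assumed from the request URLs used in the test
    @GetMapping("/test2/{msg}/{delay}")
    public void test2(@PathVariable("msg") String msg, @PathVariable("delay") int delay) {
        rabbitSender.send2(msg, delay);
    }
}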
Start the service and log in to the RabbitMQ management interface. You can see that the exchange and queue have been created successfully.
Then send three messages with delays of 60s, 30s, and 5s, in that order, and check the consumer's log.
Request 1: http://localhost:8080/test2/msg=expiration time of 60s/60
Request 2: http://localhost:8080/test2/msg=expiration time of 30s/30
Request 3: http://localhost:8080/test2/msg=expiration time of 5s/5
The log shows that the messages were sent in the order 60s, 30s, 5s, but consumed in the order 5s, 30s, 60s.
2020-11-15 16:13:58.783 RabbitSender.send() msg = msg=expiration time of 60s
2020-11-15 16:14:02.653 RabbitSender.send() msg = msg=expiration time of 30s
2020-11-15 16:14:08.880 RabbitSender.send() msg = msg=expiration time of 5s
2020-11-15 16:14:13.924 Received message: msg=expiration time of 5s
2020-11-15 16:14:13.925 Then a series of logical processes are performed. Thanks (· ω ·)
2020-11-15 16:14:32.685 Received message: msg=expiration time of 30s
2020-11-15 16:14:32.687 Then a series of logical processes are performed. Thanks (· ω ·)
2020-11-15 16:14:58.814 Received message: msg=expiration time of 60s
2020-11-15 16:14:58.814 Then a series of logical processes are performed. Thanks (· ω ·)
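As a quick check of the log above, the expected delivery time of each message is its send time plus its delay. The sketch below redoes that arithmetic with the send timestamps copied from the log; the DeliveryOrder class and its members are hypothetical names used only for this illustration.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Worked arithmetic: expected delivery = send time + delay.
final class DeliveryOrder {
    static final class Sent {
        final String label;
        final LocalTime sentAt;
        final int delaySeconds;

        Sent(String label, String sentAt, int delaySeconds) {
            this.label = label;
            this.sentAt = LocalTime.parse(sentAt);
            this.delaySeconds = delaySeconds;
        }

        LocalTime expectedDelivery() {
            return sentAt.plusSeconds(delaySeconds);
        }
    }

    // Send timestamps copied from the log, in sending order.
    static List<Sent> fromLog() {
        List<Sent> sent = new ArrayList<>();
        sent.add(new Sent("60s", "16:13:58.783", 60));
        sent.add(new Sent("30s", "16:14:02.653", 30));
        sent.add(new Sent("5s", "16:14:08.880", 5));
        return sent;
    }

    // Sort a copy of the list by expected delivery time.
    static List<Sent> inDeliveryOrder(List<Sent> sent) {
        List<Sent> sorted = new ArrayList<>(sent);
        sorted.sort(Comparator.comparing(Sent::expectedDelivery));
        return sorted;
    }

    public static void main(String[] args) {
        for (Sent s : inDeliveryOrder(fromLog())) {
            System.out.println(s.label + " expected at " + s.expectedDelivery());
        }
    }
}
```

The computed times are 16:14:13.880, 16:14:32.653, and 16:14:58.783, each a few tens of milliseconds before the corresponding "Received message" entry, which matches the 5s, 30s, 60s consumption order.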
