Those of you who have studied RabbitMQ will know that rabbitMQ does not have a delay queue, so why would an interviewer ask such a bizarre question? Because when the interviewer asks you this question, they are testing your ability to integrate knowledge logically.
I can say for sure that RabbitMQ does not implement delay queueing, but we can save the country by using dead letter queueing and TTL.
Another way to do this is through the delay queue plug-in, which I’ll cover later.
Application scenario of delay queue
The most commonly used place is the cancellation of an order over time
Before we talk about how to do this, let’s talk about dead letter queues and TTL:
Key points
Dead-letter queue
Dead letter queues are actually called dead letter switches in RabbitMQ, so what does this dead letter actually mean? A dead letter queue is no different from a normal queue, but users do not actively send messages to the queue or the switch. Messages are forwarded from the original queue to the dead letter queue only when:
- The message length of the original queue exceeded a predetermined limit
- The consumer rejects the message,basicNack/basicReject, and does not put the message back into the queue
- The message in the original queue is set to expire. If it has not been consumed by consumers before expiration, it will also be forwarded to the dead letter queue.
Dead-letter queue-related Settings are set in queues:“x-dead-letter-exchange“.“x-dead-letter-routing-key“
TTL
TTL (Time To Live). If a message has not been consumed before it has lived To the specified Time, the message will be cleared. Rabbit can set an expiration message for a queue or for a specific message.
Q: How does Rabbit handle messages with expiration dates?
A: Rabbit implements a lazy policy of clearing expiration times in order to ensure high throughput on message queues; This lazy strategy is through after the news reached the top of the queue, the broker will check whether the queue is set the expiration time, if the set up the check expiration time has arrived, if the message, don’t push this message, don’t answer, the broker will traverse each message, check the expiration date, remember remember!!!!!!
❝
Now that we’ve covered two important technical points, it’s time to move on to the topic of this article. How does RabbitMQ implement delayed queuing?
❞
Using TTL + DLX
Implementation approach
You probably already know how to implement it, but even if you do, I’m still going to tell you, haha
Because TTL allows you to set an expiration time for a message, one of the conditions for entering a dead-letter queue is: The original queue the message set expiration time, if prior to the expiration, hasn’t been consumer spending, so will be transferred to a dead-letter queue, then we can combine the two to do so, deal with normal business listener to listen the dead-letter queue, and then to the normal parameter under the dead-letter queue queue, then the message flow will be like this:
- I sent a message to the broker with an expiration time set to 10000 milliseconds
- The broker places messages in queues
- After 10,000 milliseconds, the message has not been consumed
- The broker forwards the message to a dead letter switch, which pushes the message to a dead letter queue
- I’ve already set up a listener to listen to the dead-letter queue, so I will receive this message in 10000 milliseconds;
The code
- Producer queues bind to switches and queue declarations
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_DLX = "queue_test_dlx";
public static final String QUEUE_TEST_NORMAL = "queue_test_normal";
public static final String EXCHANGE_TEST_DLX = "exchange_test_dlx";
// Declare a queue that does not consume by default and bind the key of the dead-letter queue switch to the dead-letter queue
@Bean("queueTestNormal")
public Queue queueTestNormal() {
return QueueBuilder.durable(QUEUE_TEST_NORMAL).deadLetterExchange(EXCHANGE_TEST_DLX).deadLetterRoutingKey("testdlx").build();
}
// Declare a dead-letter queue
@Bean("queueTestDLX")
public Queue queueTestDLX() {
return QueueBuilder.durable(QUEUE_TEST_DLX).build();
}
// Declare a dead letter switch
@Bean("exchangeTestDLX")
public Exchange exchangeTestDLX() {
return ExchangeBuilder.directExchange(EXCHANGE_TEST_DLX).durable(true).build();
}
// The dead letter queue is bound to the dead letter switch
@Bean
public Binding itemQueueExchange7(@Qualifier("queueTestDLX") Queue queue,
@Qualifier("exchangeTestDLX") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testdlx").noargs();
}
}
Copy the code
- The producer uses a simple message to send to the normal queue and sets the expiration time to 10s
@Test
public void testDLX() {
rabbitTemplate.convertAndSend(null, "queue_test_normal"."I didn't get there until 10 seconds later.", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration(10000+"");
return message;
}
});
System.out.println("I sent the message at :"+(System.currentTimeMillis()));
System.out.println("Start countdown :10");
int i = 10;
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(i>0){
System.out.println("Countdown:"+(--i));
}
}
}
Copy the code
- Consumer listener written
@RabbitListener(queues = "queue_test_dlx")
public void onMessage5(Message message, Channel channel) throws Exception {
System.out.println("I received the message at:"+(System.currentTimeMillis()));
System.out.println("Received message from queue queue_test_dlx:" + new String(message.getBody()));
}
Copy the code
conclusion
So far. Delay queue has been implemented, we now summarize the only disadvantage of this way to achieve delay queue:
“No timely”, because only the news reached the top of the queue, the broker to check whether the news is overdue, push, added to the set expiration time the message there is a set up for a longer time expiration time in front of the message, this will cause the expiration time small message has not been disposed of, has been in the queue waiting for;
For this reason, the rabbitmq introduces a delay queue plugin. This plugin in front of the implementation of the idea and implementation in a different way, when a message delay time is set up, it’s not the news immediately pushed to the queue, but a message such as a set of delay time in the queue, only after we introduce delay queue plugin now is how to implement:
Use the delay queue plug-in
Install the delay queue plug-in
# download plug-in https://www.cnblogs.com/geekdc/p/13549613.html
Docker cp/Users/yangle docker/rabbitmq/plugins/rabbitmq_delayed_message_exchange 3.8.9-0199 d11c. Ez rabbitmq: / plugins
# Enter container
docker exec -it rabbitmq /bin/bash
# enable plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# check
rabbitmq-plugins list
Restart the container
docker restart rabbitmq
Copy the code
The code
- The switch is bound to the queue profile
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_DELAY_PLUGIN = "queue_test_delay_plugin";
public static final String EXCHANGE_TEST_DELAY_PLUGIN = "exchange_test_delay_plugin";
// Declare a queue
@Bean("queueDelayPlugin")
public Queue queueDelayPlugin() {
return QueueBuilder.durable(QUEUE_TEST_DELAY_PLUGIN).build();
}
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
// Set to routing mode
args.put("x-delayed-type"."direct");
// typeIt must be set to X-delayed -message
return new CustomExchange(EXCHANGE_TEST_DELAY_PLUGIN, "x-delayed-message".true.false, args);
}
// Plug-in switches are bound to queues
@Bean
public Binding itemQueueExchange8(@Qualifier("queueDelayPlugin") Queue queue,
@Qualifier("delayExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testDelayPlugin").noargs();
}}
Copy the code
- Send a message
@Test
public void testDelayPlugin() {
rabbitTemplate.convertAndSend("exchange_test_delay_plugin"."testDelayPlugin"."Test delay plug-in sending messages", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
}
Copy the code
- The listener
@RabbitListener(queues = "queue_test_delay_plugin")
public void onMessage6(Message message, Channel channel) throws Exception {
System.out.println("I received the message at:"+(System.currentTimeMillis()));
System.out.println("Received message from queue queue_test_delay_plugin:" + new String(message.getBody()));
}
Copy the code
conclusion
While the plugin’s approach to delay queuing is simple, it has its limitations:
- Can degrade performance, so do not use it if it is not required.
- The plug-in is not suitable for delayed messages with large data volumes, such as million or 100 million.
- Delay time: 0<= N <=(2^32)-1, in milliseconds.
There will be a large number of interview materials and architect must-see books waiting for you to choose, including Java foundation, Java concurrency, micro services, middleware and more information waiting for you to take oh.