Implementing a delay queue with RabbitMQ
I. Background
I recently developed a campaign feature that rewards the top 10 users when the campaign ends. Because the campaign leaderboard is updated asynchronously through RabbitMQ, messages may still be waiting in the queue at the moment the campaign ends. The ranking would then be inaccurate, and rewards would inevitably go to the wrong users.
So how do we solve this? After negotiating with the product manager, we are allowed to delay the rewards by 10 minutes. There are currently two options:
1. Use a timer: periodically compute the difference between the current time and the end of the activity, and issue the rewards once it is >= 10 minutes.
   - Cons: besides occupying a scheduling thread, it has to query the database periodically for the activity's end time, which is wasteful and inelegant.
2. Use a RabbitMQ delay queue.
   - Pros: it meets the requirement and avoids the shortcomings of the timer approach.
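For comparison, the check that the timer option would run on every tick can be sketched as below. This is only an illustration: the class and method names are hypothetical, and the activity end time is assumed to have been loaded from the database already.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for the timer option; names are illustrative only. */
final class RewardTimerCheck {
    /** The 10-minute grace period agreed with the product manager. */
    static final Duration REWARD_DELAY = Duration.ofMinutes(10);

    /** Returns true once at least REWARD_DELAY has passed since the activity ended. */
    static boolean isRewardDue(Instant activityEnd, Instant now) {
        return Duration.between(activityEnd, now).compareTo(REWARD_DELAY) >= 0;
    }
}
```

A scheduled job would have to call isRewardDue over and over until it returns true, which is exactly the polling cost listed under the cons.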
Therefore, I chose the RabbitMQ delay queue. However, RabbitMQ does not provide delay queues directly; they have to be built from message TTL and dead letter queues. Read on.
II. TTL
TTL stands for Time To Live, that is, the expiration time. If a message has not been consumed by the time its TTL elapses, it is automatically removed from the queue.
RabbitMQ provides two ways to set an expiration time for messages:
1. Set the queue argument x-message-ttl, in milliseconds (ms). It applies to every message in the queue.
2. Set the expiration property on an individual message, also in milliseconds (ms). Whether such a message has expired is checked only when it reaches the head of the queue (that is, when it is about to be consumed).
If both are set, the shorter one takes effect.
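The two rules above (the shorter TTL wins, and a message is only checked at the head of the queue) can be illustrated with a small in-memory model. This is a toy sketch under those assumptions, not RabbitMQ client code; the TtlModel class and its methods are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of the TTL rules; not RabbitMQ client code. */
final class TtlModel {
    /** A queued message with its publish time and optional per-message TTL. */
    static final class Msg {
        final String body;
        final long publishedAtMs;
        final Long expirationMs; // null means no per-message TTL

        Msg(String body, long publishedAtMs, Long expirationMs) {
            this.body = body;
            this.publishedAtMs = publishedAtMs;
            this.expirationMs = expirationMs;
        }
    }

    /** If both the queue TTL and the message TTL are set, the shorter one applies. */
    static long effectiveTtlMs(Long queueTtlMs, Long messageTtlMs) {
        if (queueTtlMs == null && messageTtlMs == null) return Long.MAX_VALUE; // never expires
        if (queueTtlMs == null) return messageTtlMs;
        if (messageTtlMs == null) return queueTtlMs;
        return Math.min(queueTtlMs, messageTtlMs);
    }

    /** Removes expired messages, but only while they sit at the head of the queue. */
    static List<String> expireFromHead(Deque<Msg> queue, Long queueTtlMs, long nowMs) {
        List<String> expired = new ArrayList<>();
        while (!queue.isEmpty()) {
            Msg head = queue.peekFirst();
            long ttl = effectiveTtlMs(queueTtlMs, head.expirationMs);
            if (nowMs - head.publishedAtMs < ttl) break; // head is still alive: stop here
            expired.add(queue.pollFirst().body);
        }
        return expired;
    }
}
```

One consequence of the head-only check: a message with a short per-message TTL that sits behind a message with a longer one stays in the queue until the message in front of it is consumed or expires. If every delayed message uses the same TTL, as in this article, the issue does not arise.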
TTL in practice
@Bean
public Queue ttlQueue() {
    Map<String, Object> map = new HashMap<>(1);
    // Queue-level TTL: every message in this queue expires after 10000 ms (10 seconds)
    map.put("x-message-ttl", 10000);
    // Arguments: name, durable, exclusive, autoDelete, arguments
    return new Queue("ttl.queue", true, false, false, map);
}

@Bean
public DirectExchange ttlDirectExchange() {
    // Arguments: name, durable, autoDelete
    return new DirectExchange("ttl.direct.exchange", true, false);
}

@Bean
public Binding ttlDirectBinding() {
    // Route messages with routing key "ttl" from the exchange to ttl.queue
    return BindingBuilder
            .bind(ttlQueue())
            .to(ttlDirectExchange())
            .with("ttl");
}
The test class:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTestApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 0; i < 10; i++) {
            this.rabbitTemplate.convertAndSend("ttl.direct.exchange", "ttl", "hello ttl", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // Per-message TTL: this message expires after 5000 ms (5 seconds)
                    message.getMessageProperties().setExpiration("5000");
                    return message;
                }
            });
        }
    }
}
The test result: after ttl.queue is created, its TTL attribute shows 10000 (10 seconds), yet the 10 messages in the queue are cleared after 5 seconds, because the shorter per-message expiration (5000 ms) takes effect.
III. Dead letter queues
DLX stands for Dead Letter Exchange. When a message becomes a dead letter, it can be re-published to another exchange, which is called the dead letter exchange. Because the exchange is a RabbitMQ-specific concept, a dead letter exchange is usually just called a dead letter queue.
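The process is as follows: a producer publishes a message to a normal exchange, which routes it to a queue. When the message becomes a dead letter in that queue, it is re-published to the dead letter exchange, which routes it to the dead letter queue, where a consumer finally receives it.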
To implement the process above, we need to solve two problems:
1. How does a message in a queue become a dead letter?
   - The original queue has reached its length limit.
   - A message expiration is set on the original queue, and the message is not consumed before it times out.
   - The consumer rejects the message with basicNack/basicReject and requeue=false, so the message is not put back into the original queue.
2. How is the original queue bound to the dead letter exchange?
   - Set the x-dead-letter-exchange and x-dead-letter-routing-key arguments on the original queue.
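The three dead-letter triggers listed above can be illustrated with a small in-memory model. It is a toy sketch rather than RabbitMQ client code: the DeadLetterModel class is invented for the illustration, and it assumes RabbitMQ's default overflow behavior, which drops the oldest message from the head of a full queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of when a queue hands messages to its dead letter exchange. */
final class DeadLetterModel {
    enum Reason { MAXLEN, EXPIRED, REJECTED }

    private static final class Entry {
        final String body;
        final long publishedAtMs;

        Entry(String body, long publishedAtMs) {
            this.body = body;
            this.publishedAtMs = publishedAtMs;
        }
    }

    private final int maxLength; // models the queue length limit
    private final long ttlMs;    // models x-message-ttl
    private final Deque<Entry> queue = new ArrayDeque<>();

    /** What the dead letter exchange would receive, as "REASON:body". */
    final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength, long ttlMs) {
        this.maxLength = maxLength;
        this.ttlMs = ttlMs;
    }

    /** Trigger 1: the queue is full, so the oldest message is dead-lettered. */
    void publish(String body, long nowMs) {
        queue.addLast(new Entry(body, nowMs));
        while (queue.size() > maxLength) {
            deadLetter(queue.pollFirst().body, Reason.MAXLEN);
        }
    }

    /** Trigger 2: messages at the head whose TTL has elapsed are dead-lettered. */
    void expire(long nowMs) {
        while (!queue.isEmpty() && nowMs - queue.peekFirst().publishedAtMs >= ttlMs) {
            deadLetter(queue.pollFirst().body, Reason.EXPIRED);
        }
    }

    /** Trigger 3: the consumer rejects the head message with requeue=false. */
    void rejectHead(boolean requeue) {
        Entry head = queue.pollFirst();
        if (head == null) return;
        if (requeue) {
            queue.addFirst(head); // put back, so it is not a dead letter
        } else {
            deadLetter(head.body, Reason.REJECTED);
        }
    }

    private void deadLetter(String body, Reason reason) {
        deadLetters.add(reason + ":" + body);
    }
}
```

For example, with a limit of 2 and a TTL of 10000 ms, publishing a, b, c at time 0, rejecting the head without requeue, and then expiring at time 10000 leaves deadLetters as MAXLEN:a, REJECTED:b, EXPIRED:c.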
Dead letter queue in practice
@Bean
public Queue ttlQueue() {
    Map<String, Object> map = new HashMap<>(3);
    map.put("x-message-ttl", 10000);
    // Dead letters from this queue are re-published to dead.direct.exchange with routing key "dead"
    map.put("x-dead-letter-exchange", "dead.direct.exchange");
    map.put("x-dead-letter-routing-key", "dead");
    return new Queue("ttl.queue", true, false, false, map);
}

@Bean
public DirectExchange ttlDirectExchange() {
    return new DirectExchange("ttl.direct.exchange", true, false);
}

@Bean
public Binding ttlDirectBinding() {
    return BindingBuilder
            .bind(ttlQueue())
            .to(ttlDirectExchange())
            .with("ttl");
}

// ========== The following is the dead letter queue configuration ==========

@Bean
public Queue deadQueue() {
    return new Queue("dead.queue", true);
}

@Bean
public DirectExchange deadDirectExchange() {
    return new DirectExchange("dead.direct.exchange", true, false);
}

@Bean
public Binding deadDirectBinding() {
    return BindingBuilder
            .bind(deadQueue())
            .to(deadDirectExchange())
            .with("dead");
}
The test class code stays the same; to make the result easy to observe, we do not write any consumer code here. Before running it, delete the existing ttl.queue: its arguments have changed, and RabbitMQ refuses to redeclare an existing queue with different arguments.
The result: the messages in ttl.queue are transferred to dead.queue after 5 seconds.
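Now we apply this to the feature we need to implement: when the activity ends, we send a message with an expiration time of 10 minutes to ttl.queue, which has no consumers. Ten minutes later, because the message has not been consumed, it is forwarded to the dead letter queue dead.queue, and the consumer of dead.queue issues the rewards. Note that ttl.queue in the example above declares an x-message-ttl of 10 seconds; because the shorter TTL wins, the queue-level TTL must be raised to at least 600000 ms (or removed) for the 10-minute delay to take effect.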
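As a minimal sketch of the sending side, the helper below computes the expiration value for the trigger message. The RewardDelay class and its method are hypothetical, not part of the original project. It also assumes the message might be sent slightly after the activity ends, so it subtracts the time already elapsed instead of always using a fixed 10 minutes.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; the class and method names are not from the original project. */
final class RewardDelay {
    /** The 10-minute grace period agreed with the product manager. */
    static final Duration GRACE = Duration.ofMinutes(10);

    /**
     * Returns how long the trigger message should live in ttl.queue so that it is
     * dead-lettered 10 minutes after the activity ended, even if it is sent late.
     */
    static String expirationFor(Instant activityEnd, Instant sentAt) {
        Duration remaining = Duration.between(sentAt, activityEnd.plus(GRACE));
        long millis = Math.max(0L, remaining.toMillis());
        // The expiration property is a string holding a number of milliseconds.
        return String.valueOf(millis);
    }
}
```

The returned string can be passed to message.getMessageProperties().setExpiration(...) inside the MessagePostProcessor, in the same way the test class sets "5000". When the message is sent exactly at the end of the activity, the value is "600000".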
IV. Reference materials
RabbitMQ official documentation: TTL
RabbitMQ official documentation: DLX