What is a delayed queue

Delay queue: As the name suggests, a queue for message delay consumption. But it is also an ordinary queue, so it has the characteristics of ordinary queues, in contrast, the characteristics of delay is its biggest feature. The delay is the delay in how long we want the message to be consumed. The normal queue is instant consumption, the delay queue is based on the delay time, how long after the consumption.

Application scenario of delay queue

  • Orders that are not paid within 10 minutes are automatically cancelled.
  • Regular push of membership renewal
  • After successful registration, if the user does not log in within three days, SMS reminder will be carried out.
  • After a scheduled meeting, participants must be notified 10 minutes in advance of the scheduled time.
  • Coupon Expiration Reminder

The core app content is basically based on the need to set expiration time

How does RabbitMQ implement delayed queuing

  • Method 1: Use RabbitMQ’s advanced features TTL and matching dead-letter queues
  • Install rabbitmq_delayed_message_exchange

TTL is an advanced feature in RabbitMQ

What is TTL? TTL is the attribute of a message or queue in RabbitMQ, indicating the maximum lifetime of a message or all messages in the queue, in milliseconds. TTL is a message expiration policy. Our message is expired, and when the message has lived in the queue for a specified time, the queue will discard the message. RabbitMQ does not implement delay queues directly, we can use advanced features such as TTL and dead letter queues to implement delay queues.

So, how do you set this TTL value? There are two methods. The first method is to set the X-message-TTL attribute of the queue when creating the queue as follows: Method 1:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl".6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
Copy the code

In this way, messages are TTL and are discarded by the queue once they expire

Method 2:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
Copy the code

In this way, messages that are out of date are not necessarily discarded immediately because they are determined before they are delivered to the consumer, and expired messages may survive for a longer time if the current queue is heavily backlogged.

It is also important to note that if the TTL is not set, the message will never expire, and if the TTL is set to 0, the message will be discarded unless it can be delivered directly to the consumer.

How does RabbitMQ implement delayed queuing

  • Step 1: Create a normal queue, specify the message expiration time, and specify the dead letter exchange and dead letter exchange queue to be delivered after the message expires.
  • Step 2: Create a dead letter queue and a dead letter switch

RabbitMQ implements a delayed queue instance

package com.example.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/ * * *@author echo
 * @dateThe 2021-01-14 * / desire
public class TopicDealProductTest {

    /** * delay queue switch */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /** * Dead letter queue switch */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /** * delay queue */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /** * dead letter queue */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /** * delay queue ROUTING_KEY */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /** * delay queue ROUTING_KEY */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException {

        // Create delay queue and delay switch
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map<String, Object> map = new HashMap<>(16);
        // Specify a dead letter switch on the delay switch
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        // Specify the dead-letter queue routing-key on the delay switch
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // Set the delay queue extension duration to 10s
        map.put("x-message-ttl".10000);
        // Create a delay queue
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true.false.false, map);
        // Bind delay queue on delay switch
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);

        // Create a dead letter queue and a dead letter switch
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true.false.null);
        Create a dead letter queue
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true.false.false.null);
        // Bind a dead letter queue to a dead letter exchange
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);

        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null."hello world".getBytes());

    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // Close the resource
        channel.close();
        connection.close();
    }

    private static Connection createConnection(a) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the link parameters for RabbitMQ
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // Create a link to RabbitMQ
        returnfactory.newConnection(); }}Copy the code

In fact, it is not difficult to find that we only use the TTL feature, let the message expire when the specified queue, dead letter queue is also a normal queue.

In Exchange, we created two exchanges and two queues, but there are still differences between the two queues and the Exchange. Let’s look at the picture

We can see that the Features flags of the two queues are different

  • TTL: indicates the expiration time of the message in the queue
  • DLX: This queue is bound to a dead-letter exchange
  • DLK: ROUTING_KEY of the dead-letter queue to which the queue is bound

At the end of the execution, we can see that the messages in the delay queue are first delivered to the dead queue after they reach the expiration time.

So TTL and amqP.basicProperties are different. The first one is to set the expiration time of queue messages, and the second one is to set the expiration time of each message. So what’s the difference?

Set the difference between each message and TTL

The difference between the two approaches is how to determine whether the message is to be discarded. TTL queues that discard messages as soon as they reach their expiration date. If the latter, could we have a lot of queue messages, then each message expiration time again, this time, if the queue at the exit blocked many not set expiration time news nor consumed, the back of the queue messages in a timely manner to set the expiration time will not be discarded, only in setting the expiration time the message to the queue of the consumer’s position, Will determine

How to use amQP.basic Properties?

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/ * * *@author echo
 * @dateThe 2021-01-14 * / desire
public class TopicDealProductTest {

    /** * delay queue switch */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /** * Dead letter queue switch */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /** * delay queue */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /** * dead letter queue */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /** * delay queue ROUTING_KEY */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /** * delay queue ROUTING_KEY */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException {

        // Create delay queue and delay switch
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map<String, Object> map = new HashMap<>(16);
        // Specify a dead letter switch on the delay switch
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // Set the delay queue extension duration to 10s
// map.put("x-message-ttl", 10000);
        // Create a delay queue
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true.false.false, map);
        // Bind delay queue on delay switch
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);

        // Create a dead letter queue and a dead letter switch
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true.false.null);
        Create a dead letter queue
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true.false.false.null);
        // Bind a dead letter queue to a dead letter exchange
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.expiration("10000");
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties,  "hello world".getBytes());

    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // Close the resource
        channel.close();
        connection.close();
    }

    private static Connection createConnection(a) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the link parameters for RabbitMQ
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // Create a link to RabbitMQ
        returnfactory.newConnection(); }}Copy the code

When we’re done, we can see that it’s the same as what we did before

The difference between the two sets of expiration time is that one sets the expiration time uniformly and the other specifies each expiration time. But it doesn’t affect our implementation of the delay queue, so how do we choose?

How to choose TTL setting mode?

It makes sense to choose a scenario based on the characteristics of both methods. If we use it to do delay queue, and want to apply the characteristics of delay queue to the actual scene, and have high requirements on the real-time, we suggest to choose the first way.

conclusion

The implementation of delayed queue is not difficult, the key is to know its one principle, understand RabbitMQ his TTL and dead letter. After mastering these characteristics of it, we can apply delay queue very well. Delay queues are also very useful to us at work, but RabbiTMQ doesn’t have a native delay queue and just because we implemented it this way doesn’t mean we have to choose it. In fact, there are many ways, such as DelayQueu in Java, Kafka time wheel, etc.