RabbitMQ dead letter queue and delay queue

RabbitMQ itself has the properties of a dead letter queue and a dead letter switch. In the e-commerce industry, there is usually a requirement that an order is automatically cancelled if it is not paid over time. Delayed queues via RabbitMQ are one way to do this.

1. Dead-letter queues

A Dead Letter, as the name implies, is a message that dies. A dead-letter Exchange is no different from a common switch in that it can accept and send messages to a queue bound to it and routed to it. The difference is that a dead-letter Exchange forwards messages, while the queue bound to the dead-letter Exchange is a dead-letter queue. To put it more commonly, a dead letter switch and a dead letter queue are just ordinary switches and queues, but the information received and forwarded is dead letter, and other operations are no different.

1.1 Conditions of dead letter

A message, called a dead letter, requires the following conditions:

  • The message is rejected by the consumer (basic.reject or back.nack) and requeue=false is set.
  • The message expired because the queue set TTL (Time To Live) Time.
  • The message was discarded because the queue length limit was exceeded.

Rabbitmqctl sets the policy parameter on the rabbitmqctl command line. 2) Hard coding, which means setting it in code.

1.2 Consumer Rejection

1.2.1 Encoding mode

Hard coding is the argument to which the business queue declaration is written in code:

  • x-dead-letter-exchange: Dead letter switch, mandatory
  • x-dead-letter-routing-key: Dead letter Routing key for the switch to forward to the dead letter queue

Producer:

// producer
public class RejectProducer {
	// Define a service switch
    public static final String ORDER_X = "order.exchange";
	/ / the main method
    public static void main(String[] args) throws IOException {
        // Get the connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get the channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Send messages with the routing keys d.old.123, d.other.123, and d respectively
        // 1) d.old.123: The business consumer receives the message, and the order dead-letter queue receives the message
        // 2) d.other.123: The business consumer receives the message, and the other dead-letter queue receives the message
        // 3) d: Only business consumers receive messages
        channel.basicPublish(ORDER_X, "d.order.123".null."hello my friend".getBytes(StandardCharsets.UTF_8));
        // Close the resourceRabbitMQUtil.close(channel, connection); }}Copy the code

Consumer:

/** * Dead letter queue condition: Consumer reject * 1) basic.reject(tag, requeue) indicates that the message is rejected. The second parameter indicates whether to rejoin the queue. Note * 2) Basic. nACK (tag, multi, reject) Multi can reject multiple entries at a time. The requeue and reject are the same. Nack indicates unack data, with a separate identifier */
public class RejectConsumer {

    public static final String DEAD_LETTER_X = "dead.letter.exchange";
    public static final String DEAD_LETTER_Q_1 = "dead.letter.queue.order";
    public static final String DEAD_LETTER_Q_2 = "dead.letter.queue.other";
    public static final String ORDER_X = "order.exchange";
    public static final String ORDER_Q = "order.queue";

    public static void main(String[] args) throws IOException {
        // reject and nack
        rejectAndNack();
    }

    public static void rejectAndNack(a) throws IOException {
        // Get the connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get the channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare dead letter queues and dead letter switches
        declareOrderDLX(channel);
        declareOtherDLX(channel);
        
        // Declare a service switch
        channel.exchangeDeclare(ORDER_X, "topic".false.true.null);
        // Set service parameters
        Map<String, Object> arguments = new HashMap<>();
        // Set up a dead letter switch, a dead letter switch bound to two queues, according to the routing key to distinguish the forwarding of messages
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
        // Set the dead-letter queue routing key:
        // If a routing key is set, the message that is published will be forwarded to the dead letter queue using this routing key.
        // If this parameter is not set, it is forwarded to the dead-letter queue according to the routing key published
// arguments.put("x-dead-letter-routing-key", "d.other");
        // Declare a business queue
        channel.queueDeclare(ORDER_Q, false.false.true, arguments);
        // Bind service queues to service switches
        channel.queueBind(ORDER_Q, ORDER_X, "#");
        // Listen for business queue messages
        channel.basicConsume(ORDER_Q, false.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("order consumer: " + new String(body, StandardCharsets.UTF_8));
                / / deny
                channel.basicReject(envelope.getDeliveryTag(), false);
                System.out.println("order properties: "+ envelope.toString()); }}); }// Declare the order dead-letter queue
    public static void declareOrderDLX(Channel channel) throws IOException {
        // Declare the switch, unavailable =false, autodelete=true, just for simulation
        channel.exchangeDeclare(DEAD_LETTER_X, "topic".false.true.null);
        // Declare a queue
        channel.queueDeclare(DEAD_LETTER_Q_1, false.false.true.null);
        // Bind the switch and queue to accept only messages for the *.order.# rule
        channel.queueBind(DEAD_LETTER_Q_1, DEAD_LETTER_X, "*.order.#");
        // Listen for messages
        channel.basicConsume(DEAD_LETTER_Q_1, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("dead letter consumer 【order】: " + new String(body, StandardCharsets.UTF_8));
                System.out.println("Dead Letter properties [ORDER] :"+ envelope.toString()); }}); }// Declare the other dead-letter queue
    public static void declareOtherDLX(Channel channel) throws IOException {
        // Declare a switch
        channel.exchangeDeclare(DEAD_LETTER_X, "topic".false.true.null);
        // Declare a queue
        channel.queueDeclare(DEAD_LETTER_Q_2, false.false.true.null);
        // Bind the switch and queue to accept only messages for the *.other.# rule
        channel.queueBind(DEAD_LETTER_Q_2, DEAD_LETTER_X, "*.other.#");
        // Listen for messages
        channel.basicConsume(DEAD_LETTER_Q_2, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("dead letter consumer 【other】: " + new String(body, StandardCharsets.UTF_8));
                System.out.println("Dead Letter Properties [other] :"+ envelope.toString()); }}); }}Copy the code

Conclusion:

Reject or basic.nack forwards messages to a matching dead-letter queue (Requeue =false). Basic. reject has one less parameter mutil than basic.nack, indicating whether to batch back. And the number of Nacks can be seen on the Web side.

2) Use x-letter-dead-exchange to set the dead-letter switch. This is mandatory. X-letter-dead-routing-key Sets the routing key for dead-letter queues. This parameter is equivalent to redefining the routing key for publish. This parameter is optional and needs to be set based on services.

1.1.2 Policy Mode

In policy mode, run the following command on the RabbitMQ server:

Rabbitmqctl set_policy {policy name}". *" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
Copy the code

Such as:

rabbitmqctl set_policy dlx "dead.*" '{"dead-letter-exchange":"test-dead-letter-exchange"}' --apply-to queues
Copy the code

Set test-dead-letter-exchange policy name to DLX for all queues starting with dead. Create a new exchange named test-dead-letter-exchange in rabbitMQ’s web interface, and create a queue named dead.order.queue and dead.other.queue. Bind test-dead-letter-exchange to order.# and other.#.

Note: Since we want to simulate the dead-letter forwarding to the dead-letter queue, the two new queues have a TTL of 10000ms, that is, 10s.

We see that the message succeeds in dead-.order. queue 10 seconds later, indicating that our configuration is in effect. Here I draw a picture of the process:

1.3 Setting the Expiration Time

Documents: www.rabbitmq.com/ttl.html

We can set expiration times for queues or messages. The expiration time of a queue, similar to the autoDelete parameter, indicates that the queue will be deleted if it is not used for a specified period of time, that the queue has no users, that the queue has not recently been redeclared (redeclared to renew a lease), and that basic.get has not been called at least during the expiration period. This can be used, for example, for rPC-style reply queues, Many queues can be created that may never be exhausted. The message expiration time can be set in the message body when sending the message, or in the queue. In fact, there are two ways: one is set in the queue, and the other is set in the message.

1.3.1 Coding mode

1) Set the message body expiration time

// Set the message properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("2000") // Set the message expiration time, 2s, even if the queue has no expiration time
    .build();
/ / message
channel.basicPublish("hello"."order.123", properties, "hello my friend".getBytes(StandardCharsets.UTF_8));
Copy the code

2) Set the message expiration time of the queue

// Set parameters
Map<String, Object> arguments = new HashMap<>();
// Set the queue expiration time to 5s
arguments.put("x-message-ttl".5000);
// Set a dead letter queue
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// Declare a queue
channel.queueDeclare(ORDER_Q, false.false.true, arguments);
Copy the code

We can see that after the queue is created, there is a TTL identifier, x-message-TTL, indicating that the queue has set the message expiration time to 5s.

3) Set the queue expiration time

// Set the queue parameters
Map<String, Object> arguments = new HashMap<>();
// Set the expiration time of messages in the queue, 5s
arguments.put("x-message-ttl".5000);
// Set the queue expiration time. 10s If the queue is not used (no operation is performed), the queue is deleted
arguments.put("x-expires".10000);
// Declare a queue
channel.queueDeclare(ORDER_Q, true.false.false, arguments);
Copy the code

Note: Regardless of whether there is a message in the queue, if there is no action queue, it will be deleted automatically.

1.3.2 Policy Mode

1) Set the message expiration time of the queue

rabbitmqctl set_policy --vhost /adu TTL ". *" '{"message-ttl":60000}' --apply-to queues
Copy the code

Indicates to add a TTL policy under/ADU virtual host and set the expiration time of message-TTL messages for all queues to 60s.

2) Set the queue expiration time

rabbitmqctl set_policy --vhost /adu expiry ". *" '{"expires":1800000}' --apply-to queues
Copy the code

Specifies that a policy named expiry is added under the/ADU virtual host and all queues are set to expire at 180s.

1.4 Exceeding the Queue Length

By default, queues have no length limit (but there are always disk and memory limits). We can display the length of the queue, either as a limit on the number of messages, or as a memory size for the total message content of the queue, or both. The maximum length of a queue can be set using a policy or code, or set on the Web interface when a queue is created. If both policy mode and encoding mode are set, smaller values take effect.

If a queue has a queue length limit, the default overflow rule is to discard the oldest messages (queue headers) when the maximum length of messages in the queue is reached. We can change this rule to use the overflow argument. The overflow value can be x-reject-publish or x-reject-publish- DLS, both of which reject new messages. The difference is that reject-publish- DLX also rejects message 1.

There is a problem here: reject-publish-dlx and reject-publish. Reject -publish- DLX does not receive messages, reject-publish receives messages. But when I did the experiment it was the opposite of what I understood, right? Familiar iron son people reply to say once ha ah.

1.4.1 Encoding mode

Use the x-max-length and x-max-length-bytes parameters.

// Set parameters
Map<String, Object> arguments = new HashMap<>();
// Set the maximum queue length to 5 messages
arguments.put("x-max-length".5);
// Queue overflow policies: drop-head (default), reject-publish, reject-publish- DLX
// arguments.put("x-overflow", "reject-publish-dlx");
arguments.put("x-overflow"."reject-publish");
// Set a dead letter queue
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// Declare a queue
channel.queueDeclare(ORDER_Q, false.false.true, arguments);
Copy the code

1.4.2 Policy Mode

rabbitmqctl set_policy --vhost /adu limit "^five_msg" '{"dead-letter-exchange":"test-dead-letter-exchange","max-length":5,"overflow":"reject-publish-dlx"}' --apply-to queues
Copy the code

Add a policy named limit under/ADU virtual host, set the maximum number of queue messages starting with five_msg to 5, set the message overflow policy to reject, and set the dead letter switch.

The same is true for setting max-length-bytes.

2. Delay queue

Delay queue, as its name implies, is the queue storing delayed messages, that is to say, consumers will receive messages after a certain delay. A typical application scenario is that the order is automatically cancelled if it is not paid due to time out as described above.

2.1 Implement with dead-letter queue

In fact, after introducing the dead letter queue, you can see how to use the dead letter queue to implement the delay queue. The TTL attribute of the message is used to forward the expired message to the dead letter queue. The service listens for the message in the dead letter queue. This situation is suitable for setting the message expiration time for the queue, that is, all messages in the queue have the same expiration time, and the expiration time is forwarded to the dead letter queue in sequence without any problem.

If the expiration time of the message is set on the message body when the message is sent, problems may occur. For example, msG1 and MSG2 messages are sent in sequence. The expiration time of MSG1 is 5s, and that of MSG2 is 2s. The result is that MSG2 is consumed first in the dead-letter queue, but the result is that both messages are forwarded to the dead-letter queue and consumed in 5s. In fact, it is easier to understand because the queue nature is first in, first out. Even if MSG1 reaches the expiration time first, msG1 blocks before it. Only msG1 is consumed can MSG2 reach the queue head to be consumed. Let’s draw a picture:

2.2 Use the RabbitMQ plug-in

Rabbitmq provides a plugin called rabbitmq_delayed_message_exchange that allows us to implement delayed queuing as well as solve the problem of message blocking when implementing delayed queuing via dead-letter queues. This plugin is supported from 3.6.12 for RabbitMQ. Make sure your current version of RabbitMQ supports this plugin.

2.2.1 Downloading plug-ins

Download: github.com/rabbitmq/ra…

Once you have downloaded the rabbitmq_delayed_message_exchange-3.9.0.ez package, place it in the plugins of the RabbitMQ installation directory:

2.2.2 Enabling plug-ins

Run the console command to restart the RabbitMQ service:

# 1. List all plug-ins
rabbitmq-plugins list
# 2. Enable rabbitmq_delayed_message_exchange
rabbitmq-plubins enable rabbitmq_delayed_message_exchange
# 3. Restart the service
systemctl restart rabbitmq-server.service
Copy the code

After that, exchanges on the Web interface can create an exchange of type X-delayed – Message, or declare that type of exchange in the code. If you want to use the delay function, you need to add a header: X-delay = XXX to send a message, indicating a delay of XXX milliseconds.

/ / declare delay switch, type = x - of - the message, x - of - type = direct | fanout | topic
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type"."topic"); // This is the same as the previous exchange type
channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message".false.true, args);

// Send the message, x-delay, the value is the expiration time
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay".5000); // 5s
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    .headers(headers)
    .build();
// Send a message
channel.basicPublish(EXCHANGE_NAME, "other.save", props, "i am 5s message".getBytes(StandardCharsets.UTF_8));
Copy the code

3, summarize

We have mentioned the implementation of delay queue using RabbitMQ: 1) using a dead letter queue, listening to the dead letter queue; 2) With plug-in implementation. The advantages and disadvantages are as follows:

  • Dead letter queue implementation method, need to set the message expiration time on the queue, inflexible; Need one more dead letter queue, take up space; Rabbitmq can come with a dead letter queue, easy to implement.
  • Plug-in implementation, need to download and install plug-in, to consider version compatibility; The code logic is simple and easy to use.

Back to our initial requirement: the order is automatically cancelled when payment times out. Rabbitmq is one way of doing this, but it can also be done with Java DelayQueue, Quartz timed tasks, Redis zset, time wheel, etc. Specific program or should combine the advantages and disadvantages of the project and specific way to choose. For example, if RabbitMQ is used in a project, RabbitMQ is the best way to implement delay queuing. The choice between plugin and dead letter queuing depends on the flexibility of the project.

Reference:

  1. www.rabbitmq.com/dlx.html
  2. www.cnblogs.com/williamwsj/…
  3. www.jianshu.com/p/256d2eaf1…
  4. www.rabbitmq.com/community-p…
  5. Blog.csdn.net/zhenghongcs…

More articles visit:A programmer who asked not to be named

Finally: because of the small talent, if there is a problem, please don’t hesitate to point out, thank you ~