Dead-letter queue

concept

A dead letter is a message that cannot be consumed. Generally speaking, producer sends the message to the broker or directly to the queue, while a consumer takes the message from the queue and consumes it. However, some messages in the queue cannot be consumed ** due to certain reasons. Such messages become dead letters if there is no subsequent processing, and there is a dead letter queue naturally. Application scenario: To ensure that the message data of the order service is not lost, use the RabbitMQ dead-letter queue mechanism. When the message consumption is abnormal, the RabbitMQ dead-letter queue sends the message. And for example: users in the mall to place a successful order and click to pay after the specified time does not pay automatically invalid

The source of dead letters

  • Message TTL (live time) expired.
  • The queue reaches its maximum length (the queue is full and no more data can be added to MQ);
  • The message is rejected (basic.reject or basic.nack) and requeue=false;

Dead letter of actual combat

Code architecture diagram

Demonstrates a message entering a dead letter queue due to TTL expiration

producers

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.nio.charset.StandardCharsets;

/ * * *@author vleus
 * @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Set TTL (time to Live)
        AMQP.BasicProperties basicProperties =
                new AMQP.BasicProperties()
                        .builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code

Consumer C1 (consumes messages in the normal queue)

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {

    // Define a common switch
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // Dead letter queue switch
    public static final String DEAD_EXCHANGE = "dead_exchange";

    // Common queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        // Declare dead letter and common switch, type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Declare a normal queue
        Map<String, Object> argumentMap = new HashMap<>();
        // Set the expiration time for normal messages
        argumentMap.put("x-message-ttl".10000);
        // Who is the dead letter switch?
        argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set a dead letter routingKey
        argumentMap.put("x-dead-letter-routing-key"."lisi");
        // Set the queue length limit for normal queues
// argumentMap.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);

        // Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
        channel.queueDeclare(DEAD_QUEUE, false.false.false.null);

        // Common switch and common queue binding
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // Bind a dead letter switch to a dead letter queue
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01 receives the message:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code

Consumers C2

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {

    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code

Demo message exceeds normal queue length and enters dead letter queue

producers

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.nio.charset.StandardCharsets;

/ * * *@author vleus
 * @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Set TTL (time to Live)
// AMQP.BasicProperties basicProperties =
// new AMQP.BasicProperties()
// .builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan".null,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code
package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {

    // Define a common switch
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // Dead letter queue switch
    public static final String DEAD_EXCHANGE = "dead_exchange";

    // Common queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        // Declare dead letter and common switch, type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Declare a normal queue
        Map<String, Object> argumentMap = new HashMap<>();
        // Set the expiration time for normal messages
// argumentMap.put("x-message-ttl", 10000);
        // Who is the dead letter switch?
        argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set a dead letter routingKey
        argumentMap.put("x-dead-letter-routing-key"."lisi");
        // Set the queue length limit for normal queues
        argumentMap.put("x-max-length".6);
        channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);

        // Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
        channel.queueDeclare(DEAD_QUEUE, false.false.false.null);

        // Common switch and common queue binding
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // Bind a dead letter switch to a dead letter queue
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01 receives the message:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code

Consumers C2

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {

    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code

The demo message was denied entry to the dead-letter queue

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.nio.charset.StandardCharsets;

/ * * *@author vleus
 * @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Set TTL (time to Live)
// AMQP.BasicProperties basicProperties =
// new AMQP.BasicProperties()
// .builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan".null,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code

Consumers C1

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {

    // Define a common switch
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // Dead letter queue switch
    public static final String DEAD_EXCHANGE = "dead_exchange";

    // Common queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        // Declare dead letter and common switch, type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Declare a normal queue
        Map<String, Object> argumentMap = new HashMap<>();
        // Set the expiration time for normal messages
// argumentMap.put("x-message-ttl", 10000);
        // Who is the dead letter switch?
        argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set a dead letter routingKey
        argumentMap.put("x-dead-letter-routing-key"."lisi");
        // Set the queue length limit for normal queues
// argumentMap.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);

        // Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
        channel.queueDeclare(DEAD_QUEUE, false.false.false.null);

        // Common switch and common queue binding
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // Bind a dead letter switch to a dead letter queue
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if (msg.equals("info5")) {
                System.out.println("Consumer01 receives the message:" + msg + ", this message was rejected.");
                // The consumer rejects the message. False indicates that the message is placed in a dead-letter queue
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("Consumer01 receives the message:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }};// If manual reply is enabled, the message is rejected
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { }); }}Copy the code

Consumers C2

package com.vleus.rabbitmq.dead_exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/ * * *@author vleus
 * @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {

    // The name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        // Get the channel
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("Normal consumer C1 waiting to receive message");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code