This is the 23rd day of my participation in the More Text Challenge.

Little drops of water wear through a stone 😄

What is a dead letter queue

DLX stands for Dead-Letter-Exchange. When a message in a queue becomes a dead letter, it can be republished to another exchange, the DLX. A queue bound to a DLX is called a dead letter queue.

There are several common reasons for a message to become a dead letter:

  • 1. Message expiration: the message stayed in the queue longer than the configured TTL (the TTL was covered in the previous article).

  • 2. channel.basicNack or channel.basicReject is called with the requeue parameter set to false.

  • 3. The queue reaches its maximum length.

The DLX is an ordinary exchange, no different from any other. It can be specified on any queue, which in effect just sets an attribute of that queue. When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, which routes it to another queue, the dead letter queue. You can listen on that queue and process the messages as needed.
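To make the three triggers concrete, here is a toy in-memory model in plain Java. It is only a sketch and not how RabbitMQ is implemented: the class ToyDeadLetterQueue, its methods, and the simulated clock are all made up for illustration, and the list of dead letters stands in for the DLX plus the dead letter queue. The reason labels (expired, rejected, maxlen) mirror the names RabbitMQ uses, and dropping the oldest message when the queue is full assumes RabbitMQ's default overflow behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of dead-lettering (hypothetical, not RabbitMQ code). */
class ToyDeadLetterQueue {

    /** A message body plus the simulated time at which it was published. */
    static final class Entry {
        final String body;
        final long publishedAtMs;

        Entry(String body, long publishedAtMs) {
            this.body = body;
            this.publishedAtMs = publishedAtMs;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    // Stands in for the DLX and the dead letter queue bound to it.
    private final List<String> deadLetters = new ArrayList<>();
    private final long ttlMs;
    private final int maxLength;

    ToyDeadLetterQueue(long ttlMs, int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.ttlMs = ttlMs;
        this.maxLength = maxLength;
    }

    /** Reason 3: if the queue is full, the oldest message is dead-lettered to make room. */
    void publish(String body, long nowMs) {
        if (queue.size() >= maxLength) {
            deadLetters.add("maxlen:" + queue.pollFirst().body);
        }
        queue.addLast(new Entry(body, nowMs));
    }

    /** Reason 1: messages that outlived the TTL are dead-lettered instead of delivered. */
    Entry deliver(long nowMs) {
        Entry head;
        while ((head = queue.pollFirst()) != null) {
            if (nowMs - head.publishedAtMs >= ttlMs) {
                deadLetters.add("expired:" + head.body);
            } else {
                return head;
            }
        }
        return null; // nothing left to deliver
    }

    /** Reason 2: a rejected message is dead-lettered only when requeue is false. */
    void reject(Entry entry, boolean requeue) {
        if (requeue) {
            queue.addFirst(entry);
        } else {
            deadLetters.add("rejected:" + entry.body);
        }
    }

    /** Returns a copy of the dead letters collected so far, in arrival order. */
    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```

With a TTL of 10000 ms and a maximum length of 2, publishing a and b at time 0 and c at time 5000, delivering at time 12000, and rejecting the delivered message without requeue leaves the dead letters maxlen:a, expired:b, rejected:c.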

Configure the dead letter queue

To attach a DLX to a queue, set the x-dead-letter-exchange argument when declaring the queue with channel.queueDeclare. The sections below configure and use a dead letter queue for the first and second scenarios.
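As a minimal sketch of what those queue arguments look like, the helper below builds the argument map in plain Java. The class DeadLetterArgs and its build method are hypothetical names introduced here; the three argument keys are the ones used later in this article. The commented-out call assumes the RabbitMQ Java client and an already opened channel.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the queue arguments for dead-lettering. */
class DeadLetterArgs {

    /**
     * @param deadLetterExchange   name of the DLX (required)
     * @param deadLetterRoutingKey routing key used when dead-lettering, or null to keep the original key
     * @param messageTtlMs         per-queue message TTL in milliseconds, or null for no TTL
     */
    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     Integer messageTtlMs) {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            queueArgs.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        if (messageTtlMs != null) {
            queueArgs.put("x-message-ttl", messageTtlMs);
        }
        return queueArgs;
    }

    public static void main(String[] unused) {
        Map<String, Object> queueArgs = build("dl-topic-exchange", "dl-routing-key", 10000);
        System.out.println(queueArgs);
        // With the RabbitMQ Java client the map would be passed as the last argument:
        // channel.queueDeclare("prod_queue_pay", true, false, false, queueArgs);
    }
}
```

The Spring configuration in the following sections passes the same keys through QueueBuilder instead of calling channel.queueDeclare directly.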

Message expiration

Add the configuration

mq:
  queueBinding:
    queue: prod_queue_pay
    dlQueue: dl-queue
    exchange:
      name: exchang_prod_pay
      dlTopicExchange: dl-topic-exchange
    key: prod_pay
    dlRoutingKey: dl-routing-key

Create the dead-letter exchange, the dead letter queue, and the binding between them


    @Value("${mq.queueBinding.exchange.dlTopicExchange}")
    private String dlTopicExchange;
    
    @Value("${mq.queueBinding.dlRoutingKey}")
    private String dlRoutingKey;
    
    @Value("${mq.queueBinding.dlQueue}")
    private String dlQueue;
    
    // Create the dead-letter exchange (durable, not auto-deleted)
    @Bean
    public TopicExchange dlTopicExchange() {
        return new TopicExchange(dlTopicExchange, true, false);
    }

    // Create the dead letter queue (durable)
    @Bean
    public Queue dlQueue() {
        return new Queue(dlQueue, true);
    }

    // Bind the dead letter queue to the dead-letter exchange
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange) {
        return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
    }

As you can see, there is nothing special about the code above. It just creates an exchange and a queue and binds the two together.

Create the business queue, the business exchange, and the binding between them

    @Value("${mq.queueBinding.queue}")
    private String queueName;
    
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    
    @Value("${mq.queueBinding.key}")
    private String key;
    
    private final String dle = "x-dead-letter-exchange";
    private final String dlk = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";

    /** Business queue */
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        // Set the message TTL of the queue in milliseconds (10 s here).
        // All messages in the queue share the same expiration time.
        params.put(ttl, 10000);
        // Declare the dead-letter exchange bound to the current queue
        params.put(dle, dlTopicExchange);
        // Declare the dead-letter routing key of the current queue.
        // If not specified, the message's original routing key is used.
        params.put(dlk, dlRoutingKey);

        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

    @Bean
    public TopicExchange payTopicExchange() {
        return new TopicExchange(exchangeName, true, false);
    }

    // Bind the business queue to the business exchange
    @Bean
    public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange) {
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }

The key part of the code above is the creation of the business queue: it sets the message TTL of the queue and configures the dead-letter exchange and dead-letter routing key.

Producer code

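import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Producer */
@Component
@Slf4j
public class RabbitSender {

    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;

    @Value("${mq.queueBinding.key}")
    private String key;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        log.info("RabbitSender.send() msg = {}", msg);
        // Send the message to the business exchange
        rabbitTemplate.convertAndSend(exchangeName, key, msg);
    }
}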

Expose an HTTP endpoint

    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg){
        rabbitSender.send(msg);
    }

After starting the service, you can see that the business queue and business exchange are created together with the dead letter queue and dead-letter exchange, and that the business queue carries the DLX and DLK labels. Then call the interface http://localhost:8080/?msg=thriving and the message is sent to the prod_queue_pay queue.

If no consumer consumes the message within 10 s, it is treated as expired. Because a DLX is configured, the expired message is republished to the dead-letter exchange (dlTopicExchange), which uses the configured dlRoutingKey to find the matching queue (dlQueue), and the message ends up stored in this dead letter queue.

Message rejected

Remove the TTL from the business queue.

    /** Business queue */
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        // Declare the dead-letter exchange bound to the current queue
        params.put(dle, dlTopicExchange);
        // Declare the dead-letter routing key of the current queue.
        // If not specified, the message's original routing key is used.
        params.put(dlk, dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

Add the following to the configuration file

spring:
  rabbitmq:
    listener:
      simple:
        # Acknowledgement mode: manual, auto, or none
        acknowledge-mode: manual

Add a consumer

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
@Slf4j
public class RabbitReceiver {

    // Test whether a rejected message is sent to the dead letter queue
    @RabbitListener(queues = "${mq.queueBinding.queue}")
    public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("Received message: {}", data);
        boolean failed = false;
        Exception exception = null;
        try {
            if (data.contains("888")) {
                throw new RuntimeException("Information sensitive");
            }
        } catch (RuntimeException e) {
            failed = true;
            exception = e;
        }
        if (failed) {
            log.error("Error message: {}", exception.getMessage(), exception);
            // Note that the third argument (requeue) needs to be false.
            // If true, the message is put back into the original queue. Otherwise it is
            // discarded or, because a DLX is configured, sent to the dead letter queue.
            channel.basicNack(tag, false, false);
        } else {
            // Manually confirm that the message has been consumed
            channel.basicAck(tag, false);
        }
    }
}

In the code above, setting the third argument (requeue) of channel.basicNack to true causes an infinite retry problem: the failed message is put back into the original queue and delivered again, over and over. In the next article, I'll show you how to solve the infinite retry problem.
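The infinite retry can be seen in a small plain-Java simulation. This is a sketch under stated assumptions, not RabbitMQ code: the class RequeueLoopDemo and its method are hypothetical, the queue is an in-memory deque, the consumer is assumed to fail on every delivery, and maxDeliveries exists only so the loop terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simulation of nack with and without requeue. */
class RequeueLoopDemo {

    /**
     * Counts how often one always-failing message is delivered.
     * The loop stops when the queue is empty or the safety cap is reached.
     */
    static int deliveriesOfFailingMessage(boolean requeue, int maxDeliveries) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("888");
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String msg = queue.pollFirst();
            deliveries++;
            // The consumer fails and nacks the message.
            if (requeue) {
                // requeue=true: the message goes back to the queue and is delivered again.
                queue.addFirst(msg);
            }
            // requeue=false: the message leaves the queue (dropped or dead-lettered).
        }
        return deliveries;
    }

    public static void main(String[] unused) {
        System.out.println(deliveriesOfFailingMessage(false, 1000)); // 1
        System.out.println(deliveriesOfFailingMessage(true, 1000));  // 1000, only the cap stops it
    }
}
```

With requeue set to false the message is delivered once and then leaves the queue. With requeue set to true the delivery count always reaches the cap, however large it is.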

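Delete the previous queue and restart the service so that a new business queue is created (RabbitMQ does not allow an existing queue to be redeclared with different arguments). You can see that the business queue no longer has the TTL label.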

Request the interface http://localhost:8080/?msg=888. Because the message contains sensitive content, the business code throws an exception, the consumer rejects the message, and it is forwarded to the dead letter queue.
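Both scenarios end with the message in the same dead letter queue. RabbitMQ records why a message was dead-lettered in its x-death header, a list of entries that each carry fields such as reason and queue. The helper below is a minimal sketch of reading that reason from a header map in plain Java. The class DeathReason and its method are hypothetical, and it assumes the headers have already been obtained from the message as a Map, with the most recent event first as the RabbitMQ documentation describes.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper for reading the dead-letter reason from message headers. */
class DeathReason {

    /**
     * Returns the "reason" field of the first x-death entry
     * (for example "expired" or "rejected"), or null if it is absent.
     */
    static String firstReason(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return null;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (!(first instanceof Map)) {
            return null;
        }
        Object reason = ((Map<?, ?>) first).get("reason");
        // toString() also covers clients that deliver header strings as a non-String type.
        return reason == null ? null : reason.toString();
    }
}
```

For the TTL scenario the reason would be expired, and for the basicNack scenario it would be rejected.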

  • If you have any questions or errors in this article, please feel free to comment. If you find this article helpful, please click like and follow.