Dead-letter queue
concept
A dead letter is a message that cannot be consumed. Generally speaking, producer sends the message to the broker or directly to the queue, while a consumer takes the message from the queue and consumes it. However, some messages in the queue cannot be consumed ** due to certain reasons. Such messages become dead letters if there is no subsequent processing, and there is a dead letter queue naturally. Application scenario: To ensure that the message data of the order service is not lost, use the RabbitMQ dead-letter queue mechanism. When the message consumption is abnormal, the RabbitMQ dead-letter queue sends the message. And for example: users in the mall to place a successful order and click to pay after the specified time does not pay automatically invalid
The source of dead letters
- Message TTL (live time) expired.
- The queue reaches its maximum length (the queue is full and no more data can be added to MQ);
- The message is rejected (basic.reject or basic.nack) and requeue=false;
Dead letter of actual combat
Code architecture diagram
Demonstrates a message entering a dead letter queue due to TTL expiration
producers
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
/ * * *@author vleus
* @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// Set TTL (time to Live)
AMQP.BasicProperties basicProperties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code
Consumer C1 (consumes messages in the normal queue)
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {
// Define a common switch
public static final String NORMAL_EXCHANGE = "normal_exchange";
// Dead letter queue switch
public static final String DEAD_EXCHANGE = "dead_exchange";
// Common queue name
public static final String NORMAL_QUEUE = "normal_queue";
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
// Declare dead letter and common switch, type direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// Declare a normal queue
Map<String, Object> argumentMap = new HashMap<>();
// Set the expiration time for normal messages
argumentMap.put("x-message-ttl".10000);
// Who is the dead letter switch?
argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// Set a dead letter routingKey
argumentMap.put("x-dead-letter-routing-key"."lisi");
// Set the queue length limit for normal queues
// argumentMap.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);
// Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
channel.queueDeclare(DEAD_QUEUE, false.false.false.null);
// Common switch and common queue binding
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
// Bind a dead letter switch to a dead letter queue
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer01 receives the message:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code
Consumers C2
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code
Demo message exceeds normal queue length and enters dead letter queue
producers
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
/ * * *@author vleus
* @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// Set TTL (time to Live)
// AMQP.BasicProperties basicProperties =
// new AMQP.BasicProperties()
// .builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan".null,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {
// Define a common switch
public static final String NORMAL_EXCHANGE = "normal_exchange";
// Dead letter queue switch
public static final String DEAD_EXCHANGE = "dead_exchange";
// Common queue name
public static final String NORMAL_QUEUE = "normal_queue";
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
// Declare dead letter and common switch, type direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// Declare a normal queue
Map<String, Object> argumentMap = new HashMap<>();
// Set the expiration time for normal messages
// argumentMap.put("x-message-ttl", 10000);
// Who is the dead letter switch?
argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// Set a dead letter routingKey
argumentMap.put("x-dead-letter-routing-key"."lisi");
// Set the queue length limit for normal queues
argumentMap.put("x-max-length".6);
channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);
// Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
channel.queueDeclare(DEAD_QUEUE, false.false.false.null);
// Common switch and common queue binding
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
// Bind a dead letter switch to a dead letter queue
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer01 receives the message:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code
Consumers C2
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code
The demo message was denied entry to the dead-letter queue
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets;
/ * * *@author vleus
* @date2021 07月24 22:23 * * Dead letter queue producer production message */
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// Set TTL (time to Live)
// AMQP.BasicProperties basicProperties =
// new AMQP.BasicProperties()
// .builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan".null,message.getBytes(StandardCharsets.UTF_8)); }}}Copy the code
Consumers C1
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer01 {
// Define a common switch
public static final String NORMAL_EXCHANGE = "normal_exchange";
// Dead letter queue switch
public static final String DEAD_EXCHANGE = "dead_exchange";
// Common queue name
public static final String NORMAL_QUEUE = "normal_queue";
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
// Declare dead letter and common switch, type direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// Declare a normal queue
Map<String, Object> argumentMap = new HashMap<>();
// Set the expiration time for normal messages
// argumentMap.put("x-message-ttl", 10000);
// Who is the dead letter switch?
argumentMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// Set a dead letter routingKey
argumentMap.put("x-dead-letter-routing-key"."lisi");
// Set the queue length limit for normal queues
// argumentMap.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false.false.false, argumentMap);
// Declare a dead letter queue (messages are forwarded from a normal queue to a dead letter queue)
channel.queueDeclare(DEAD_QUEUE, false.false.false.null);
// Common switch and common queue binding
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
// Bind a dead letter switch to a dead letter queue
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if (msg.equals("info5")) {
System.out.println("Consumer01 receives the message:" + msg + ", this message was rejected.");
// The consumer rejects the message. False indicates that the message is placed in a dead-letter queue
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("Consumer01 receives the message:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }};// If manual reply is enabled, the message is rejected
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { }); }}Copy the code
Consumers C2
package com.vleus.rabbitmq.dead_exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/ * * *@author vleus
* @date2021 July 24 22:02 * < P > * Dead letter queue actual combat * Normal consumer 1 */
public class Consumer02 {
// The name of the dead letter queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Normal consumer C1 waiting to receive message");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02 receives the message:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); }}Copy the code