This is the 23rd day of my participation in the More text Challenge. For more details, see more text Challenge
Little drops of water wear through a stone 😄
What is a dead letter queue
A dead-letter Exchange (DLX) can be called a dead-letter Exchange. After a message becomes dead message in a queue, it can be sent back to another exchange, the DLX. A queue bound to a DLX is called a dead letter queue.
There are several common reasons for a message to become a dead letter:
-
1. Message expiration, which is the TTL mentioned in the previous article. The lifetime of the message in the queue exceeded the TTL time set.
-
BasicNack or channel.basicReject is called, and the Requeue parameter is set to false.
-
3. The message length of the queue reaches the maximum.
The DLX is also a normal exchange, no different from a normal exchange, it can be specified on any queue, in effect setting the properties of a particular queue. When there is a dead letter in the queue, RabbitMQ automatically publishes the message to the set DLX and routes it to another queue, the dead letter queue. Messages in this queue can be listened for for appropriate processing.
Configure the dead letter queue
Add DLX to this queue by setting the X-dead-letter-Exchange parameter in the channel.queueDeclare method. The first and second scenarios are described below for configuring and using dead-letter queues.
Message expiration
Add the configuration
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
key: prod_pay
dlRoutingKey: dl-routing-key
Copy the code
Create dead-letter switches, dead-letter queues, and bindings of the two
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
// Create a dead-letter switch
@Bean
public TopicExchange dlTopicExchange(a){
return new TopicExchange(dlTopicExchange,true.false);
}
// Create a dead letter queue
@Bean
public Queue dlQueue(a){
return new Queue(dlQueue,true);
}
// Bind the dead letter queue to the dead letter switch
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
Copy the code
As you can see, the code above is nothing special. It just creates a switch, a queue, and binds the two together.
Create service queues, service switches, and bindings between the two
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
private final String dle = "x-dead-letter-exchange";
private final String dlk = "x-dead-letter-routing-key";
private final String ttl = "x-message-ttl";
/** * Service queue *@return* /
@Bean
public Queue payQueue(a){
Map<String,Object> params = new HashMap<>();
// Set the expiration time of the queue
// All messages in the queue have the same expiration time
params.put(ttl,10000);
// Declare the dead-letter switch bound to the current queue
params.put(dle,dlTopicExchange);
// Declare the current queue's dead-letter routing key. If not specified, use the original queue's routing key:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(a){
return new TopicExchange(exchangeName,true.false);
}
// Bind the queue to the switch
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
Copy the code
The key code above is the creation of a business queue. The expiration time of the queue is specified and the dead-letter queue is configured.
Producer code
/* * Producer */
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// Send the message to the service switchrabbitTemplate.convertAndSend(exchangeName,key,msg); }}Copy the code
Provide external methods
@Autowired
private RabbitSender rabbitSender;
@GetMapping
public void test(@RequestParam String msg){
rabbitSender.send(msg);
}
Copy the code
Starting the service, you can see that the business queue and the business switch are created together with the dead letter queue and the dead letter switch. And you can see that the business queue has DLX and DLK labels.Then call the interface:http://localhost:8080/?msg= thriving, the message will be sent toprod_queue_pay
This queue.
If no consumer consumes the message within 10s, the message is determined to be expired. Because DLX is set, messages are thrown when they expiredlxExchange
On the switch, according to the configurationdlRoutingKey
To finddlxExchange
Matched queuedlQueue
After the message is stored indlxQueue
In this dead letter queue.
Message rejected
The expiration time of the service queue is removed.
/** * Service queue *@return* /
@Bean
public Queue payQueue(a){
Map<String,Object> params = new HashMap<>();
// Declare the dead-letter switch bound to the current queue
params.put(dle,dlTopicExchange);
// Declare the current queue's dead-letter routing key. If not specified, use the original queue's routing key:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
Copy the code
The configuration file added the configuration
rabbitmq:
listener:
simple:
# Message confirmation mode manual Auto None None
acknowledge-mode: manual
Copy the code
Add a consumer
/** * Consumer */
@Component
@Slf4j
public class RabbitReceiver {
// Test whether a consumer sends a dead-letter queue
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("Received message :{}",data);
boolean ack = false;
Exception exception = null;
try {
if(data.contains("888")) {throw new RuntimeException("Information sensitive"); }}catch (RuntimeException e) {
ack = true;
exception = e;
}
if (ack){
log.error(Error MSG :{}", exception.getMessage(), exception);
// Note that the third argument needs to be false
// If true, it will be put back into the original queue. Otherwise, it will be discarded or put into the dead letter queue.
channel.basicNack(tag, false.false);
} else {
// Manually confirm that the information has been consumed
channel.basicAck(tag, false); }}}Copy the code
In the code above, the channel.basicNack method has an infinite retry problem if the third argument is set to true. In the next article, I’ll show you how to solve the infinite retry problem.
Delete the previous queue and restart it to generate a new service queue. You can see that the label of the service queue is missing the TTL.
Request interface:http://localhost:8080/?msg=888Because there is a sensitive message, the business generates an exception and forwards the message to the dead letter queue.
- If you have any questions or errors in this article, please feel free to comment. If you find this article helpful, please click like and follow.