This is the 30th day of my participation in the More Text Challenge.
Little drops of water wear through a stone 😄
Preface
An exception may occur while a consumer is processing a message. How should the failed message be handled?
RabbitMQ provides two methods, channel.basicNack and channel.basicReject, that can return a message to the queue so that it is delivered again. But if the message fails again on the next delivery, it is returned to the queue again, and so on. Because no retry limit is specified, the message can be retried indefinitely, which is a serious problem.
This article uses the retry feature built into spring-rabbit to address this issue.
Coding
Dependencies
spring-boot-starter-amqp already includes spring-rabbit.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
    <scope>compile</scope>
</dependency>
Configuration
A little configuration is needed to enable retries:
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # none, manual, or auto (the default)
        retry:
          enabled: true # enable listener retries
          max-attempts: 5 # maximum number of attempts (default: 3)
          initial-interval: 2000 # interval between attempts in ms (default: 1000 ms)
    host: 47.105. *
    port: 5672
    virtual-host: / * 1
    username: *
    password: *
mq:
  queueBinding:
    queue: prod_queue_pay
    exchange:
      name: exchang_prod_pay
      type: topic
    key: prod_pay
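To make the effect of these retry settings concrete, here is a minimal sketch in plain Java (no Spring required) that lists the waits between attempts. It assumes the exponential back-off rule that Spring Retry applies, wait = min(initial-interval × multiplier^n, max-interval), together with Spring Boot's defaults of multiplier 1 and max-interval 10000 ms. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists the waits (in ms) between consecutive attempts. */
final class BackoffSchedule {

    static List<Long> waits(int maxAttempts, long initialIntervalMs, double multiplier, long maxIntervalMs) {
        List<Long> waits = new ArrayList<>();
        double interval = initialIntervalMs;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
        for (int i = 1; i < maxAttempts; i++) {
            waits.add((long) Math.min(interval, maxIntervalMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above; multiplier and max-interval are assumed defaults.
        System.out.println(waits(5, 2000, 1.0, 10_000)); // [2000, 2000, 2000, 2000]
        // Hypothetical variant with multiplier 2: the wait doubles until it is capped.
        System.out.println(waits(5, 2000, 2.0, 10_000)); // [2000, 4000, 8000, 10000]
    }
}
```

With the settings used in this article, a message that keeps failing therefore occupies the consumer for roughly 8 seconds before the retries are exhausted.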
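Create the business queue and exchange
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Value("${mq.queueBinding.queue}")
    private String queueName;
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;

    /** Business queue (durable, no extra arguments yet). */
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

    // Durable topic exchange that is not auto-deleted
    @Bean
    public TopicExchange payTopicExchange() {
        return new TopicExchange(exchangeName, true, false);
    }

    // Bind the queue to the exchange
    @Bean
    public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange) {
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }
}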
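Producer
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitSender {
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        log.info("RabbitSender.send() msg = {}", msg);
        // Send the message to the business exchange
        rabbitTemplate.convertAndSend(exchangeName, key, msg);
    }
}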
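Consumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitReceiver {
    int count = 0;

    // Test retry
    @RabbitListener(queues = "${mq.queueBinding.queue}")
    public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("Retry times = {}", count++);
        // Deliberately throws ArithmeticException so that every attempt fails
        int i = 10 / 0;
        // Never reached in this test, because the line above always throws
        channel.basicAck(tag, false);
    }
}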
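Expose an endpoint for testing
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// @RestController instead of @Controller, so that the void return is not resolved as a view name
@RestController
public class TestController {
    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg) {
        rabbitSender.send(msg);
    }
}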
Then call the endpoint at http://localhost:8080/?msg=thriving. The message is sent to the prod_queue_pay queue, and the consumer attempts it five times. The attempts are 2 seconds apart, which matches the configuration.
Note: the retry does not mean that RabbitMQ redelivers the message to the queue. It happens entirely inside the consumer; in other words, the retry has nothing to do with the broker. For this reason, the consumer code above must not swallow the exception in a try {} catch () {} block. Once the exception is caught and not rethrown, auto ack mode treats the message as processed successfully, the message is acknowledged, and no retry is triggered. If you do need a try {} catch () {} block, rethrow an exception from the catch block, as follows:
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    log.info("Retry times = {}", count++);
    try {
        // Handle the main business logic
        int i = 10 / 0;
    } catch (Exception e) {
        // Handle the failure and perform other operations, such as recording the failure cause
        log.info("Record the failure cause ====>");
        // Rethrow (keeping the original cause) so that the retry is still triggered
        throw new RuntimeException("Throw by hand", e);
    }
    channel.basicAck(tag, false);
}
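The behaviour described in the note can be sketched without a broker. The following plain-Java simulation is a simplified model, not Spring's implementation: a hypothetical deliver method invokes a listener up to maxAttempts times inside the consumer process and reports whether recovery would be needed once the attempts are used up. All class and method names are assumptions made for illustration. It shows why a listener that swallows its exception is attempted only once.

```java
import java.util.function.Consumer;

/** Simplified model of in-consumer retry; every name here is hypothetical. */
final class InConsumerRetry {

    /** Outcome of one delivery: how often the listener ran and whether recovery was needed. */
    static final class Result {
        final int attempts;
        final boolean recovered;

        Result(int attempts, boolean recovered) {
            this.attempts = attempts;
            this.recovered = recovered;
        }
    }

    static Result deliver(String message, int maxAttempts, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                // Normal return: the message counts as processed and would be acknowledged.
                return new Result(attempt, false);
            } catch (RuntimeException e) {
                // The broker is not involved; the same delivery is simply tried again.
            }
        }
        // Attempts exhausted: this is the point where a MessageRecoverer would be called.
        return new Result(maxAttempts, true);
    }

    /** Listener that rethrows from its catch block, so the retry logic sees the failure. */
    static void rethrowing(String message) {
        try {
            Integer.parseInt(message);
        } catch (NumberFormatException e) {
            throw new RuntimeException("rethrown for retry", e);
        }
    }

    /** Listener that swallows the exception, so the failure is invisible to the retry logic. */
    static void swallowing(String message) {
        try {
            Integer.parseInt(message);
        } catch (NumberFormatException e) {
            // logged and ignored
        }
    }

    public static void main(String[] args) {
        Result a = deliver("not-a-number", 5, InConsumerRetry::rethrowing);
        Result b = deliver("not-a-number", 5, InConsumerRetry::swallowing);
        System.out.println("rethrowing: attempts=" + a.attempts + " recovered=" + a.recovered);
        System.out.println("swallowing: attempts=" + b.attempts + " recovered=" + b.recovered);
    }
}
```

The rethrowing listener runs five times and ends in recovery, while the swallowing listener runs once and is treated as a success even though the business logic failed.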
MessageRecoverer
After five attempts, the console prints an exception stack trace, and the message is then removed from the queue (because I configured auto ack mode, the container settles the message on my behalf). If you configure manual ack, the result looks like this:
After five attempts, the message stays in the unacknowledged state, because it has to be acknowledged manually. The next time the service is restarted, the message is consumed again.
First, let's look at what the exception log says:
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
The exception above arises because the SimpleRabbitListenerContainerFactoryConfigurer class uses the MessageRecoverer interface when it builds the container factory. The interface has a single recover method, which handles a message once its retries are exhausted. The source code is as follows:
public final class SimpleRabbitListenerContainerFactoryConfigurer
        extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
    @Override
    public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
        PropertyMapper map = PropertyMapper.get();
        RabbitProperties.SimpleContainer config = getRabbitProperties().getListener().getSimple();
        configure(factory, connectionFactory, config); // >> 1
        map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
        map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
        map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
    }
}
Note the configure call marked >> 1. It contains the following code:
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
    RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
            : RetryInterceptorBuilder.stateful();
    RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
            .createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
    builder.retryOperations(retryTemplate);
    MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
            : new RejectAndDontRequeueRecoverer(); // <1>
    builder.recoverer(recoverer);
    factory.setAdviceChain(builder.build());
}
Look at the code marked <1>: the default is the RejectAndDontRequeueRecoverer class, which was already visible in the earlier screenshot. As the class name suggests, this implementation rejects the message and does not return it to the queue. In other words, if the message still fails after the retries, it is considered hopeless and abandoned. Here is the implementation class:
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected

    @Override
    public void recover(Message message, Throwable cause) {
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
        }
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(cause), message);
    }
}
This source code shows where the exception comes from.
MessageRecoverer
The MessageRecoverer interface has a single method, recover, which is called back for messages that were consumed but failed every retry attempt.
Four types override the recover method. MessageBatchRecoverer is beyond the scope of this article, and RejectAndDontRequeueRecoverer, the default, has already been covered. The other two implementations are RepublishMessageRecoverer and ImmediateRequeueMessageRecoverer. As the names suggest, the first republishes the message and the second immediately returns it to the original queue. Let's test these two implementations in turn.
RepublishMessageRecoverer
RepublishMessageRecoverer republishes the message to a specified exchange and routing key. Create an error queue, bind it to an error exchange, and then register the MessageRecoverer bean. Add the following code to the RabbitConfig class. The result looks similar to a dead-letter queue.
// Additional imports: org.springframework.amqp.rabbit.core.RabbitTemplate,
// org.springframework.amqp.rabbit.retry.MessageRecoverer,
// org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer
@Autowired
private RabbitTemplate rabbitTemplate;
private static String errorTopicExchange = "error-topic-exchange";
private static String errorQueue = "error-queue";
private static String errorRoutingKey = "error-routing-key";

// Create the error exchange
@Bean
public TopicExchange errorTopicExchange() {
    return new TopicExchange(errorTopicExchange, true, false);
}

// Create the error queue
@Bean
public Queue errorQueue() {
    return new Queue(errorQueue, true);
}

// Bind the error queue to the error exchange
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange) {
    return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
}

// Set the MessageRecoverer
@Bean
public MessageRecoverer messageRecoverer() {
    // Both AmqpTemplate and RabbitTemplate work
    return new RepublishMessageRecoverer(rabbitTemplate, errorTopicExchange, errorRoutingKey);
}
Start the service, call the endpoint again, and check the result:
As the console shows, the RepublishMessageRecoverer we configured is used, and after five attempts the message is republished directly with the new routing key.
On the management page, you can see that the original queue contains no message, while the configured error queue contains one.
ImmediateRequeueMessageRecoverer
With ImmediateRequeueMessageRecoverer, a message whose retries have failed is immediately returned to the original queue. Modify the messageRecoverer method:
@Bean
public MessageRecoverer messageRecoverer() {
    return new ImmediateRequeueMessageRecoverer();
}
Start the service, call the endpoint again, and check the result:
After five attempts, the message is returned to the queue and then consumed again, with five more attempts. The cycle continues until the message is consumed successfully.
Conclusion
As the tests above show, for messages that still fail after the retries, you can use RepublishMessageRecoverer to send them to another queue and then handle them separately from that queue.
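To compare the three strategies side by side, the sketch below models them with in-memory queues in plain Java. It is a simplification under stated assumptions: the message always fails, every name is hypothetical, and the requeue loop is capped by a maxDeliveries parameter that does not exist in Spring AMQP and is there only so that the simulation terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of what happens to a message after its retries are exhausted. */
final class RecovererSimulation {

    enum Strategy { REJECT_AND_DONT_REQUEUE, REPUBLISH, IMMEDIATE_REQUEUE }

    final Deque<String> businessQueue = new ArrayDeque<>();
    final Deque<String> errorQueue = new ArrayDeque<>();
    int listenerCalls = 0;

    /** Consumes until the business queue is empty or maxDeliveries (a safety cap) is reached. */
    void run(Strategy strategy, int maxAttempts, int maxDeliveries) {
        for (int delivery = 0; delivery < maxDeliveries && !businessQueue.isEmpty(); delivery++) {
            String message = businessQueue.poll();
            listenerCalls += maxAttempts; // the listener fails on every attempt
            switch (strategy) {
                case REJECT_AND_DONT_REQUEUE:
                    break; // dropped (or dead-lettered, if the queue has a dead-letter exchange)
                case REPUBLISH:
                    errorQueue.add(message); // routed to the error queue for separate handling
                    break;
                case IMMEDIATE_REQUEUE:
                    businessQueue.add(message); // back to the original queue, consumed again
                    break;
            }
        }
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            RecovererSimulation sim = new RecovererSimulation();
            sim.businessQueue.add("pay-1");
            sim.run(strategy, 5, 3);
            System.out.printf("%s: calls=%d business=%d error=%d%n",
                    strategy, sim.listenerCalls, sim.businessQueue.size(), sim.errorQueue.size());
        }
    }
}
```

With a cap of three deliveries, the immediate-requeue run invokes the listener 15 times and still leaves the message in the business queue, whereas the republish run stops after 5 calls with the message parked in the error queue.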
Dead-letter queue
Besides RepublishMessageRecoverer, you can also use a dead-letter queue to handle messages whose retries have failed. This is the approach we usually take.
Create the dead-letter exchange, the dead-letter queue, and the binding between them
Continue adding configuration to RabbitConfig:
private static String dlTopicExchange = "dl-topic-exchange";
private static String dlQueue = "dl-queue";
private static String dlRoutingKey = "dl-routing-key";

// Create the dead-letter exchange
@Bean
public TopicExchange dlTopicExchange() {
    return new TopicExchange(dlTopicExchange, true, false);
}

// Create the dead-letter queue
@Bean
public Queue dlQueue() {
    return new Queue(dlQueue, true);
}

// Bind the dead-letter queue to the dead-letter exchange
@Bean
public Binding BindingDlQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange) {
    return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
A dead-letter exchange is defined exactly like a normal exchange, and a queue is bound to it exactly as it would be to a normal exchange. A dead-letter exchange is just an ordinary exchange under a different name; there is nothing special about it.
Modify the configuration
Modify the business queue configuration and comment out the MessageRecoverer bean provided earlier. Otherwise the dead-letter exchange will not take effect, because the MessageRecoverer we configured takes precedence.
/**
 * Business queue.
 * To bind a dead-letter exchange, set the following two arguments on the queue.
 */
@Bean
public Queue payQueue() {
    Map<String, Object> params = new HashMap<>();
    // Declare the dead-letter exchange bound to the current queue
    params.put("x-dead-letter-exchange", dlTopicExchange);
    // Declare the dead-letter routing key for the current queue
    params.put("x-dead-letter-routing-key", dlRoutingKey);
    return QueueBuilder.durable(queueName).withArguments(params).build();
}

// Set the MessageRecoverer (commented out so that the default recoverer is used)
//@Bean
//public MessageRecoverer messageRecoverer() {
//    // Both AmqpTemplate and RabbitTemplate work
//    return new ImmediateRequeueMessageRecoverer();
//}
Before starting the service, delete the previously created business queue, because its arguments have changed. After the service starts successfully, you can see that the business queue, the dead-letter queue, the business exchange, and the dead-letter exchange all exist.
The DLX and DLK labels appear on the business queue, which means that the dead-letter exchange and the dead-letter routing key are bound. Now call the producer to send a message. The default MessageRecoverer implementation is RejectAndDontRequeueRecoverer, and because the business queue is bound to the dead-letter exchange, once the retries are exhausted the message is removed from the business queue and sent to the dead-letter queue.
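The routing decision behind this result can be sketched in plain Java. The model below is an illustration under assumptions, not broker code: the class and method names are hypothetical, and it covers only the case of a rejected message. It reflects RabbitMQ's documented rule that a message rejected without requeue is republished to the queue's x-dead-letter-exchange, using x-dead-letter-routing-key if it is set and the message's original routing key otherwise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the broker's dead-letter decision for a rejected message. */
final class DeadLetterRouting {

    /** Returns "exchange -> routingKey" for a dead-lettered message, or empty if it is not dead-lettered. */
    static Optional<String> target(Map<String, Object> queueArgs, boolean requeue, String originalRoutingKey) {
        Object exchange = queueArgs.get("x-dead-letter-exchange");
        if (requeue || exchange == null) {
            // A requeued message stays in its queue; without a dead-letter exchange a rejected message is dropped.
            return Optional.empty();
        }
        // Without x-dead-letter-routing-key, the message keeps its original routing key.
        Object key = queueArgs.getOrDefault("x-dead-letter-routing-key", originalRoutingKey);
        return Optional.of(exchange + " -> " + key);
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", "dl-topic-exchange");
        params.put("x-dead-letter-routing-key", "dl-routing-key");
        System.out.println(target(params, false, "prod_pay"));          // Optional[dl-topic-exchange -> dl-routing-key]
        System.out.println(target(new HashMap<>(), false, "prod_pay")); // Optional.empty
    }
}
```

The first call mirrors the payQueue arguments from this article; the second shows the earlier setup without those arguments, where the rejected message is simply discarded.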