This article is included in personal blog: www.chengxy-nds.top, technical resources sharing, progress together
Recently, the department called on us to organize more technology sharing meetings to activate the company’s technology atmosphere. However, as I have long seen through everything, I know that this is just to brush KPI. That said, it’s a good thing. Instead of having boring meetings, it’s good for personal growth to have technical conversations.
So I took the initiative to participate in the sharing, cough cough ~, really not for that KPI, just want to learn together with everyone!
This time I will share how SpringBoot + RabbitMQ implemented the message confirmation mechanism, and a bit of experience in the actual development of the pit. In fact, the overall content is relatively simple, sometimes things are so magical, the simpler things are more likely to go wrong.
As you can see, with RabbitMQ, our service links are significantly longer and, despite decoupling between systems, there are more scenarios where messages can be lost. Such as:
-
Message producer – > RabbitMQ server (message sending failed)
-
The rabbitMQ server fails, causing message loss
-
Message consumer – > RabbitMQ service (message consumption failed)
So if you can not use middleware, try not to use it. If you use it for the sake of using it, it will only increase your annoyance. After the message confirmation mechanism is opened, although the accurate delivery of messages is ensured to a large extent, due to frequent confirmation interaction,rabbitmq
Overall efficiency drops, throughput drops significantly, and it is really not recommended to use message confirmation for messages that are not very important.
Let’s first implement springboot + RabbitMQ message confirmation mechanism, and then analyze the problems encountered.
1. Prepare the environment
1. Import rabbitMQ dependency packages
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
2. Modify the application.properties configuration
The confirmation of messages must be enabled on the sender and the consumer.
spring.rabbitmq.host=127.0. 01.
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Senders enable confirmspring.rabbitmq.publisher-confirms=true # senders openreturnConfirm the mechanismspring.rabbitmq.publisher-returns=true # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #Set the consumer to manual ACKspring.rabbitmq.listener.simple.acknowledge-mode=manual Whether to support retryspring.rabbitmq.listener.simple.retry.enabled=true Copy the code
3. Define exchanges and queues
Define switch confirmTestExchange and queue confirm_test_Queue, and bind the queue to the switch.
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue".true.false.false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); } } Copy the code
Rabbitmq message confirmation has two parts: sending message confirmation and receiving message confirmation.
2. Message sending confirmation
Sending message confirmation: Used to confirm whether a message is successfully delivered when producer Producer Producer sends a message to the Broker, and the exchange on the broker sends a message to the queue.
Messages from producer to RabbitMQ broker have a confirmCallback mode.
Message delivery failure from Exchange to Queue has a returnCallback return pattern.
We can use these two callbacks to ensure 100% delivery of elimination.
1. ConfirmCallback Confirms the mode
The confirmCallback callback is triggered whenever a message is received by the RabbitMQ broker.
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(! ack) { log.error("Message sending exception!"); } else { log.info("Sender's father has received the confirmation, correlationData={}, ACK ={}, cause={}", correlationData.getId(), ack, cause); } } } Copy the code
Interface ConfirmCallback, rewrite confirm() method, method has three parameters correlationData, ACK, cause.
correlationData
: There is only one internal objectid
Property to indicate the uniqueness of the current message.ack
: Message delivered tobroker
The status of thetrue
Success.cause
: indicates the cause of delivery failure.
However, a message received by the broker only indicates that it has reached the MQ server and does not guarantee that the message will be delivered to the target queue. So the next step is to use the returnCallback.
2, ReturnCallback callback mode
If the message fails to be delivered to the target queue, the returnCallback will be triggered. If the message fails to be delivered to the queue, the detailed delivery data of the current message will be recorded for subsequent resending or compensation operations.
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } } Copy the code
Implement interface ReturnCallback, rewrite the returnedMessage() method, The method takes five parameters: Message (message body), replyCode (response code), replyText (response content), Exchange, and routingKey.
To send a message, set Confirm and Return callbacks to rabbitTemplate, persist the message with setDeliveryMode(), and create a CorrelationData object for the test. Example Add one whose ID is 10000000000.
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { / * ** Ensure that a message can be sent back to the queue if it fails* Note: YML requires a publisher-returns: true configuration* / rabbitTemplate.setMandatory(true); / * ** After the consumer confirms receipt of the message, manual ACK callback processing* / rabbitTemplate.setConfirmCallback(confirmCallbackService); / * ** Failed message delivery to the queue callback processing* / rabbitTemplate.setReturnCallback(returnCallbackService); / * ** Send messages* / rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); } Copy the code
3. Confirmation of message reception
Message receive acknowledgement is a little easier than message send acknowledgement because there is only one message return receipt (ACK) process. Annotation methods using @Rabbithandler annotations add channel and message parameters.
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("Little rich received message: {}", msg); //TODO specific business channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("Message has failed to be processed twice, refused to receive again..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // Reject the message } else { log.error("Message about to return to queue for processing again..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.true); } } } } Copy the code
There are three ways to consume a message, and let’s examine what each means.
1, basicAck
BasicAck: Confirmation of success. After this method is used, the message will be deleted by rabbitMQ broker.
void basicAck(long deliveryTag, boolean multiple)
Copy the code
DeliveryTag: Indicates the delivery number of the message. After each consumption or re-delivery of the message, the deliveryTag will increase. In manual message confirmation mode, we can ack, nack, reject and other operations on the message specified deliveryTag.
Multiple: Whether to batch confirm. If true, all messages smaller than the current message deliveryTag will be ack at one time.
For example: Suppose I send three messages with deliveryTag 5, 6 and 7 respectively, but none of them are confirmed. When I send the fourth message with deliveryTag 8 and multiple set to True, all the messages of 5, 6, 7 and 8 will be confirmed.
2, basicNack
BasicNack: indicates failure acknowledgement. This method is used when consuming message service exceptions. Messages can be re-queued.
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
Copy the code
DeliveryTag: indicates the message delivery number.
Multiple: Specifies whether to confirm in batches.
Requeue: Messages with a value of true are re-queued.
3, basicReject
BasicReject: Reject a message. BasicNack differs from basicNack in that it cannot be batch processed.
void basicReject(long deliveryTag, boolean requeue)
Copy the code
DeliveryTag: indicates the message delivery number.
Requeue: Messages with a value of true are re-queued.
Four, test,
Send a message to test whether the message confirmation mechanism takes effect. According to the execution result, the sender successfully calls back the message after sending it, and the consumer successfully consumes the message.Use the packet capture toolWireshark
Look at therabbitmq
Amqp protocol interaction changes, tooack
In the process.
Five, step pit log
1. No message confirmation
It’s a very low-tech pit, but a very error-prone place.
Turn on message confirmation and don’t forget to consume messageschannel.basicAck
Otherwise, the message will always exist, leading to repeated consumption.
2. Unlimited delivery of messages
When I first came into contact with the message confirmation mechanism, the code on the consumer side was written like the following. The idea was very simple: after processing the business logic, the message was acknowledged, and when int A = 1/0 was abnormal, the message was put back into the queue.
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("Consumer 2 received: {}", msg);
int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.true); } } Copy the code
However, the problem is that the business code will not be fixed 99.9% of the time once there is a bug. A message will be sent to the queue indefinitely, and the consumer side will execute indefinitely, leading to an endless loop.
The local CPU was suddenly full, so you can imagine my panic when the service crashed in the production environment.
andrabbitmq management
There’s only one unconfirmed message.
Test analysis shows that when a message is reposted to a message queue, it does not return to the end of the queue, but remains at the head of the queue.
The consumer immediately consumes the message, the business process throws an exception, the message is re-queued, and so on. The message queue processing is blocked, causing normal messages to fail to run.
At that time, our solution was to reply the message first, and then the message queue would delete the message. At the same time, we sent the message to the message queue again, and the abnormal message was put at the end of the message queue, which not only ensured that the message would not be lost, but also ensured the normal business.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// Resend the message to the end of the queue
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
Copy the code
However, this method does not solve the fundamental problem, and error messages are still reported from time to time. Later, the number of message retries is optimized and set. After reaching the retry upper limit, manual confirmation is made, the message is deleted from the queue, and the message is persisted into MySQL and pushed to the alarm, and manual processing and scheduled tasks are performed to compensate.
3. Repeated consumption
Depending on the business, the consumption of MQ can be idempotent. Messages can be persisted with MySQL or Redis and verified by unique properties in messages.
Demo making address https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm
Original is not easy, burning hair output content, if there is a lost harvest, a point to encourage it!
I sorted out hundreds of technical e-books and gave them to my friends. Pay attention to the public number reply [666] to get yourself. I set up a technology exchange group with some friends to discuss technology and share technical information, aiming to learn and progress together. If you are interested, just scan code and join us!
This article is formatted using MDNICE