Preface
In the previous articles, I covered RabbitMQ's components and basic usage and built a complete RabbitMQ integration demo in a Spring Boot project. Introducing any new middleware adds another layer of data risk to consider, so how do we know whether a RabbitMQ message has actually been delivered and consumed? This article demonstrates how a producer can confirm that its messages were sent successfully.
I. Why do we need message confirmation?
In MQ, the consumer and the producer do not communicate directly. The producer is only responsible for sending messages to the queue, and the consumer is only responsible for fetching messages from the queue (whether by push or pull).
- Once the consumer obtains a message from the queue, the message no longer exists in the queue. If the consumer's channel then fails, for example because of a network interruption, before the message has been processed, the message is lost for good. We therefore want the message to be deleted only after the consumer has successfully consumed it.
- The same is true when sending. The producer sends a message to the exchange, but nothing guarantees that the message actually arrives.
The mechanism that closes both gaps is message confirmation.
II. Message confirmation process
As the flowchart shows, message confirmation is divided into producer confirmation and consumer confirmation.
Both mechanisms are inspired by the acknowledgements in the TCP protocol and are important for data safety.
Supplement:
- RabbitMQ offers two mechanisms for delivering messages safely: transactions and confirmations. The transaction mechanism requires the channel to be put into transactional mode and each message, or each batch of messages, to be committed, which is costly and reduces message throughput. In practice, the confirmation mechanism is therefore the usual choice.
III. Producer confirmation
The path from producer to consumer is: producer -> broker -> exchange -> queue -> consumer.
In code, two callbacks let us control the reliability of message delivery:
- confirmCallback: invoked when the RabbitMQ broker (or cluster) reports whether the message from the producer reached it;
- returnCallback: invoked when a message cannot be delivered from the exchange to a queue.
We can use these two callback interfaces to keep messages consistent and to handle failures.
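To make the division of labour between the two callbacks concrete, here is a minimal in-memory sketch. It uses only JDK collections; MiniBroker, bind, publish, and the event strings are hypothetical names for illustration, not RabbitMQ or Spring AMQP APIs. It assumes a direct-style exchange where a routing key must match a binding exactly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model (hypothetical, not the RabbitMQ API): shows when each callback fires.
class MiniBroker {
    // exchange name -> routing keys that have a queue bound to them
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // Events recorded in the order the producer would observe them
    final List<String> events = new ArrayList<>();

    void bind(String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>()).add(routingKey);
    }

    void publish(String exchange, String routingKey, String id) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            // The message never reached an exchange: confirm callback with ack=false
            events.add("confirm id=" + id + " ack=false");
            return;
        }
        if (!keys.contains(routingKey)) {
            // Reached the exchange but no queue matched: the return callback fires first
            events.add("return id=" + id + " NO_ROUTE");
        }
        // The exchange accepted the message either way: confirm callback with ack=true
        events.add("confirm id=" + id + " ack=true");
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.bind("confirm_exchange", "key1");
        broker.publish("confirm_exchange", "key1", "1");   // routed to a queue
        broker.publish("confirm_exchange", "key12", "2");  // exchange exists, no matching queue
        broker.publish("confirm_exchange1", "key1", "3");  // unknown exchange
        broker.events.forEach(System.out::println);
    }
}
```

The point of the model is that an unroutable message still gets a positive confirm, because the exchange did receive it; only the return callback reveals that it reached no queue.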
3.1 Code Preparation
3.1.1 Configuration File
1. Add the following information to the configuration file:
spring:
  rabbitmq:
    publisher-confirm-type: correlated
It accepts three values:
- NONE: disables publisher confirmation. This is the default.
- CORRELATED: a callback method is triggered to report whether the message was published to the exchange.
- SIMPLE: in testing, this value has two effects. First, it triggers the callback just like CORRELATED. Second, after publishing a message you can call rabbitTemplate's waitForConfirms or waitForConfirmsOrDie to wait for the broker's result and decide what to do next. Note that if waitForConfirmsOrDie fails, the channel is closed and no further messages can be sent to the broker on it.
3.1.2 Configuration class
Something to think about: how could these settings be loaded from a database and bound at project startup? We will look at that later.
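import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    // Exchange name
    public static final String confirm_exchange_name = "confirm_exchange";
    // Queue name
    public static final String confirm_queue_name = "confirm_queue";
    // Routing key
    public static final String confirm_routing_key = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(confirm_exchange_name);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(confirm_queue_name).build();
    }

    // Bind the queue to the exchange with the routing key
    @Bean
    public Binding queueBingExchange() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(confirm_routing_key);
    }
}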
3.1.3 Callback Interface
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Note: ReturnCallback is the interface used in this article; newer Spring AMQP
// versions replace it with ReturnsCallback.
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers this class as the callback on rabbitTemplate.
     * @PostConstruct runs after dependency injection has completed,
     * so rabbitTemplate is already available here.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Exchange confirmation callback.
     * @param correlationData holds the ID and related information of the message
     * @param ack true if the exchange received the message, false if it did not
     * @param cause the reason for the failure
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without one
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange has received the message with ID: {}", id);
        } else {
            log.info("The exchange did not receive the message with ID {}, because {}", id, cause);
        }
    }

    /**
     * Returns the message to the producer when it cannot be routed
     * to a queue during delivery.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.error("Message {} was returned by exchange {}, reason: {}", message, exchange, replyText);
    }
}
3.2 Exchange confirmation: production and consumption test
To use the exchange callback, publisher-confirm-type is set to CORRELATED.
@GetMapping("/sendMsg/{message}")
public void sendConfirmMsg(@PathVariable String message) {
    // The "1" suffix makes the exchange name unknown to the broker,
    // so the confirm callback receives ack = false
    rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name + "1",
            ConfirmConfig.confirm_routing_key, message, new CorrelationData("1"));
    log.info("Sent message content: {}", message);
}
When the exchange cannot receive the message, the callback runs its ack = false branch.
To use it, implement the ConfirmCallback interface and override its confirm() method, which takes three parameters: correlationData, ack, and cause.
- correlationData: this object carries an id attribute that uniquely identifies the current message.
- ack: whether the message was delivered to the broker. true indicates success.
- cause: the reason for the delivery failure.
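One common use of these three parameters is to remember each message under its correlation ID until the broker confirms it. The sketch below assumes that approach; PendingConfirms and its methods are hypothetical helper names, not part of Spring AMQP, and the resend policy is left to the caller.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper (not part of Spring AMQP): remembers each message until it is confirmed.
class PendingConfirms {
    // correlation ID -> payload that has been sent but not yet confirmed
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // Payloads the broker did not accept; the caller decides when to resend them
    private final List<String> toResend = new CopyOnWriteArrayList<>();

    // Call just before sending, with the same ID that goes into CorrelationData
    void track(String id, String payload) {
        pending.put(id, payload);
    }

    // Call from confirm(correlationData, ack, cause) with correlationData.getId()
    void onConfirm(String id, boolean ack, String cause) {
        String payload = pending.remove(id);
        if (!ack && payload != null) {
            // Not accepted by the broker: keep the payload so it can be sent again
            toResend.add(payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> toResend() {
        return toResend;
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("1", "hello1");
        confirms.track("2", "hello2");
        confirms.onConfirm("1", true, null);
        confirms.onConfirm("2", false, "simulated failure");
        System.out.println("still pending: " + confirms.pendingCount());
        System.out.println("to resend: " + confirms.toResend());
    }
}
```

Thread-safe collections are used because the confirm callback usually runs on a different thread from the one that sent the message.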
3.1.5 Queue confirmation: the return callback
Exchange confirmation only tells us that the exchange received the message; it does not guarantee that the message was routed to a queue. The sender still does not know whether the message reached a queue, so delivery to the queue needs its own confirmation. This is the returned (fallback) message.
Implement the ReturnCallback interface and override its returnedMessage() method, which takes five parameters: message (the message body), replyCode (response code), replyText (response text), exchange, and routingKey.
Add the following setting under spring.rabbitmq:
publisher-returns: true
Execution code:
rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name, ConfirmConfig.confirm_routing_key,message+"1",new CorrelationData("1"));
rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name, ConfirmConfig.confirm_routing_key+"2",message+"2",new CorrelationData("2"));
Result of the run:
Message (Body:'hello22' MessageProperties [headers={spring_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) was returned by exchange confirm_exchange, reason: NO_ROUTE
IV. Consumer confirmation
RabbitMQ supports two ways of consuming messages: push and pull.
- Push pattern: The messaging middleware actively pushes messages to consumers
- Pull pattern: Consumers actively pull messages from the messaging middleware.
However, in practice, pulling messages will reduce the throughput of the system, and it is difficult for consumers to get messages in real time. Therefore, push mode is generally used.
Rather than waiting for the consumer to finish one message before pushing the next, MQ uses the prefetch_count parameter to decide how many messages may be pushed into the consumer's local cache at once.
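The effect of prefetch_count can be pictured as a sliding window of unacknowledged messages. The following is a simplified sketch with JDK collections only; PrefetchModel and its methods are hypothetical names, and the model ignores multiple consumers and redelivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical names): the broker keeps at most prefetchCount
// unacknowledged messages in flight to one consumer.
class PrefetchModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();
    private final int prefetchCount;

    PrefetchModel(int prefetchCount, List<String> messages) {
        this.prefetchCount = prefetchCount;
        ready.addAll(messages);
        push();
    }

    // Push messages until the consumer holds prefetchCount unacknowledged ones
    private void push() {
        while (unacked.size() < prefetchCount && !ready.isEmpty()) {
            unacked.add(ready.poll());
        }
    }

    // Acknowledging one message frees a slot, so the broker pushes the next one
    void ack(String message) {
        if (unacked.remove(message)) {
            push();
        }
    }

    int readyCount() {
        return ready.size();
    }

    List<String> inFlight() {
        return unacked;
    }

    public static void main(String[] args) {
        PrefetchModel model = new PrefetchModel(2, Arrays.asList("m1", "m2", "m3", "m4"));
        System.out.println("in flight: " + model.inFlight() + ", ready: " + model.readyCount());
        model.ack("m1");
        System.out.println("in flight: " + model.inFlight() + ", ready: " + model.readyCount());
    }
}
```

A larger window keeps the consumer busy, while a smaller one limits how many messages are tied up with a slow or failing consumer.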
For consumer confirmation, RabbitMQ supports message acknowledgement (ACK) so that data is not lost. With the ACK mechanism, the consumer receives a message from RabbitMQ, processes it, and then sends an acknowledgement back to RabbitMQ, which only then removes the message from the queue.
4.1 Automatic Confirmation
Automatic confirmation means that as soon as the consumer receives a message, RabbitMQ removes it from the queue. In this mode, sending counts as success. This is not safe, because the consumer's business logic may be interrupted before it finishes processing the message.
Let's verify this with a debugger. While paused in debug, the queue is already empty even though the consumer has not finished consuming the message.
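The risk can be reproduced without a broker. The sketch below contrasts removing a message on delivery with removing it only after processing succeeds. It is a JDK-only toy model; AckModeModel and its methods are hypothetical names, not RabbitMQ client APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model (hypothetical, not the RabbitMQ API) contrasting the two acknowledgement styles.
class AckModeModel {
    final Deque<String> queue = new ArrayDeque<>();

    // Auto-ack: the message is removed as soon as it is handed to the consumer
    void deliverAutoAck(Consumer<String> handler) {
        String msg = queue.poll();
        if (msg == null) {
            return;
        }
        try {
            handler.accept(msg);
        } catch (RuntimeException e) {
            // Processing failed, but the message is already gone from the queue
        }
    }

    // Manual ack: the message is removed only after the handler finishes normally
    void deliverManualAck(Consumer<String> handler) {
        String msg = queue.peek();
        if (msg == null) {
            return;
        }
        try {
            handler.accept(msg);
            queue.poll(); // acknowledged: now it is safe to delete
        } catch (RuntimeException e) {
            // No ack was sent, so the message stays in the queue
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> { throw new IllegalStateException("processing failed"); };

        AckModeModel auto = new AckModeModel();
        auto.queue.add("order-1");
        auto.deliverAutoAck(failing);
        System.out.println("auto-ack, messages left: " + auto.queue.size());

        AckModeModel manual = new AckModeModel();
        manual.queue.add("order-1");
        manual.deliverManualAck(failing);
        System.out.println("manual ack, messages left: " + manual.queue.size());
    }
}
```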
4.2 Manual confirmation (autoAck: false)
Manual confirmation is divided into positive confirmation and negative confirmation.
4.2.1 Positive confirmation: basicAck
// false: acknowledge only the message with this delivery tag
// true: acknowledge all messages with a delivery tag less than or equal to this one (batch confirmation)
channel.basicAck(b.getEnvelope().getDeliveryTag(), false);
When the consumer has finished consuming the data, there are 18 messages left in the queue and one message waiting to be confirmed.
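The meaning of the second argument of basicAck can be illustrated with a small model of the unacknowledged messages on a channel, keyed by delivery tag. This is only a sketch; UnackedSet and its methods are hypothetical names built on a JDK TreeMap, not the client library's implementation.

```java
import java.util.TreeMap;

// Toy model (hypothetical names) of the unacknowledged messages on a channel, keyed by delivery tag.
class UnackedSet {
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    // Each delivery on a channel gets the next delivery tag
    long deliver(String message) {
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple)
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag less than or equal to deliveryTag
            unacked.headMap(deliveryTag, true).clear();
        } else {
            // Acknowledge only this one delivery
            unacked.remove(deliveryTag);
        }
    }

    int size() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedSet set = new UnackedSet();
        set.deliver("a");
        long second = set.deliver("b");
        set.deliver("c");
        set.ack(second, true);
        System.out.println("unacknowledged after batch ack: " + set.size());
    }
}
```

Batch acknowledgement saves round trips, at the cost of also acknowledging earlier deliveries that may not have finished processing.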
4.2.2 Negative confirmation: basicNack, basicReject
Negative confirmations are used less often, but they are needed when a consumer cannot process a message right away for some reason.
When negatively acknowledging a message, you specify whether to discard it or to requeue it so that it can be redelivered later, possibly to another consumer.
Discard (requeue: false):
channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
Requeue (requeue: true):
channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
In general, when an exception occurs, channel.BasicNack is used to put the message that failed to be consumed back on the queue.
4.3 Confirmation in Spring Boot
Spring Boot sets the confirmation mode with the following property:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
The available modes are:
- NONE: no confirmation.
  1. Every delivered message is treated as successfully consumed by default, and messages keep being pushed to the consumer.
  2. Because RabbitMQ assumes that every message was consumed successfully, there is a risk of losing messages.
- AUTO: automatic confirmation.
  1. An ack (normal return) or nack (exception) is sent to the server automatically, depending on whether the message-handling logic throws an exception. If the consumer's logic does not handle the message properly, there is still a risk of message loss.
  2. Consumer overload is another thing to consider when using automatic confirmation.
- MANUAL: manual confirmation.
  1. The consumer calls ack, nack, or reject itself after processing succeeds or fails. A message that is never acknowledged is requeued, or delivered to another consumer, once the consumer's channel closes.
  2. ack is a positive confirmation; nack is a negative confirmation; reject is also a negative confirmation, but it can reject only one message at a time.
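import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MsgConfirmController {

    @RabbitListener(queues = ConfirmConfig.confirm_queue_name)
    public void consumerConfirm(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // getBody() returns byte[], so decode it before comparing with a String
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        if ("2".equals(body)) {
            // basicAck: positive acknowledgement; the RabbitMQ broker then deletes the message
            channel.basicAck(deliveryTag, false);
            log.info("Received message: {}", message);
        } else {
            // basicReject with requeue = true puts the message back on the queue
            channel.basicReject(deliveryTag, true);
            log.info("Message not consumed");
        }
    }
}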
Testing shows that rejected messages are put back on the queue and then consumed again and again, creating an endless loop of redelivery.
In this case you can limit the number of retries. Once the limit is reached, the message is either moved to a separate queue (for example a dead-letter queue) or discarded.
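The idea of bounding retries can be sketched in a few lines. The model below is an assumption-laden illustration, not a Spring or RabbitMQ feature: BoundedRetryModel, drain, and deadLetters are hypothetical names, attempts are counted per message content (so messages are assumed to be unique), and the handler returns true on success.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model (hypothetical names): requeue a failed message only a limited number of times.
class BoundedRetryModel {
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>();
    // Failed attempts per message; assumes message contents are unique
    private final Map<String, Integer> attempts = new HashMap<>();
    private final int maxAttempts;

    BoundedRetryModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Processes messages until the queue is empty and returns the number of deliveries
    int drain(Predicate<String> handler) {
        int deliveries = 0;
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            deliveries++;
            if (handler.test(msg)) {
                attempts.remove(msg);      // success: acknowledge
                continue;
            }
            int tried = attempts.merge(msg, 1, Integer::sum);
            if (tried < maxAttempts) {
                queue.addLast(msg);        // like reject with requeue = true
            } else {
                attempts.remove(msg);
                deadLetters.add(msg);      // like reject with requeue = false
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        BoundedRetryModel model = new BoundedRetryModel(3);
        model.queue.add("bad");
        model.queue.add("ok");
        int deliveries = model.drain(m -> m.equals("ok"));
        System.out.println("deliveries: " + deliveries + ", dead letters: " + model.deadLetters);
    }
}
```

With the limit in place the loop always terminates, and the messages that could not be processed remain available for inspection instead of circulating forever.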
Conclusion
This article tested message confirmation in a Spring Boot microservice setup and verified that RabbitMQ message confirmation works by having two services call each other. Later articles will explore RabbitMQ in more depth.