1. Analysis of reliable message delivery
To deliver a RabbitMQ message reliably, each of the following four steps must itself be reliable:
- The Producer sends messages to the Broker.
- Messages are routed from an Exchange to a Queue.
- Messages are stored persistently in queues.
- Consumers subscribe to and consume messages.
We will analyze these four steps one by one.
1.1. Producers send messages to brokers
In the process of sending a message to the Broker, if the Broker fails to receive the message due to network or disk problems, how does the Producer know whether the message was sent to the Broker?
RabbitMQ provides an acknowledgement mechanism through which the Broker tells the Producer that a message has been received successfully. There are two kinds of confirmation mechanisms:
- Transaction mode
- Confirm mode
1.1.1 Transaction mode
A transaction here means the same thing as a transaction in the usual sense: if sending the message fails, the message is rolled back.
Capturing packets with Wireshark shows the interaction between the Producer and the Broker in transaction mode. The Producer sends Tx.Select and the Broker replies with Tx.Select-Ok. The Producer then publishes with Basic.Publish, sends Tx.Commit, and waits for Tx.Commit-Ok. On failure it sends Tx.Rollback instead.
The Java API is as follows:
    try {
        channel.txSelect(); // Start transaction mode
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.txCommit(); // Commit
        System.out.println("Message sent successfully");
    } catch (Exception e) {
        channel.txRollback(); // Roll back
        System.out.println("Message has been rolled back.");
    }
In Spring Boot, configure it in TemplateConfig:
    rabbitTemplate.setChannelTransacted(true);
In transaction mode, the commit succeeds only when the Producer receives the Commit-Ok reply from the server, so the Producer knows whether the Broker has the message. The drawback is that transaction mode is blocking: the next message cannot be sent until the current one has been committed, which severely reduces the throughput of the RabbitMQ server. It is therefore not recommended for production environments.
1.1.2 Confirm mode
1.1.2.1. Common confirmation mode
Calling channel.confirmSelect() turns on confirm mode. After receiving a message, the Broker sends back a Basic.Ack, and the Producer waits for that receipt by calling channel.waitForConfirms().
    // Enable publisher confirm mode
    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    // Simple confirm: send one message, confirm one message
    if (channel.waitForConfirms()) {
        System.out.println("Message sent successfully");
    } else {
        System.out.println("Message sending failed");
    }
1.1.2.2 Batch confirmation mode
Calling channel.confirmSelect() turns on confirm mode. After publishing a batch, the Producer calls channel.waitForConfirmsOrDie() to wait for the Broker's Basic.Ack receipts. As long as waitForConfirmsOrDie() does not throw an exception, the server has received every message. This is more efficient than confirming messages one at a time. However, if any message in the batch is not confirmed, the Producer cannot tell which one failed, so the whole batch has to be treated as failed.
    try {
        channel.confirmSelect();
        for (int i = 0; i < 5; i++) {
            // Send the message
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
        }
        // With multiple=true, an ACK confirms every message up to and including its delivery tag,
        // so 5 messages may be confirmed by only 1 or 2 ACKs.
        // Blocks until all published messages are confirmed; throws an IOException if the Broker nacks any of them
        channel.waitForConfirmsOrDie();
        System.out.println("Message sent, batch confirmation successful.");
    } catch (Exception e) {
        // An exception occurred, so all messages in the batch may need to be resent
        e.printStackTrace();
    }
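The "resend the whole batch" consequence of batch confirmation can be sketched without a broker. In this minimal sketch, `BatchPublisher` is a hypothetical hook (not part of the RabbitMQ client) that stands in for "publish every message, then call waitForConfirmsOrDie()". The retry limit is also an assumption made for illustration.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Sketch: retry a whole batch when its confirmation fails. No broker is involved. */
class BatchResend {
    /** Hypothetical hook: publish all messages, then block until the batch is confirmed. */
    interface BatchPublisher {
        void publishAndConfirm(List<String> messages) throws IOException;
    }

    /** Returns the number of attempts used, or -1 if every attempt failed. */
    static int sendWithRetry(BatchPublisher publisher, List<String> batch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                publisher.publishAndConfirm(batch);
                return attempt;
            } catch (IOException e) {
                // The failing message is unknown, so the whole batch is sent again.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated publisher whose first attempt is rejected.
        BatchPublisher flaky = messages -> {
            if (++calls[0] < 2) {
                throw new IOException("batch was nacked");
            }
        };
        int attempts = sendWithRetry(flaky, Arrays.asList("msg-0", "msg-1", "msg-2"), 3);
        System.out.println("Batch confirmed after attempts: " + attempts);
    }
}
```

Because the whole batch is resent, messages that the Broker had already accepted may arrive twice, which is one reason consumers need to be idempotent.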
1.1.2.3 Asynchronous confirmation mode
Asynchronous confirmation, as the name suggests, means that sending and confirming messages are not synchronous, so the Producer can keep sending while confirmations arrive. Register a ConfirmListener with channel.addConfirmListener() and turn on confirm mode with channel.confirmSelect(). The handleAck method is the callback for messages that the server has acknowledged. The handleNack method is the callback for messages that the server has not acknowledged.
    // Delivery tags of the messages that are still unconfirmed
    final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    // Not every message gets its own ACK: one ACK may confirm a single message or several messages
    // Listen asynchronously for acknowledged and unacknowledged messages
    // To rerun, stop the previous producer and purge the queue first
    channel.addConfirmListener(new ConfirmListener() {
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("Broker did not acknowledge message, tag: " + deliveryTag);
            if (multiple) {
                // headSet(n) is the view of all elements smaller than n; clearing it removes them
                confirmSet.headSet(deliveryTag + 1L).clear();
            } else {
                confirmSet.remove(deliveryTag);
            }
            // Add the resend logic here
        }

        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            // multiple=true: every message up to and including deliveryTag is confirmed; false: only this message
            System.out.println(String.format("Broker confirmed message, tag: %d, multiple: %b", deliveryTag, multiple));
            if (multiple) {
                // headSet(n) is the view of all elements smaller than n; clearing it removes them
                confirmSet.headSet(deliveryTag + 1L).clear();
            } else {
                // Remove only one element
                confirmSet.remove(deliveryTag);
            }
            System.out.println("Unconfirmed messages: " + confirmSet);
        }
    });
    // Enable publisher confirm mode
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        long nextSeqNo = channel.getNextPublishSeqNo();
        // Send the message
        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
        confirmSet.add(nextSeqNo);
    }
    System.out.println("All messages: " + confirmSet);
    // If the channel is closed right away, later ACKs may never be received
    //channel.close();
    //conn.close();
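The bookkeeping behind the two callbacks can be tried without a broker. The following is a minimal sketch in which the class `OutstandingConfirms` and its methods are names invented for illustration. It keeps the unconfirmed messages in a sorted map keyed by sequence number and applies the `multiple` flag the same way the listener above does.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Sketch: tracks published but unconfirmed messages by sequence number. No broker is involved. */
class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Record a message just before it is published. */
    void track(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Apply an ACK or NACK: multiple=true covers every tag up to and including deliveryTag. */
    void settle(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    boolean isPending(long seqNo) {
        return pending.containsKey(seqNo);
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 5; seq++) {
            confirms.track(seq, "msg-" + seq);
        }
        confirms.settle(3, true);  // settles tags 1, 2 and 3
        confirms.settle(5, false); // settles only tag 5
        System.out.println("Still pending: " + confirms.pendingCount());
    }
}
```

Keeping the message body next to the sequence number means a NACK handler has what it needs to resend the message.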
Spring Boot example, set up in TemplateConfig:
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                System.out.println("Sending message failed: " + cause);
                throw new RuntimeException("Sending exception: " + cause);
            }
        }
    });
1.2 Messages are routed from Exchange to Queue
Routing a message from the Exchange to a Queue can fail. There are two possible causes: the routing key is wrong, or the queue does not exist.
RabbitMQ provides two ways to handle this:
- The Broker returns the message to the Producer (via a callback set by the Producer).
- The exchange routes the message to an alternate (backup) exchange.
1.2.1 The Broker returns the message to the Producer
With the Java API, set a ReturnListener to receive the callback. The message must be published with the mandatory flag set to true, otherwise the Broker silently drops an unroutable message instead of returning it.
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int replyCode,
                                 String replyText,
                                 String exchange,
                                 String routingKey,
                                 AMQP.BasicProperties properties,
                                 byte[] body)
                throws IOException {
            System.out.println("========= listener received unroutable message returned ============");
            System.out.println("replyText: " + replyText);
            System.out.println("exchange: " + exchange);
            System.out.println("routingKey: " + routingKey);
            System.out.println("message: " + new String(body));
        }
    });
In Spring Boot, set a ReturnCallback in TemplateConfig to receive the callback. The template also needs mandatory set to true, for example with rabbitTemplate.setMandatory(true).
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        public void returnedMessage(Message message,
                                    int replyCode,
                                    String replyText,
                                    String exchange,
                                    String routingKey) {
            System.out.println("Returned message:");
            System.out.println("replyCode: " + replyCode);
            System.out.println("replyText: " + replyText);
            System.out.println("exchange: " + exchange);
            System.out.println("routingKey: " + routingKey);
        }
    });
1.2.2 Routing to an alternate exchange
The alternate exchange is specified by setting the alternate-exchange argument when the exchange is declared.
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2)
            .contentEncoding("UTF-8").build();
    // Alternate exchange
    channel.exchangeDeclare("ALTERNATE_EXCHANGE", "topic", false, false, false, null);
    channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
    channel.queueBind("ALTERNATE_QUEUE", "ALTERNATE_EXCHANGE", "#");
    // Specify the alternate exchange when declaring the exchange
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE");
    channel.exchangeDeclare("TEST_EXCHANGE", "topic", false, false, false, arguments);
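The two strategies for unroutable messages can be compared in a toy in-memory model. This is only a sketch and does not use the RabbitMQ client. `ToyBroker` and its methods are invented for illustration, bindings are exact matches plus a `#` catch-all, and the model assumes the message is published as mandatory and that alternate exchanges do not form a cycle.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of unroutable-message handling; not the RabbitMQ client API. */
class ToyBroker {
    // exchange name -> (routing key -> queue name); exact match, "#" acts as catch-all
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // exchange name -> alternate exchange name
    private final Map<String, String> alternates = new HashMap<>();

    void declareExchange(String exchange, String alternateExchange) {
        bindings.putIfAbsent(exchange, new HashMap<>());
        if (alternateExchange != null) {
            alternates.put(exchange, alternateExchange);
        }
    }

    void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>()).put(routingKey, queue);
    }

    /** Returns the queue that receives the message, or "RETURNED" if it goes back to the producer. */
    String route(String exchange, String routingKey) {
        String current = exchange;
        while (current != null) {
            Map<String, String> keys = bindings.get(current);
            String queue = (keys == null) ? null : keys.getOrDefault(routingKey, keys.get("#"));
            if (queue != null) {
                return queue;
            }
            current = alternates.get(current); // fall through to the alternate exchange, if any
        }
        return "RETURNED";
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareExchange("ALTERNATE_EXCHANGE", null);
        broker.bind("ALTERNATE_EXCHANGE", "#", "ALTERNATE_QUEUE");
        broker.declareExchange("TEST_EXCHANGE", "ALTERNATE_EXCHANGE");
        broker.bind("TEST_EXCHANGE", "order.created", "ORDER_QUEUE");
        broker.declareExchange("PLAIN_EXCHANGE", null);

        System.out.println(broker.route("TEST_EXCHANGE", "order.created"));
        System.out.println(broker.route("TEST_EXCHANGE", "no.such.key"));
        System.out.println(broker.route("PLAIN_EXCHANGE", "no.such.key"));
    }
}
```

In the model, a message is returned to the producer only when neither the exchange nor any alternate exchange along the chain can route it.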
1.3 Persistent storage of messages in the Queue
Here we look at the settings that make stored messages persistent.
1.3.1 Queue persistence
Declare the queue with durable=true.
    // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
durable: whether the queue is persisted. A non-durable queue is kept in memory and disappears when the service is restarted.
autoDelete: whether the queue is deleted automatically once no consumer is connected to it.
The characteristics of an exclusive queue are:
- Visible only to the Connection that first declared it.
- Will be automatically deleted when the connection is disconnected.
1.3.2 Exchange persistence
Likewise, set the durable parameter to true to make the exchange persistent.
    // Declare the exchange
    // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
1.3.3 Message persistence
A message is persisted by setting deliveryMode=2 when building its BasicProperties.
    // Per-message properties, including the expiration time
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) // Persist the message
            .contentEncoding("UTF-8")
            .expiration("10000") // TTL in milliseconds
            .build();
1.3.4 Clustering
Clustering improves the availability of RabbitMQ by keeping backups on other nodes.
1.4 Consumers consume messages
If the Consumer receives a message and an exception occurs during processing, the Consumer fails to consume it. What happens then?
Don't panic: RabbitMQ provides a Consumer acknowledgement mechanism. After the Consumer receives and processes a message, it sends an ACK to the server, either manually or automatically.
If the server receives no ACK and the consumer disconnects, RabbitMQ delivers the message to another consumer. If there are no other consumers, the same consumer consumes the message again after it reconnects and repeats the business logic (ideally after the code has been fixed).
There are two ways for consumers to acknowledge messages:
- Automatic acknowledgement (ACK)
- Manual confirmation (ACK)
1.4.1 Automatic ACK
With automatic ACK, no ACK code is written in the consumer. The message is acknowledged automatically as soon as the consumer receives it, not when the handler method finishes, regardless of whether the message was processed normally.
1.4.2 Manual ACK
With manual ACK, the acknowledgement can be sent after the consumer's business logic has finished.
1. Java API usage
    // Set autoAck to false to turn off automatic acknowledgement (manual ACK)
    // String queue, boolean autoAck, Consumer callback
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Then, in the Consumer's callback, call the following channel methods to acknowledge or reject the message
    if (msg.contains("No.")) { // Reject the message
        // requeue: whether to put the message back in the queue. true: requeue; false: discard it
        // TODO if there is only one consumer, requeue=true causes the message to be consumed repeatedly
        channel.basicReject(envelope.getDeliveryTag(), false);
    } else if (msg.contains("Abnormal")) { // Batch reject
        // requeue: whether to put the messages back in the queue
        // TODO if there is only one consumer, requeue=true causes the message to be consumed repeatedly
        channel.basicNack(envelope.getDeliveryTag(), true, false);
    } else {
        // Manual acknowledgement
        // Without an ACK the message stays in the queue and is redelivered after the consumer reconnects
        channel.basicAck(envelope.getDeliveryTag(), true);
    }
2. Spring Boot usage
First, add the following configuration to the application.properties file:
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
Note that there are three options to choose from:
- none: automatic ACK.
- manual: manual ACK.
- auto: sends an ACK if the method does not throw an exception. If the method throws an exception other than AmqpRejectAndDontRequeueException, a NACK is sent and the message is requeued. If the method throws AmqpRejectAndDontRequeueException, a NACK is sent and the message is not requeued.
Then acknowledge the message in the consumer method:
    @RabbitListener(queues = "${com.fanger.secondqueue}", containerFactory = "rabbitListenerContainerFactory")
    public class SecondConsumer {
        @RabbitHandler
        public void process(String msgContent, Channel channel, Message message) throws IOException {
            System.out.println("Second Queue received msg : " + msgContent);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // Acknowledge
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false); // Batch reject
            // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // Reject
        }
    }
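The rules for the auto acknowledge mode listed earlier in this section can be summarized as a small decision function. This is a sketch of the rules as described in the text, not Spring AMQP's actual implementation. `RejectAndDontRequeue` is a local stand-in for AmqpRejectAndDontRequeueException so that the example compiles without any dependency.

```java
/** Sketch of the "auto" acknowledge-mode rules described above; not Spring AMQP code. */
class AutoAckSketch {
    enum Outcome { ACK, NACK_REQUEUE, NACK_NO_REQUEUE }

    /** Local stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /** Runs the listener method and reports what would be sent to the broker. */
    static Outcome decide(Runnable listenerMethod) {
        try {
            listenerMethod.run();
            return Outcome.ACK;             // no exception: acknowledge
        } catch (RejectAndDontRequeue e) {
            return Outcome.NACK_NO_REQUEUE; // explicit rejection: do not requeue
        } catch (RuntimeException e) {
            return Outcome.NACK_REQUEUE;    // any other exception: requeue
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(() -> { }));
        System.out.println(decide(() -> { throw new IllegalStateException("temporary failure"); }));
        System.out.println(decide(() -> { throw new RejectAndDontRequeue("malformed message"); }));
    }
}
```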
1.5. Consumer callback
Beyond the four steps above, reliability can be improved further by having the consumer send a receipt to the producer once it has processed the message successfully. For this, the producer can provide a response API that receives the notification.
1.6 Message compensation mechanism
What if, for whatever reason, the consumer never calls the producer's API, so the producer cannot tell whether the message was processed successfully?
In that case we need to compensate by resending the message.
This can be done with a message status table: a scheduled task scans the table and resends the messages that have not been confirmed. Two questions need attention:
- How many times should a message be resent?
- What should the interval between resends be?
Both depend on the system design, but do not retry more than 3 times.
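The status-table approach can be sketched in memory. In this minimal sketch the map stands in for the database table, and the class, method, and status names are assumptions made for illustration. `scan()` represents one run of the scheduled task, which in a real system could be triggered at a fixed interval by something like a ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: message status table with a capped number of resends. The map stands in for a DB table. */
class MessageCompensator {
    static final int MAX_RETRIES = 3;

    enum Status { PENDING, CONFIRMED, FAILED }

    private static final class Record {
        Status status = Status.PENDING;
        int retries = 0;
    }

    private final Map<String, Record> statusTable = new LinkedHashMap<>();

    /** Called when the producer first sends a message. */
    void recordSent(String messageId) {
        statusTable.put(messageId, new Record());
    }

    /** Called when the consumer reports success through the producer's response API. */
    void markConfirmed(String messageId) {
        Record record = statusTable.get(messageId);
        if (record != null) {
            record.status = Status.CONFIRMED;
        }
    }

    /** One run of the scheduled scan: returns the ids that should be resent now. */
    List<String> scan() {
        List<String> toResend = new ArrayList<>();
        for (Map.Entry<String, Record> entry : statusTable.entrySet()) {
            Record record = entry.getValue();
            if (record.status != Status.PENDING) {
                continue;
            }
            if (record.retries >= MAX_RETRIES) {
                record.status = Status.FAILED; // give up; leave it for manual handling
            } else {
                record.retries++;
                toResend.add(entry.getKey());
            }
        }
        return toResend;
    }

    Status statusOf(String messageId) {
        return statusTable.get(messageId).status;
    }

    public static void main(String[] args) {
        MessageCompensator compensator = new MessageCompensator();
        compensator.recordSent("m1");
        compensator.recordSent("m2");
        compensator.markConfirmed("m1");
        for (int pass = 1; pass <= 4; pass++) {
            System.out.println("pass " + pass + " resends " + compensator.scan());
        }
        System.out.println("m2 is " + compensator.statusOf("m2"));
    }
}
```

Marking a message as FAILED after the last retry keeps it visible in the table instead of silently dropping it.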
1.7 Idempotence of messages
If you use the message compensation mechanism from 1.6, message consumption must be idempotent.
Possible causes of duplicate messages:
- A problem at the producer in step 1 causes the message to be sent repeatedly. For example, Confirm mode is enabled but no confirmation is received, so the producer sends the message again.
- A problem in step 4: the consumer did not send an ACK, or something else went wrong, so the message is consumed repeatedly.
- Producer code or network problems.
How can repeated consumption of messages be avoided?
Duplicate messages can be controlled by generating a unique business ID for each message and deduplicating on that ID when the result is written to the database.
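Deduplication on a unique business ID can be sketched as follows. Here an in-memory set stands in for a unique constraint in the database, and the class and method names are assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: idempotent consumption keyed by a unique business ID. */
class IdempotentConsumer {
    // Stand-in for a unique index on the business ID column in the database.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger appliedCount = new AtomicInteger();

    /** Returns true if the message was applied, false if it was a duplicate and skipped. */
    boolean handle(String businessId, String payload) {
        if (!processedIds.add(businessId)) {
            return false; // this ID was already processed
        }
        appliedCount.incrementAndGet(); // the business logic would run here
        return true;
    }

    int appliedCount() {
        return appliedCount.get();
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.handle("order-1001", "pay"));
        System.out.println(consumer.handle("order-1001", "pay")); // redelivered duplicate
        System.out.println(consumer.handle("order-1002", "pay"));
        System.out.println("applied: " + consumer.appliedCount());
    }
}
```

The check and the insert happen in a single `add` call, so two threads that receive the same message at the same time cannot both apply it.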
1.8 Ordering of messages
Ordering means that consumers consume messages in the same order in which producers produced them.
In RabbitMQ, when a queue has more than one consumer, the order cannot be guaranteed because different consumers consume messages at different rates. Ordered consumption is guaranteed only when each queue has a single consumer, with messages of different business types sent to different dedicated queues.
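The "one dedicated queue per business type, one consumer per queue" layout can be modelled in memory. This is a sketch only: each `Deque` stands in for a RabbitMQ queue with a single consumer, and the class and method names are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Sketch: one FIFO queue per business type, each drained by a single consumer. */
class DedicatedQueues {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Messages of the same business type always go to the same queue. */
    void publish(String businessType, String message) {
        queues.computeIfAbsent(businessType, k -> new ArrayDeque<>()).addLast(message);
    }

    /** The queue's single consumer takes messages in publish order; returns null when empty. */
    String consumeNext(String businessType) {
        Deque<String> queue = queues.get(businessType);
        return (queue == null) ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        DedicatedQueues broker = new DedicatedQueues();
        broker.publish("order", "order created");
        broker.publish("stock", "stock reserved");
        broker.publish("order", "order paid");
        System.out.println(broker.consumeNext("order"));
        System.out.println(broker.consumeNext("order"));
        System.out.println(broker.consumeNext("stock"));
    }
}
```

Order is preserved within one business type, while no order is implied between messages of different types.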