Introduction
In the previous article, we learned how to make sure a message is reliably delivered to a RabbitMQ exchange, but that still leaves a gap. Imagine you send a message to the RabbitMQ server, the server receives it and returns an ack to you, but then it cannot find a queue to route the message to, so the message is thrown in the trash. Emmm, which I guess is recyclable.
How to get messages reliably delivered to queues
If the description above is not clear, let me explain it again in code.
When only the publisher confirm mechanism is enabled, the exchange sends an acknowledgement to the producer as soon as it receives the message. If the message then turns out to be unroutable, it is discarded, and the producer never learns about it.
We change the exchange type from the previous article to DirectExchange, so that a message is routed only if its RoutingKey equals the BindingKey ("key" in this case) that was set when the queue was bound.
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

// Declare the business exchange
@Bean("businessExchange")
public DirectExchange businessExchange() {
    return new DirectExchange(BUSINESS_EXCHANGE_NAME);
}

// Declare the business queue
@Bean("businessQueue")
public Queue businessQueue() {
    return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
}

// Bind the business queue to the business exchange with the binding key "key"
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                               @Qualifier("businessExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("key");
}
The message producer is also slightly modified:
@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
    // rabbitTemplate.setChannelTransacted(true);
    // Register this class as the publisher confirm callback
    rabbitTemplate.setConfirmCallback(this);
}

public void sendCustomMsg(String exchange, String msg) {
    // First message: "key" matches the queue's binding key
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
    rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

    // Second message: no queue is bound with "key2"
    correlationData = new CorrelationData(UUID.randomUUID().toString());
    log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
    rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
    String id = correlationData != null ? correlationData.getId() : "";
    if (b) {
        log.info("Message confirmed successfully, id:{}", id);
    } else {
        log.error("Message not delivered successfully, id:{}, cause:{}", id, s);
    }
}
We then call this method and test it by sending two messages:
Message id:ba6bf502-9381-4220-8dc9-313d6a289a4e, msg:1
Message id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2, msg:1
Message confirmed successfully, id:ba6bf502-9381-4220-8dc9-313d6a289a4e
Message confirmed successfully, id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2
Received business message:1
As you can see, two messages were sent, the first with a RoutingKey of "key" and the second with a RoutingKey of "key2". Both messages were received by the exchange and both triggered a confirm callback, but the consumer received only one message. Because the RoutingKey of the second message did not match the BindingKey of the queue, and no other queue could accept it, the second message was discarded.
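The behaviour above can be mimicked with a toy in-memory model. This is only a sketch to make the point concrete: `ToyDirectExchange` and its methods are made-up names, not RabbitMQ or Spring AMQP classes, and the "confirm" is just a boolean return value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a direct exchange; NOT RabbitMQ client code.
class ToyDirectExchange {
    // binding key -> messages held by the queue bound with that key
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();

    // Creates (or reuses) the queue bound with the given key and returns it.
    List<String> bind(String bindingKey) {
        return queuesByBindingKey.computeIfAbsent(bindingKey, k -> new ArrayList<>());
    }

    // Returns the "confirm": true as soon as the exchange has accepted the
    // message, whether or not any queue matched the routing key.
    boolean publish(String routingKey, String body) {
        List<String> queue = queuesByBindingKey.get(routingKey);
        if (queue != null) {
            queue.add(body);
        }
        // An unroutable message is silently dropped at this point.
        return true;
    }
}

class ConfirmWithoutRoutingDemo {
    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        List<String> businessQueue = exchange.bind("key");

        boolean firstConfirmed = exchange.publish("key", "1");
        boolean secondConfirmed = exchange.publish("key2", "1");

        System.out.println("confirms: " + firstConfirmed + ", " + secondConfirmed); // confirms: true, true
        System.out.println("messages in queue: " + businessQueue.size());           // messages in queue: 1
    }
}
```

Both publishes are "confirmed", yet only one message reaches the queue, which is exactly the gap the rest of this article deals with.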
So how can we have the ack returned only after the message has been routed to a queue? Or, for a message that can't be routed, can something deal with it for me? At the very least, let me know so I can handle it myself.
Don't panic, RabbitMQ has two mechanisms that address this:
1. The mandatory parameter
2. The backup exchange
The mandatory parameter
Setting the mandatory parameter makes the exchange return a message to the producer when the message cannot reach its destination.
When mandatory is set to true, the exchange returns the message to the producer if it cannot route it. When mandatory is set to false, an unroutable message is simply discarded.
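To make the two cases concrete, here is a minimal sketch of the rule as a toy model. `MandatoryToyExchange` and the `BiConsumer` return listener are hypothetical names chosen for illustration; the real broker and client library are not involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of the mandatory flag; class and method names are hypothetical.
class MandatoryToyExchange {
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();
    // Called with (routingKey, body) when a mandatory message cannot be routed.
    private final BiConsumer<String, String> returnListener;

    MandatoryToyExchange(BiConsumer<String, String> returnListener) {
        this.returnListener = returnListener;
    }

    List<String> bind(String bindingKey) {
        return queuesByBindingKey.computeIfAbsent(bindingKey, k -> new ArrayList<>());
    }

    void publish(String routingKey, String body, boolean mandatory) {
        List<String> queue = queuesByBindingKey.get(routingKey);
        if (queue != null) {
            queue.add(body);                         // routed normally
        } else if (mandatory) {
            returnListener.accept(routingKey, body); // handed back to the producer
        }
        // mandatory == false and no matching queue: the message is discarded
    }
}

class MandatoryDemo {
    public static void main(String[] args) {
        List<String> returned = new ArrayList<>();
        MandatoryToyExchange exchange =
                new MandatoryToyExchange((key, body) -> returned.add(key + ":" + body));
        exchange.bind("key");

        exchange.publish("key2", "dropped", false);
        exchange.publish("key2", "returned", true);

        System.out.println(returned); // prints [key2:returned]
    }
}
```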
So how do you set this parameter? Just add one line of code to the producer's initialization method:
rabbitTemplate.setMandatory(true);
Once enabled, we can rerun the previous code:
Message id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1
Message id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1
Message confirmed successfully, id:19729f33-15c4-4c1b-8d48-044c301e2a8e
Returned message but no callback available
Message confirmed successfully, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb
Received business message:1
Notice the new line in the log: "Returned message but no callback available".
As mentioned above, a message that cannot be routed is returned to the producer through a callback, so the producer must register a callback to receive it.
To fix this, we need to implement the RabbitTemplate.ReturnCallback interface.
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        // Ask the broker to return unroutable messages instead of dropping them
        rabbitTemplate.setMandatory(true);
        // Register this class to receive the returned messages
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        // First message: "key" matches the queue's binding key
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

        // Second message: no queue is bound with "key2"
        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("Message confirmed successfully, id:{}", id);
        } else {
            log.error("Message not delivered successfully, id:{}, cause:{}", id, s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message cannot be routed and is returned by the server. msg:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    }
}
Then we run it again:
Message id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1
Message id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1
Message confirmed successfully, id:2e5c336a-883a-474e-b40e-b6e3499088ef
Message cannot be routed and is returned by the server. msg:1, replyCode:312, replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey:key2
Message confirmed successfully, id:85c771cb-c88f-47dd-adea-f0da57138423
Received business message:1
As you can see, we received the returned message together with the reason it was returned: NO_ROUTE. Note that the mandatory parameter only lets the producer know when a message cannot be routed. If publisher confirms are enabled, the confirm callback still fires once the exchange has received the message, regardless of whether mandatory is set. The return callback usually fires before the confirm callback.
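Since a confirm alone does not tell us whether the message reached a queue, one option is to combine the two callbacks on the producer side. The sketch below is only an illustration of that idea: `DeliveryTracker` and `Outcome` are hypothetical names, it relies on the return callback firing before the confirm callback (which, as noted, is the usual order rather than something shown here as guaranteed), and how the correlation id is recovered inside the return callback depends on the Spring AMQP version and is left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical outcome of one publish, derived from both callbacks.
enum Outcome { ROUTED, CONFIRMED_BUT_RETURNED, NACKED }

// Hypothetical helper that joins return and confirm callbacks by correlation id.
class DeliveryTracker {
    // Thread-safe set, since the callbacks may fire on different threads.
    private final Set<String> returnedIds = ConcurrentHashMap.newKeySet();

    // Call this from the return callback.
    void onReturned(String correlationId) {
        returnedIds.add(correlationId);
    }

    // Call this from the confirm callback; it also cleans up the tracked id.
    Outcome onConfirm(String correlationId, boolean ack) {
        boolean wasReturned = returnedIds.remove(correlationId);
        if (!ack) {
            return Outcome.NACKED;
        }
        return wasReturned ? Outcome.CONFIRMED_BUT_RETURNED : Outcome.ROUTED;
    }
}

class DeliveryTrackerDemo {
    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        tracker.onReturned("id-2");                               // return arrives first
        System.out.println(tracker.onConfirm("id-1", true));      // prints ROUTED
        System.out.println(tracker.onConfirm("id-2", true));      // prints CONFIRMED_BUT_RETURNED
    }
}
```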
Backup exchange
With the mandatory parameter, we become aware of undeliverable messages and get a chance to detect and handle them when they cannot be routed. But sometimes we don't know what to do with these unroutable messages. At best, we log them, trigger an alarm, and then handle them manually. Handling unroutable messages through logs is inelegant, especially when the producer service runs on multiple machines, where copying logs by hand is cumbersome and error-prone.
Also, setting the mandatory parameter makes the producer more complex, because it needs logic to handle the returned messages. What if you don't want to lose messages, but also don't want to add complexity to the producer?
In the earlier article on dead letter queues, we mentioned that you can set a dead letter exchange on a queue to keep messages that fail processing. But unroutable messages never get the chance to enter a queue at all, so a dead letter queue cannot hold them.
Don't panic, RabbitMQ has a backup exchange mechanism (which RabbitMQ itself calls an alternate exchange) to deal with this problem.
What is a backup exchange? You can think of it as the "spare tire" of a RabbitMQ exchange. When we declare an exchange and give it a backup exchange, we are giving it a spare tire: when the exchange receives a message it cannot route, it forwards the message to the backup exchange, which then routes and handles it. The backup exchange is usually of type fanout, so that every message is delivered to all the queues bound to it. We then bind a queue to the backup exchange, so that all messages the original exchange cannot route end up in that queue. Of course, we can also set up an alarm queue, with a dedicated consumer that monitors it and raises alarms.
Don’t understand? It doesn’t matter. Just take a look at the picture.
Emmm, I adjusted the color scheme, but it still looks ugly. I desperately need a UI designer to save me.
Next, let's set up the backup exchange:
@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";
    public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";
    public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";
    public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";
    public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";

    // Declare the business exchange and point it at its backup exchange
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
        return (DirectExchange) exchangeBuilder.build();
    }

    // Declare the backup exchange (fanout, so every bound queue gets a copy)
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME)
                .durable(true);
        return (FanoutExchange) exchangeBuilder.build();
    }

    // Declare the business queue
    @Bean("businessQueue")
    public Queue businessQueue() {
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
    }

    // Bind the business queue to the business exchange with the binding key "key"
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key");
    }

    // Declare the backup queue
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();
    }

    // Declare the alarm queue
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();
    }

    // Bind the backup queue to the backup exchange
    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Bind the alarm queue to the backup exchange
    @Bean
    public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,
                                        @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
Here we use ExchangeBuilder to create the exchange and set a backup exchange for it:
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
One queue is bound to the business exchange, and two queues are bound to the backup exchange: one stores undeliverable messages for later manual handling, and the other is used for alarms.
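The topology just described can be walked through with a toy in-memory model. This is a sketch under loud assumptions: `ToyFanoutExchange` and `ToyBusinessExchange` are invented for illustration and only imitate the forwarding rule; they are not Spring AMQP or RabbitMQ classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy fanout exchange: every bound queue gets a copy of each message.
class ToyFanoutExchange {
    private final List<List<String>> queues = new ArrayList<>();

    List<String> bindNewQueue() {
        List<String> queue = new ArrayList<>();
        queues.add(queue);
        return queue;
    }

    void publish(String body) {
        for (List<String> queue : queues) {
            queue.add(body);
        }
    }
}

// Toy direct exchange that hands unroutable messages to its backup exchange.
class ToyBusinessExchange {
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();
    private final ToyFanoutExchange backup;

    ToyBusinessExchange(ToyFanoutExchange backup) {
        this.backup = backup;
    }

    List<String> bindNewQueue(String bindingKey) {
        return queuesByBindingKey.computeIfAbsent(bindingKey, k -> new ArrayList<>());
    }

    void publish(String routingKey, String body) {
        List<String> queue = queuesByBindingKey.get(routingKey);
        if (queue != null) {
            queue.add(body);
        } else {
            backup.publish(body); // no matching binding: forward to the backup exchange
        }
    }
}

class BackupExchangeDemo {
    public static void main(String[] args) {
        ToyFanoutExchange backup = new ToyFanoutExchange();
        List<String> backupQueue = backup.bindNewQueue();
        List<String> warningQueue = backup.bindNewQueue();
        ToyBusinessExchange business = new ToyBusinessExchange(backup);
        List<String> businessQueue = business.bindNewQueue("key");

        business.publish("key", "routable");
        business.publish("key2", "unroutable");

        System.out.println(businessQueue); // prints [routable]
        System.out.println(backupQueue);   // prints [unroutable]
        System.out.println(warningQueue);  // prints [unroutable]
    }
}
```

Because the backup exchange is a fanout, both the backup queue and the alarm queue receive their own copy of the unroutable message.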
Next, create consumers for the business queue and the alarm queue:
@Slf4j
@Component
public class BusinessMsgConsumer {

    @RabbitListener(queues = BUSINESS_QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Received business message: {}", msg);
        // Reject the message without requeueing it (multiple = false, requeue = false)
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

@Slf4j
@Component
public class BusinessWaringConsumer {

    @RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.error("Unroutable message found: {}", msg);
        // Acknowledge the alarm message once it has been logged
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
Next we send a routable message and an unroutable message:
@Slf4j
@Component
public class BusinessMsgProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCustomMsg(String exchange, String msg) {
        // Routable message: "key" matches the business queue's binding key
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

        // Unroutable message: no queue is bound with "key2"
        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
    }
}
The output is as follows:
Message id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1
Message id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1
Received business message:1
Unroutable message found:1
Here we only write an error log and rely on the logging system to raise the alarm. For sensitive data, alarms by email, DingTalk, SMS, or phone can be used to improve timeliness.
Can the mandatory parameter be used together with a backup exchange? Setting mandatory makes the exchange return unroutable messages to the producer, while the backup exchange has unroutable messages forwarded to itself. So which path does the message take?
Emmm, why think so much? Just try it.
Modify the producer:
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        // rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
        // mandatory is enabled in addition to the backup exchange
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        // Routable message: "key" matches the business queue's binding key
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

        // Unroutable message: no queue is bound with "key2"
        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("Message id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("Message confirmed successfully, id:{}", id);
        } else {
            log.error("Message not delivered successfully, id:{}, cause:{}", id, s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message cannot be routed and is returned by the server. msg:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    }
}
Let’s test it again:
Message id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1
Message id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1
Message confirmed successfully, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4
Message confirmed successfully, id:d8c9e010-e120-46da-a42e-1ba21026ff06
Unroutable message found:1
Received business message:1
As you can see, both messages receive a successful confirm callback, but the unroutable message is not returned to the producer; it is forwarded directly to the backup exchange. So the backup exchange takes priority over the mandatory parameter.
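The priority rule can be written down as a small decision table. This is a sketch with invented names (`Fate`, `UnroutableDecision`), not broker code. The last branch, where the backup exchange cannot route the message either and mandatory then causes a return, goes beyond what the test above exercised; it reflects my understanding of the RabbitMQ documentation on alternate exchanges and should be treated as an assumption.

```java
// Hypothetical names; what finally happens to one published message.
enum Fate { ROUTED, ROUTED_VIA_BACKUP, RETURNED_TO_PRODUCER, DISCARDED }

class UnroutableDecision {
    // primaryRouted: the business exchange found a matching queue.
    // backupRouted:  a backup exchange is configured and delivered the message
    //                to at least one queue.
    // mandatory:     the message was published with mandatory = true.
    static Fate decide(boolean primaryRouted, boolean backupRouted, boolean mandatory) {
        if (primaryRouted) {
            return Fate.ROUTED;
        }
        if (backupRouted) {
            // Routing through the backup exchange counts as routed, so nothing is returned.
            return Fate.ROUTED_VIA_BACKUP;
        }
        // Assumption: nothing could route the message, so mandatory decides.
        return mandatory ? Fate.RETURNED_TO_PRODUCER : Fate.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, true));   // prints ROUTED_VIA_BACKUP
        System.out.println(decide(false, false, true));  // prints RETURNED_TO_PRODUCER
        System.out.println(decide(false, false, false)); // prints DISCARDED
    }
}
```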
Conclusion
In the previous article, we introduced transactions and publisher confirms for reliable delivery, with publisher confirms being the more efficient and flexible of the two. In this article, we introduced two additional mechanisms that keep the producer's messages from being lost: the mandatory parameter and the backup exchange, both of which handle unroutable messages.
With these mechanisms, we can finally make sure that a message is delivered to its destination. This also brings our look at reliable message delivery to a close. Reliable delivery is something we cannot avoid when using MQ, and once it is done properly, it stops being a worry. In general, there are more solutions than problems, but if you don't know the solutions, you may be overwhelmed when the problems come up.
After reading these RabbitMQ articles, your understanding of RabbitMQ should have improved a lot, so what are you waiting for? Go optimize your RabbitMQ project, so that its configuration and code no longer confuse you.
That brings this article to an end. I hope it gave you some inspiration, and you are welcome to follow my public account to exchange ideas.