This is the fourth article in my series on the RabbitMQ message queue.
The first three covered basic messaging and the basic concepts. Learning any technology follows the same path: start with basic usage, solve problems as you run into them, read the source code when you are stuck, and your understanding deepens as the experience accumulates.
Today I'll go through some of the more advanced uses of RabbitMQ. Some are useful in practice and some rarely are, but it is worth knowing all of them, because most of us are also studying for interviews.
How do we ensure the reliability of messages?
How does a message queue limit traffic?
How do we set up a delay queue for delayed consumption?
I hope you find it rewarding. If you do, a like before you read on is much appreciated.
Code for this article (Gitee and GitHub):
Gitee.com/he-erduo/sp…
Github.com/he-erduo/sp…
1. 📖 How do we ensure message reliability?
Let's start with the usual diagram of the message flow. It shows that a message passes through four nodes, and the system as a whole is only reliable if each of these four nodes is reliable.
The producer's message is guaranteed to reach MQ.
MQ receives the message and is guaranteed to route it to the corresponding Exchange.
After the Exchange distributes the message and it is enqueued, the message is guaranteed to be persisted.
After receiving the message, the consumer is guaranteed to consume it correctly.
Through these four guarantees, we can guarantee the reliability of the message and therefore guarantee that the message will not be lost.
2. 🔍 The producer fails to send the message to MQ
After the producer sends a message, the message may never reach MQ for reasons such as a brief network outage. The producer has no way of knowing that the send failed, so the message is silently lost.
To solve this problem RabbitMQ offers two mechanisms: transactions and publisher confirms. Transactions are rarely used because they cost too much performance, so I'll focus on publisher confirms here.
This mechanism is easy to understand. After a message is sent to MQ, MQ will send us an acknowledgement message.
Enabling this feature requires configuration, so let me demonstrate the configuration:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Enable the publisher confirm mechanism
    publisher-confirm-type: correlated
We just need to open the message confirmation in the configuration.
Producer:
// Register the confirm callback once at startup (this assumes the class is a Spring bean).
// RabbitTemplate supports only one ConfirmCallback, so setting a new one on every
// send would fail on the second call.
@PostConstruct
public void initConfirmCallback() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("CorrelationData content : " + correlationData);
            log.info("Ack status : " + ack);
            log.info("Cause content : " + cause);
            if (ack) {
                log.info("Message sent successfully, order stored, order status changed");
            } else {
                log.info("Message sending failed: " + correlationData + ", cause: " + cause);
            }
        }
    });
}

public void sendAndConfirm() {
    User user = new User();
    log.info("Message content : " + user);
    // The correlation ID lets the callback tell which message was confirmed
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(Producer.QUEUE_NAME, user, correlationData);
    log.info("Message sent completed.");
}
The producer code passes one extra parameter: CorrelationData, which carries a unique identifier for the message. To receive confirmations, rabbitTemplate needs a callback registered through setConfirmCallback. The argument is an anonymous class, and its confirm method is where we write the logic for a confirmed or failed send.
For example, with an order message we can save the order or advance its status once MQ confirms the message. If the message fails to reach MQ, we can record the failure or update the order status accordingly.
Tip: A failed confirmation is triggered not only when the message never reaches MQ, but also when it reaches MQ and the corresponding Exchange cannot be found.
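To make the confirm flow a little more concrete, here is a minimal, broker-free sketch in plain Java of the bookkeeping a producer might keep around publisher confirms. The class `PendingConfirms` and its methods are hypothetical names of my own, not part of Spring AMQP. In a real application `onConfirm` would be called from the ConfirmCallback shown above, using the ID carried by CorrelationData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spring AMQP): remembers every message that
// has been sent but not yet confirmed, keyed by its correlation ID.
class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final List<String> toResend = new ArrayList<>();

    // Call just before handing the message to the template.
    void onSend(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. ack=true: the broker accepted the
    // message, so forget it. ack=false: set the payload aside for a resend.
    void onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (payload != null && !ack) {
            synchronized (toResend) {
                toResend.add(payload);
            }
        }
    }

    // Number of messages still waiting for a confirmation.
    int pendingCount() {
        return pending.size();
    }

    // Returns the payloads that were nacked and empties the internal list.
    List<String> drainResendList() {
        synchronized (toResend) {
            List<String> copy = new ArrayList<>(toResend);
            toResend.clear();
            return copy;
        }
    }
}
```

Anything still counted by `pendingCount()` after a reasonable wait was never confirmed either way, which is exactly the situation a retry or a scheduled check has to cover.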
3. 📔 MQ fails to receive or route the message
With the producer side handled, let's look at the MQ side, where two problems can occur:
The message could not find the corresponding Exchange.
An Exchange was found but no Queue was found.
The second case is handled by the mandatory parameter provided by RabbitMQ, which sets the policy for messages that cannot be routed. There are two policies: discard the message silently, or return it to the client. The first case, a missing Exchange, shows up as a failed confirmation, as noted in the tip above.
Since we are after reliability, we of course choose to return the message to the client (true returns it to the client, false discards it).
Configuration:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Enable the publisher confirm mechanism
    publisher-confirm-type: correlated
    # Enable returned messages
    publisher-returns: true
    template:
      mandatory: true
All we need in the configuration is to enable message returns with publisher-returns: true and template.mandatory: true.
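Producer:
public void sendAndReturn() {
    User user = new User();
    log.info("Message content : " + user);
    // Called when the broker returns a message it could not route to any queue.
    // In real code, register this once at startup rather than on every send.
    // Newer Spring AMQP versions offer setReturnsCallback as the replacement.
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.info("Returned message: {}", message);
        log.info("replyCode: {}", replyCode);
        log.info("replyText: {}", replyText);
        log.info("exchange: {}", exchange);
        log.info("routingKey: {}", routingKey);
    });
    // "fail" is a routing key with no matching queue, so the message comes back
    rabbitTemplate.convertAndSend("fail", user);
    log.info("Message sent completed.");
}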
In the callback we have all the information about the returned message and can handle it as needed, for example by putting it on a separate queue for dedicated processing. Routing failures are usually caused by configuration problems.
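As a rough illustration of "handling it separately", the sketch below parks returned messages in memory and counts them per routing key. `ReturnedMessageLog` is a hypothetical class of my own, not a Spring AMQP type; its `park` method simply takes the same values the return callback receives, with the body reduced to a string for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for messages the broker handed back as unroutable.
class ReturnedMessageLog {
    // One returned message; the fields mirror the return callback's arguments.
    static final class Entry {
        final String body;
        final int replyCode;
        final String exchange;
        final String routingKey;

        Entry(String body, int replyCode, String exchange, String routingKey) {
            this.body = body;
            this.replyCode = replyCode;
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    private final List<Entry> parked = new ArrayList<>();

    // Call from the return callback instead of only logging.
    synchronized void park(String body, int replyCode, String exchange, String routingKey) {
        parked.add(new Entry(body, replyCode, exchange, routingKey));
    }

    // Counts parked messages per routing key. A key that keeps showing up
    // usually points at a missing binding or a typo in the configuration.
    synchronized Map<String, Integer> countByRoutingKey() {
        Map<String, Integer> counts = new HashMap<>();
        for (Entry e : parked) {
            counts.merge(e.routingKey, 1, Integer::sum);
        }
        return counts;
    }

    synchronized int size() {
        return parked.size();
    }
}
```

In production the parked entries would more likely go to a database table or a dedicated queue than stay in memory; the in-memory list only keeps the sketch self-contained.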
4. 📑 MQ goes down after the message is queued
This is a low-probability problem, but MQ can crash or be shut down unexpectedly. Messages must therefore be persisted so that they can be recovered after MQ restarts.
Persisting messages alone is not enough: the queue and the Exchange must be persistent as well.
@Bean
public DirectExchange directExchange() {
    // Three constructor parameters: name, durable, autoDelete
    return new DirectExchange("directExchange", true, false);
}
@Bean
public Queue erduo() {
    // The full constructor takes name, durable, exclusive, autoDelete;
    // here we only need to set durable
    return new Queue("erduo", true);
}
Once the Exchange and the queue are declared durable, nothing more is needed for the messages themselves: Spring's RabbitTemplate sends messages as persistent by default.
Make sure that both the Exchange and the queue are durable:
If only the Exchange is durable, the queue is lost after a restart. If only the queue is durable, the Exchange disappears after a restart and messages can no longer be routed to the queue. Setting persistence on just one of them is therefore pointless.
All of this only covers an MQ process that goes down. If the server itself dies or the disk is damaged, none of the measures above help. Against that kind of force majeure you need mirrored queues and a multi-site active-active deployment.
5. 📌 The consumer fails to consume the message
The last place where things can go wrong is the consumer. We already covered the solution in an earlier article: consumer acknowledgements.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Acknowledge messages manually
    listener:
      simple:
        acknowledge-mode: manual
With manual acknowledgement turned on, a message that has not been acknowledged is never dropped. Whether the consumer crashes or the code throws an exception along the way, once the connection is closed the unacknowledged message is put back into the queue and consumed again.
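The requeue behaviour described above can be imitated with a toy model in plain Java. To be clear, `ManualAckQueueModel` is a hypothetical in-memory simulation of my own, not the RabbitMQ client API: it only shows that a delivered message stays in an "unacked" state until the consumer acknowledges it, and goes back to the queue if the connection is lost first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model of a queue with manual acknowledgements. It is not the
// RabbitMQ client API; it only imitates the requeue-on-disconnect behaviour.
class ManualAckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message; it stays "unacked" until ack(tag) is called.
    // Returns the delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Looks up the message body of a delivery that is still unacknowledged.
    String messageFor(long tag) {
        return unacked.get(tag);
    }

    // The consumer finished its work: only now is the message really gone.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer crashed or its connection dropped: everything it had not
    // acknowledged goes back to the head of the queue, in the original order.
    void connectionLost() {
        Deque<String> back = new ArrayDeque<>(unacked.values());
        while (!back.isEmpty()) {
            ready.addFirst(back.pollLast());
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

If a consumer takes two messages, acknowledges the first and then loses its connection, only the second one reappears in the queue, which is why the same message can be seen twice by consumers.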
Of course, this means a message can be consumed more than once. Distributed systems are expected to be idempotent anyway, so duplicate consumption is generally neutralised by the idempotency of the interface.
Idempotence means that executing an operation many times produces the same result as executing it once.
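As a rough illustration only, one common way to get this property on the consumer side is to remember the IDs of messages that have already been handled. The sketch below assumes every message carries a unique ID; `IdempotentConsumer` and its in-memory set are hypothetical stand-ins of my own for what would normally be a unique key in a database or a similar shared store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler: each message ID is applied at most once,
// so a redelivered message does not repeat the side effect.
class IdempotentConsumer {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger balance = new AtomicInteger();

    // Returns true if the message was applied, false if it was a duplicate.
    // Note: in a real system, recording the ID and applying the change should
    // happen in one transaction, otherwise a crash in between loses the update.
    boolean handle(String messageId, int amount) {
        // add() returns false when the ID is already present.
        if (!processedIds.add(messageId)) {
            return false;
        }
        balance.addAndGet(amount);
        return true;
    }

    int balance() {
        return balance.get();
    }
}
```

Handling the same message ID twice changes the balance only once, so a redelivery after a lost acknowledgement is harmless.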
A full treatment of idempotence is beyond the scope of this article, so I won't go further into it here.
6. 💡 A message reliability example
I drew this diagram a long time ago to record how RabbitMQ was used to achieve message reliability, so I'll use it as an example here.
In this example the message is first stored in the DB. The producer then reads the data from the DB, wraps it into a message and sends it to MQ. After the consumer has consumed the message, the status of the DB record is updated and saved again.
If any of these steps fails, the status of the record is not updated. A scheduled task keeps scanning the table for such records and hands them back to the producer for re-delivery.
This scheme is much the same as many others you can find online. Once basic reliability is in place, the scheduled task acts as a safety net that keeps scanning, with the aim of getting as close to 100% reliability as possible.
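The store-first, scan-later flow can be sketched in plain Java as follows. This is a minimal in-memory stand-in under my own assumptions: the class `MessageOutbox`, its `SENDING`/`CONSUMED` statuses and the timeout parameter are hypothetical and not taken from the article's repository, and a real implementation would keep these rows in the DB.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the message table in the DB (hypothetical schema).
class MessageOutbox {
    enum Status { SENDING, CONSUMED }

    static final class Row {
        final String id;
        final String payload;
        Status status = Status.SENDING;
        long lastSentAtMillis;
        int attempts = 0;

        Row(String id, String payload, long nowMillis) {
            this.id = id;
            this.payload = payload;
            this.lastSentAtMillis = nowMillis;
        }
    }

    private final Map<String, Row> rows = new LinkedHashMap<>();

    // First step: store the message before it is sent to MQ.
    synchronized void save(String id, String payload, long nowMillis) {
        rows.put(id, new Row(id, payload, nowMillis));
    }

    // Last step: the consumer succeeded, so mark the row as done.
    synchronized void markConsumed(String id) {
        Row row = rows.get(id);
        if (row != null) {
            row.status = Status.CONSUMED;
        }
    }

    // What the scheduled task runs: pick rows still SENDING after the timeout,
    // bump their attempt counter, and return their IDs for re-delivery.
    synchronized List<String> scanForRedelivery(long nowMillis, long timeoutMillis) {
        List<String> due = new ArrayList<>();
        for (Row row : rows.values()) {
            if (row.status == Status.SENDING
                    && nowMillis - row.lastSentAtMillis >= timeoutMillis) {
                row.attempts++;
                row.lastSentAtMillis = nowMillis;
                due.add(row.id);
            }
        }
        return due;
    }
}
```

The scan could be driven by something like a `ScheduledExecutorService`. Because a message whose status update was merely slow may be sent again, this scheme relies on the consumer being idempotent, and capping `attempts` would keep a permanently failing message from being retried forever.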