Solving RabbitMQ Message Loss and Ensuring Message Reliability (part 1)
Going back to the previous article, we know that messages go from the production side to the server side. What do we have to do to keep messages from getting lost?
- The sender uses Confirm mode. Notice that the Server fails to notify the sender, and the retransmission operation needs to be processed
- Persistent processing of messages
The preceding two operations ensure that the messages are not lost to the server, but are not in the high availability state. If the node is down and the service is temporarily unavailable, you need to restart the node, and the messages will not be lost because disk storage is available.
This article starts from the consumer side:
How can RabbitMQ Server keep messages from being lost to consumers?
As mentioned in the previous article, the consumer will go down before it can process the message because the message is automatically ack by default. RabbitMQ’s automatic ack mechanism will notify MQ Server that the message has been processed, and the message will be lost.
So we use manual ACK mechanism to solve this problem. The consumer notifies the MQ Server after processing the logic, so that the consumer will not send ACK before processing the message. If the consumer gets the message and hangs before processing it, the MQ cluster will automatically sense it. It automatically resends messages to other consumer service instances.
According to the above idea you need to complete the following two steps:
First: consumer listening setup manual ACK
this.channel = channelManager.getListenerChannel(namespace);
this.queue = queue;
this.channel.basicConsume(queue, false, consumerTag, this);
this.disconnectedCallback.setChannel(channel);
Copy the code
The core code: this channel. BasicConsume (queue, false, consumerTag, this); The second parameter setting false indicates no automatic ACK
Second: Manual ACK after service execution is complete
public static void ack(MessageContext context) {
long deliveryTag = context.getEnvelope().getDeliveryTag();
try {
context.getChannel().basicAck(deliveryTag, false);
} catch (IOException e) {
throw new MqAckException("Message ACK error: Connection exception or remote shutdown", context, e); }}Copy the code
Context.getchannel ().basicack (deliveryTag, false);
This encapsulates the need for the business to call the ack method of the object channel to notify MQServer after executing its business code, saying I am done and you can delete it.
Note the problem here: if you forget to call context.getChannel().basicack (deliveryTag, false);
Or maybe the code wasn’t executed because of a code exception? I’ll write another article about this later.
Messages stored in RabbitMQ Server are highly available
When we solve the problems of production and consumption, the basic guarantee of message loss problem, but there is also a high availability of messages, single node problems, common node problems will affect the temporary message unavailable, this time to use our HA mirror cluster mode to ensure.
As mentioned in the last article on RabbitMQ message loss and message reliability (I), the differences between the three modes of server message deployment will be focused on mirror mode.
In mirroring mode, at least three nodes, two disk nodes, and one memory node are required. Architecture diagram:
There are also some strategies for setting up mirrors:
- Syncing to all, which is generally not done, can have a significant impact on performance
- Synchronize a maximum of N machines
- Only nodes that match the specified name are synchronized
Rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
- Mirror all nodes for each queue starting with “rock.wechat” and set to automatic synchronization mode
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Copy the code
- Mirror two nodes for each queue starting with “rock.wechat.” and set to automatic synchronization mode
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
Copy the code
- Mirrors each queue that starts with “node.
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
Copy the code
However, HA mirroring queues have a major disadvantage: throughput of the system can be reduced
Therefore, in mirroring mode, you need to customize voice processing according to specific service rules. In scenarios where message loss is not a problem and high performance is required, mirroring is not required.
conclusion
The explanation of the two articles analyzed the general idea of high availability of messaging middleware, without specific code details. If you have any questions, you can leave a comment below, AND I will reply and answer in time. I will gradually improve the relevant details later, welcome to pay more attention.
The following articles are planned to be updated:
- What can lead to repeated consumption and how can it be addressed?
- What real business scenarios need to ensure orderliness and how can messages be guaranteed orderliness?
- How to gracefully resolve retries of microservice indirect interface failures through message queues?
Recommended reading
Solving RabbitMQ Message Loss and Ensuring Message Reliability (part 1)
IntelliJ IDEA to improve the efficiency of development plug-ins necessary
END
If there is harvest, please help forward, there will be a better article contribution, your encouragement is the biggest power of the author!
Welcome to follow my official account: The Training of Architects, for exclusive learning resources and daily dry goods push.