Reliable message delivery scheme at production end
What is production-end reliability delivery
- Ensure the successful delivery of messages
- security
MQ
The node receives data successfully. - Sender receives
MQ
Node (Broker
) Acknowledgement (received) - Improve the message compensation mechanism
Reliability delivery scheme (without transactions)
1. The message is stored in the database (persisted to the database), and the message status is marked. If the message does not respond, the polling operation is performed.
-
Step1: store the business message and regenerate a message to the message DB for recording (for example, the message was created and is being sent status: 0). (Disadvantage: Persist database twice)
-
Step2: the production end sends messages.
-
Step3: after receiving the message, the Broker sends a reply to the production end. Confirm Listener Listens to the Broker asynchronously.
-
Step4: After the reply indicates that the message is delivered successfully, capture the specified message record in the message DB and update the status, such as status: 1
-
Step5: for example, if the network is unstable in Step3, the Listener does not receive a reply for successfully confirming the message. The status in the message database is still 0, and the Broker may be the state that received the message. So set a rule (timed task), such as extracting the record if it is still in the zero state after 5 minutes (timeout).
-
Step6: Redeliver
-
Step7: Limit the number of retries, such as 3. If more than 3 times, the delivery fails. Update the status value. (Use compensation mechanism to query the cause of message failure, manual)
2. Delayed delivery of messages, secondary confirmation, callback check. (High-concurrency scenario)
-
Upstream service: Production
-
Downstream Service: indicates the consumer end
-
Step1: after the business message is dropped, send the message to the Broker.
-
Step2: then send the second delay (set delay time) check message.
-
Step3: the consumer listens to the specified queue to receive the message for processing
-
Step4: after processing, generate a response message and send it to the Broker.
-
Step5: The Callback service listens to the response message, and persists the response message to the message DB (record the success status).
-
Step6: when the delay time arrives, the delayed message is also monitored by the listener of Callback service, and the message DB is checked. If no successful status is found, the Callback service compensates and initiates an RPC communication for the production end to resend. The production end uses the id in the command to query the service message in the database and sends the message again, that is, the production end switches to Step1.
This scheme reduces the storage of database and ensures the performance.
Idempotency guarantee at the consumption end
Idempotence
In layman’s terms, the result of N operations is the same. Learn from the optimistic locking mechanism of database. Execute a SQL statement to update the database: UPDATE T_REPS SET COUNT = COUNT – 1, version = version + 1 WHERE version = 1
The consumer side guarantees idempotency
Avoid repeated consumption of messages: the consumer is idempotent and receives multiple identical messages, but does not re-consume, that is, receives multiple identical messages.
Solution:
-
SELECT COUNT(1) FROM T_ORDER WHERE ID = 1 and ID = 1; SELECT COUNT(1) FROM T_ORDER WHERE ID = 1 and ID = 1; If the message has been processed, failure is returned.
- Advantages: Simple implementation
- Disadvantages: Performance bottlenecks for database writes under high concurrency
- Solution: Divide the database into tables and algorithm routing according to ID
-
Questions to consider when using the atomicity of Redis: whether to drop database, such as drop database, database and cache to achieve data consistency? Data is stored in the cache. How can I set a periodic synchronization policy (reliability assurance)?
Confirm
A confirmation message
Confirm
The concept of a message acknowledgement mechanism
When a producer sends a message, the Broker responds to the producer if it receives the message. The producer receives a reply to verify that the message is being sent to the Broker properly. It is the core guarantee of message reliability delivery.
Flow chart of validation mechanism
Sending a message and listening for a reply are asynchronous operations.
Verify the implementation of the message
- in
channel
Enable confirm mode:channel.confirmSelect()
; - in
channel
Add listener:channel.addConfirmListener(ConfirmListener listener)
; Returns the results of listening success and failure, and processes the specific results accordingly (resending, logging and waiting for subsequent processing, etc.)
Specific code:
Producer
public class ConfirmProducer { private static final String EXCHANGE_NAME = "confirm_exchange"; private static final String ROUTING_KEY = "confirm.key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); Channel.confirmselect (); String MSG = "Send message of confirm demo"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes()); / / add confirmation to monitor channel. AddConfirmListener (new ConfirmListener () {/ / success @ Override public void handleAck (long deliveryTag, boolean multiple) throws IOException { System.out.println("========= Ack ========"); } @override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("========= Nack ========"); }}); }}Copy the code
Consumer
public class ConfirmConsumer { private static final String EXCHANGE_NAME = "confirm_exchange"; private static final String ROUTING_KEY = "confirm.#"; private static final String QUEUE_NAME = "confirm_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); / / bind switches and queue, specify the routing key channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType. TOPIC, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Received message : " + msg); }}; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }}Copy the code
Console output:
Return
Message mechanism
Used to process messages that are not routable.
Based on the API
A key configuration item, Mandatory: true, indicates that the listener receives a message indicating that a route is unreachable and processes it. Mandatory: false, the Broker automatically deletes the message. The default is false.
The flow chart
Message producer through formulationExchange
andRoutingKey
The message is delivered to a queue, and the consumer listens to the queue and consumes it. But in some cases, when you send a message,Exchange
There is no orRoutingKey
Route not available,Return Listener
It listens for such unreachable messages and processes them.
Return Listener
code
Consumer
public class ReturnConsumer { private static final String EXCHANGE_NAME = "return_exchange"; private static final String ROUTING_KEY = "return.#"; private static final String QUEUE_NAME = "return_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); / / bind switches and queue, specify the routing key channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType. TOPIC, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println ("Receive Message -- "+ new String(body)); byte[] body) throws IOException {system.out.println ("Receive Message --" + new String(body)); }}; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }}Copy the code
Producer
public class ReturnProducer { private static final String EXCHANGE_NAME = "return_exchange"; private static final String ROUTING_KEY = "return.key"; private static final String ROUTING_KEY_ERROR = "wrong.key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String MSG = "Send message of return demo"; To add and set / / Return listener channel. AddReturnListener (new ReturnListener () {@ Override public void handleReturn (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("============ handleReturn ============"); System.err. Println ("replyCode -- "+ replyCode); System.err. Println ("replyText -- "+ replyText); System.err. Println ("exchange -- "+ exchange); System.err. Println ("routingKey -- "+ routingKey); System.err. Println ("properties -- "+ properties); System.err. Println ("body -- "+ new String(body)); }}); // Set Mandatory to true, and messages are not deleted. // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, msg.getBytes()); BasicPublish (EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, msg.getBytes()); }}Copy the code
handleReturn
Parameter output:Specific processing can be written under this method.
Traffic limiting on the consumption end
The concept of flow limiting at the consumer end
When a large number of messages are pushed all at once, a single client cannot process these data at the same time, and the server is prone to failure. Therefore, traffic limiting should be carried out at the consumption end. RabbitMQ provides a quality of service (Qos) function that does not consume new messages until a certain number of messages have been confirmed (through consume or channel) without automatic acknowledgment.
/**
* Request specific "quality of service" settings.
*
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
Copy the code
prefetchSize
: Indicates the size of the message limit. The value is usually 0.prefetchCount
: Indicates the number of messages processed at a time. The value is usually set to 1global
: The value is generally false. True, inchannel
The level is limited; False, inconsumer
Level restriction (manual ACK)
Code demo
Consumer
public class QosConsumer { private static final String EXCHANGE_NAME = "qos_exchange"; private static final String ROUTING_KEY = "qos.#"; private static final String QUEUE_NAME = "qos_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); / / bind switches and queue, specify the routing key channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType. TOPIC, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println ("Receive Message -- "+ new String(body)); byte[] body) throws IOException {system.out.println ("Receive Message --" + new String(body)); // Manually ack channel.basicack (envelope. GetDeliveryTag (), false); }}; /** * prefetchSize: 0 unrestricted message size * prefetchCount: the number of messages processed at one time, after the ack continue to push * global: False applies at the consumer level */ channel.basicQos(0, 1, false); BasicConsume (QUEUE_NAME, false, defaultConsumer); // Set autoAck to false and close channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}Copy the code
Producer
public class QosProducer { private static final String EXCHANGE_NAME = "qos_exchange"; private static final String ROUTING_KEY = "qos.key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String msg = "Send message of QOS demo"; for (int i = 0; i < 5; i ++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, msg.getBytes()); }}}Copy the code
Channel. BasicQos (0, 1, false); Disable autoAck and need to sign in manually. In the rewritten handleDelivery method, if there is no manual signin for channel.basicack (), then the consumer will receive only one message when it receives the message because prefetchCount is set to 1. The rest of the message will not be pushed until it is manually ack.
The queue
The consumer endACK
And requeue mechanism
- Handmade at the consumer end
ACK
andNACK
:
When the consumer is consuming, it may call NACK to reject confirmation because of service abnormalities. After a certain number of times, it directly ACK the abnormal messages to log, and then compensate. Due to serious problems such as server breakdown, the consumer fails to consume the message. After the message is resend, you need to manually ACK the message to ensure that the consumer consumes the message successfully.
- Consumers are back in the queue:
Returns unsuccessful messages to the Broker. In practice, requeueing is usually turned off.
TTL
The queue
TTL: Time To Live. You can specify an expiration time for a message. You can specify the expiration time of the queue, which is calculated from the time the message is queued. If the timeout time of the queue is exceeded, the message will be cleared automatically.
Console demo:
Declare queues and setTTL
Length:
Declare a switch:
Add a binding:
Send a message:
Ten seconds later, the message disappeared because the TTL expired.
Of the messageTTL
:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.expiration("10000")
.build();
Copy the code
Dead-letter queue
DLX: Dead - Letter - Exchange
When a message becomes dead in a queue, it can be republished to another Exchange, the DLX.
A dead letter queue occurs:
- Message rejected (
basic.reject/ basic.nack
) andrequeue=false
(No reentry) - The message
TTL
overdue - The queue length reaches the maximum. Procedure
Dead letter queue setup:
- To properly declare a switch, queue and bind, you need to set a parameter on the queue:
arguments.put("x-dead-letter-exchange", "dlx.exchange");
- Declare a dead letter queue
Exchange
andQueue
And then bind:Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
- After the message expires,
requeue
When the queue reaches its maximum length, the message will be sent to the specifieddlx.exchange
On the switch, consumers listen to the dead-letter queue that the switch is bound to.
Code demo:
public class DlxConsumer { private static final String EXCHANGE_NAME = "dlx_exchange"; private static final String ROUTING_KEY = "dlx.#"; private static final String QUEUE_NAME = "dlx_queue"; // DLX private static final String DLX_EXCHANGE = "dlx.exchange"; private static final String DLX_QUEUE = "dlx.queue"; private static final String DLX_ROUTING_KEY = "#"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.58.129 connectionFactory. SetHost (" "); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); / / 2. The statement dead-letter queue channel. ExchangeDeclare (DLX_EXCHANGE, BuiltinExchangeType TOPIC, true, false, null); channel.queueDeclare(DLX_QUEUE, true, false, false, null); channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println ("Receive Message -- "+ new String(body)); byte[] body) throws IOException {system.out.println ("Receive Message --" + new String(body)); // Manually ack channel.basicack (envelope. GetDeliveryTag (), false); // false do not batch sign}}; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}Copy the code