Reliable message delivery on the producer side

What reliable delivery on the producer side means

  • Ensure that the message is sent out successfully.
  • Ensure that the MQ node receives the message successfully.
  • The sender receives an acknowledgement from the MQ node (Broker) that the message has arrived.
  • Have a complete message compensation mechanism.

Reliable delivery schemes (without transactions)

1. Persist the message to the database and mark its status. If no acknowledgement arrives for a message, a polling task picks it up and resends it.

  • Step1: store the business data, and also write a record of the message to the message DB (for example, status 0: the message has been created and is being sent). (Disadvantage: the database is written twice.)

  • Step2: the producer sends the message.

  • Step3: after receiving the message, the Broker sends an acknowledgement to the producer. The Confirm Listener listens for the Broker's acknowledgement asynchronously.

  • Step4: once the acknowledgement shows that the message was delivered successfully, look up the corresponding record in the message DB and update its status, for example to status 1.

  • Step5: suppose the network is unstable in Step3 and the Listener never receives the acknowledgement. The status in the message DB stays at 0, even though the Broker may actually have received the message. So define a rule (a timed task), for example: extract every record that is still in status 0 after 5 minutes (timeout).

  • Step6: redeliver the extracted messages.

  • Step7: limit the number of retries, for example to 3. Once a message has been retried 3 times without success, treat the delivery as failed and update its status. (A compensation mechanism, usually manual, is then used to find the cause of the failure.)
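
The status handling in Steps 1 to 7 can be sketched in plain Java. This is only an illustration under stated assumptions: the message DB is replaced by an in-memory map, the class and method names are hypothetical, and the value 2 for "failed" is an assumption (the text only names 0 and 1).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MessageLogSketch {
    // 0 and 1 come from the text; 2 (failed) is an assumed value.
    static final int SENDING = 0, DELIVERED = 1, FAILED = 2;
    static final int MAX_RETRIES = 3;
    static final long TIMEOUT_MS = 5 * 60 * 1000L;

    static final class Record {
        final String id;
        int status = SENDING;
        int retries = 0;
        long nextCheckAt;
        Record(String id, long now) { this.id = id; this.nextCheckAt = now + TIMEOUT_MS; }
    }

    // In-memory stand-in for the message DB table.
    final Map<String, Record> table = new LinkedHashMap<>();

    // Step1: record the message before it is sent.
    void save(String id, long now) { table.put(id, new Record(id, now)); }

    // Step4: the confirm listener reports success.
    void markDelivered(String id) {
        Record r = table.get(id);
        if (r != null) r.status = DELIVERED;
    }

    // Steps 5-7: the timed task. Returns the ids that should be redelivered now.
    List<String> scan(long now) {
        List<String> resend = new ArrayList<>();
        for (Record r : table.values()) {
            if (r.status != SENDING || now < r.nextCheckAt) continue;
            if (r.retries >= MAX_RETRIES) {
                r.status = FAILED;               // give up, leave for manual compensation
            } else {
                r.retries++;
                r.nextCheckAt = now + TIMEOUT_MS;
                resend.add(r.id);
            }
        }
        return resend;
    }

    int statusOf(String id) { return table.get(id).status; }

    public static void main(String[] args) {
        MessageLogSketch db = new MessageLogSketch();
        db.save("m1", 0);
        db.save("m2", 0);
        db.markDelivered("m1");                  // only m1 is confirmed
        for (int round = 1; round <= 4; round++) {
            System.out.println("round " + round + " resend=" + db.scan(round * TIMEOUT_MS));
        }
        System.out.println("m2 status=" + db.statusOf("m2"));
    }
}
```

In a real system the scan would be a scheduled query against the message table, and the returned ids would be republished by the producer.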

2. Delayed delivery with a second confirmation and a callback check. (Suited to high-concurrency scenarios.)

  • Upstream service: the producer side

  • Downstream service: the consumer side

  • Step1: after the business data has been persisted, send the message to the Broker.

  • Step2: then send a second, delayed check message (with a configured delay).

  • Step3: the consumer listens to the specified queue, receives the message and processes it.

  • Step4: after processing, the consumer generates a response message and sends it to the Broker.

  • Step5: the Callback service listens for the response message and persists it to the message DB (recording the success status).

  • Step6: when the delay expires, the delayed check message is also received by the Callback service's listener, which then checks the message DB. If no success status is found, the Callback service compensates by making an RPC call that asks the producer to resend. The producer uses the id carried in the request to look up the business message in its database and sends it again, that is, the producer goes back to Step1.

This scheme reduces the number of database writes in the producer's main flow, which protects performance.
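
The decision the Callback service makes in Steps 5 and 6 can be sketched as follows. The class and method names are hypothetical, and an in-memory set stands in for the message DB; the RPC call to the producer is reduced to a boolean result.

```java
import java.util.HashSet;
import java.util.Set;

class CallbackCheckSketch {
    // Stand-in for the message DB owned by the callback service.
    private final Set<String> confirmed = new HashSet<>();

    // Step5: a response message from the consumer arrived.
    void onConsumerResponse(String messageId) {
        confirmed.add(messageId);
    }

    // Step6: the delayed check message arrived. Returns true when the
    // producer has to be asked (for example via RPC) to resend the message.
    boolean onDelayedCheck(String messageId) {
        return !confirmed.contains(messageId);
    }

    public static void main(String[] args) {
        CallbackCheckSketch callback = new CallbackCheckSketch();
        callback.onConsumerResponse("order-1");      // order-1 was processed
        System.out.println("resend order-1? " + callback.onDelayedCheck("order-1"));
        System.out.println("resend order-2? " + callback.onDelayedCheck("order-2"));
    }
}
```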

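Idempotency guarantee on the consumer side

Idempotence

In layman's terms, performing an operation N times gives the same result as performing it once. The idea can borrow from the optimistic locking mechanism of databases. For example, execute a SQL statement to update the database: UPDATE T_REPS SET COUNT = COUNT - 1, version = version + 1 WHERE version = 1. After the first successful update the version is 2, so repeating the same statement matches no rows and changes nothing.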
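
A minimal in-memory sketch of the same version check, assuming a single row with a count and a version column. The class and method names are hypothetical; the method returns the number of affected rows the way a JDBC update would.

```java
class OptimisticLockSketch {
    private int count;
    private int version;

    OptimisticLockSketch(int count, int version) {
        this.count = count;
        this.version = version;
    }

    // Mirrors: UPDATE ... SET COUNT = COUNT - 1, version = version + 1 WHERE version = ?
    // Returns the number of affected rows (0 or 1).
    synchronized int decrementIfVersion(int expectedVersion) {
        if (version != expectedVersion) {
            return 0;                 // stale version: nothing is changed
        }
        count--;
        version++;
        return 1;
    }

    synchronized int count() { return count; }

    public static void main(String[] args) {
        OptimisticLockSketch row = new OptimisticLockSketch(10, 1);
        // The same update is applied three times, as if a message were redelivered.
        for (int i = 0; i < 3; i++) {
            System.out.println("affected rows: " + row.decrementIfVersion(1));
        }
        System.out.println("count: " + row.count());
    }
}
```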

Guaranteeing idempotency on the consumer side

Avoid consuming the same message repeatedly: an idempotent consumer may receive several identical messages but processes them only once, so the result is the same as if a single message had been received.

Solution:

  • Check in the database whether the message has already been processed, for example: SELECT COUNT(1) FROM T_ORDER WHERE ID = 1; If the message has already been processed, return failure instead of processing it again.

    • Advantages: simple to implement
    • Disadvantages: database writes become a performance bottleneck under high concurrency
    • Solution: shard the database and tables, and route by ID with a routing algorithm
  • Use the atomicity of Redis. Questions to consider: should the data also be persisted to the database, and if so, how are the database and the cache kept consistent? If the data is stored only in the cache, how should a periodic synchronization policy be set (reliability guarantee)?
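
Both solutions come down to an atomic "record this ID only if it is not recorded yet" step. The sketch below uses ConcurrentHashMap.putIfAbsent as a stand-in for that step; it is an assumption made for illustration, not the database or Redis code itself, and the class name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentConsumerSketch {
    // Stand-in for a unique-key table or a Redis key set.
    private final ConcurrentHashMap<String, Boolean> processed = new ConcurrentHashMap<>();
    // Counts how often the business logic actually ran.
    final AtomicInteger sideEffects = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String messageId) {
        // putIfAbsent is atomic: only one caller per id sees null.
        if (processed.putIfAbsent(messageId, Boolean.TRUE) != null) {
            return false;
        }
        sideEffects.incrementAndGet();   // the real business logic would go here
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.handle("msg-1"));
        System.out.println(consumer.handle("msg-1"));   // duplicate delivery
        System.out.println("business logic ran " + consumer.sideEffects.get() + " time(s)");
    }
}
```

Note that this sketch marks the ID before the business logic runs; if the logic can fail, the mark has to be removed again or written in the same transaction as the business data.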

Confirm: message confirmation

The concept of the Confirm message acknowledgement mechanism

When a producer sends a message and the Broker receives it, the Broker sends an acknowledgement back to the producer. By receiving this acknowledgement the producer can tell that the message reached the Broker properly. This is the core guarantee of reliable message delivery.

Flow chart of the confirm mechanism

Sending a message and listening for a reply are asynchronous operations.

Implementing message confirmation

  1. Enable confirm mode on the channel: channel.confirmSelect();
  2. Add a listener on the channel: channel.addConfirmListener(ConfirmListener listener); The listener reports success and failure results, and each result is handled accordingly (resending, logging and waiting for later processing, and so on).
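
To resend or log on a negative acknowledgement, the producer has to remember which messages are still unconfirmed. The sketch below shows one way to do that bookkeeping, assuming each message is stored under its publish sequence number (the value of channel.getNextPublishSeqNo() read just before publishing). The tracker class itself is hypothetical and uses only the JDK; its two handler methods take the same parameters as ConfirmListener.handleAck and handleNack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTrackerSketch {
    // Unconfirmed messages, ordered by publish sequence number.
    final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before publishing, with the channel's next publish sequence number.
    void onPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // multiple == true confirms every message up to and including deliveryTag.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    // Returns the bodies that were negatively acknowledged and should be resent or logged.
    List<String> handleNack(long deliveryTag, boolean multiple) {
        List<String> rejected = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(deliveryTag, true);
            rejected.addAll(head.values());
            head.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) rejected.add(body);
        }
        return rejected;
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) tracker.onPublish(seq, "m" + seq);
        tracker.handleAck(3, true);                       // confirms 1, 2 and 3
        System.out.println("to resend: " + tracker.handleNack(4, false));
        System.out.println("still outstanding: " + tracker.outstanding.keySet());
    }
}
```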

Specific code:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConfirmProducer {

    private static final String EXCHANGE_NAME = "confirm_exchange";
    private static final String ROUTING_KEY = "confirm.key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Enable confirm mode on this channel
        channel.confirmSelect();

        // Register the confirm listener before publishing so that no acknowledgement is missed
        channel.addConfirmListener(new ConfirmListener() {
            // Called when the Broker has accepted the message
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Ack ========");
            }

            // Called when the Broker could not handle the message
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Nack ========");
            }
        });

        String msg = "Send message of confirm demo";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
        // The channel and connection stay open so the asynchronous confirmation can arrive
    }
}

Consumer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ConfirmConsumer {

    private static final String EXCHANGE_NAME = "confirm_exchange";
    private static final String ROUTING_KEY = "confirm.#";
    private static final String QUEUE_NAME = "confirm_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange and the queue, then bind them with the routing key
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : " + msg);
            }
        };

        // autoAck = true: messages are acknowledged automatically
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

Console output:

Return message mechanism

Used to process messages that are not routable.

Basic API

A key configuration item is mandatory. When it is true, the Return listener receives messages that cannot be routed and can process them. When it is false, the Broker automatically deletes such messages. The default is false.

Flow chart

A message producer delivers a message to a queue by specifying an Exchange and a RoutingKey, and the consumer listens to that queue and consumes the message. In some cases, however, the message cannot be routed to any queue when it is sent, for example because the RoutingKey matches no binding on the Exchange. The Return Listener listens for such unroutable messages and processes them.
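
Whether a message counts as unroutable depends on how the routing key compares with the binding key. For topic exchanges, as used in the demos here, binding keys are dot-separated words where * stands for exactly one word and # for zero or more words. The matcher below is a plain-Java sketch of that rule and of what mandatory changes; it is not the Broker's implementation, and the class, method and result names are hypothetical.

```java
class TopicMatchSketch {

    // True if the routing key matches the topic binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length;
        if (p[i].equals("#")) {
            // '#' matches zero or more words: try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) return true;
            }
            return false;
        }
        if (j == k.length) return false;
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    // What happens to a published message given one binding and the mandatory flag.
    static String publish(String bindingKey, String routingKey, boolean mandatory) {
        if (matches(bindingKey, routingKey)) return "queued";
        return mandatory ? "returned" : "dropped";
    }

    public static void main(String[] args) {
        System.out.println(publish("return.#", "return.key", true));
        System.out.println(publish("return.#", "wrong.key", true));
        System.out.println(publish("return.#", "wrong.key", false));
    }
}
```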

Return Listener code

Consumer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReturnConsumer {

    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.#";
    private static final String QUEUE_NAME = "return_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange and the queue, then bind them with the routing key
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message -- " + new String(body));
            }
        };

        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

Producer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReturnProducer {

    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.key";
    private static final String ROUTING_KEY_ERROR = "wrong.key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Send message of return demo";

        // Add the Return listener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("============ handleReturn ============");
                System.err.println("replyCode -- " + replyCode);
                System.err.println("replyText -- " + replyText);
                System.err.println("exchange -- " + exchange);
                System.err.println("routingKey -- " + routingKey);
                System.err.println("properties -- " + properties);
                System.err.println("body -- " + new String(body));
            }
        });

        // mandatory = true: unroutable messages are returned instead of being deleted
        // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, msg.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, msg.getBytes());
    }
}

Output of the handleReturn parameters:

The specific handling logic can be written inside this method.

Rate limiting on the consumer side

The concept of rate limiting on the consumer side

When a large backlog of messages is pushed all at once, a single client cannot process that much data at the same time, and the service is prone to failure. Therefore the consumer side should limit the rate. RabbitMQ provides a quality of service (QoS) function: with automatic acknowledgement turned off, and based on a limit set per consumer or per channel, it does not deliver new messages until a certain number of messages have been acknowledged.

/**
 * Request specific "quality of service" settings.
 *
 * These settings impose limits on the amount of data the server
 * will deliver to consumers before requiring acknowledgements.
 * Thus they provide a means of consumer-initiated flow control.
 * @see com.rabbitmq.client.AMQP.Basic.Qos
 * @param prefetchSize maximum amount of content (measured in
 * octets) that the server will deliver, 0 if unlimited
 * @param prefetchCount maximum number of messages that the server
 * will deliver, 0 if unlimited
 * @param global true if the settings should be applied to the
 * entire channel rather than each consumer
 * @throws java.io.IOException if an error is encountered
 */
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize: the size limit of message content, in octets. The value is usually 0 (unlimited).
  • prefetchCount: the maximum number of unacknowledged messages delivered at a time. The value is usually set to 1.
  • global: usually false. true applies the limit at the channel level; false applies it at the consumer level. (The limit only takes effect with manual ACK.)

Code demo

Consumer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QosConsumer {

    private static final String EXCHANGE_NAME = "qos_exchange";
    private static final String ROUTING_KEY = "qos.#";
    private static final String QUEUE_NAME = "qos_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange and the queue, then bind them with the routing key
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message -- " + new String(body));
                // Manual ack; false = acknowledge only this message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        /*
         * prefetchSize: 0 = no limit on message size
         * prefetchCount: number of messages pushed at a time; more are pushed only after an ack
         * global: false = the limit applies at the consumer level
         */
        channel.basicQos(0, 1, false);

        // autoAck must be false for the limit to take effect
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QosProducer {

    private static final String EXCHANGE_NAME = "qos_exchange";
    private static final String ROUTING_KEY = "qos.key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Send message of QOS demo";
        // Publish five messages so the effect of prefetchCount = 1 is visible
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, msg.getBytes());
        }
    }
}

With channel.basicQos(0, 1, false) set and autoAck disabled, messages must be acknowledged manually. If the overridden handleDelivery method does not call channel.basicAck(), the consumer receives only one message, because prefetchCount is set to 1. The remaining messages are not pushed until that message has been manually acknowledged.
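
The behavior described above can be imitated with a small in-memory model: the Broker side only pushes a message while the number of unacknowledged messages is below prefetchCount. This is a simplified sketch for one consumer, not RabbitMQ code, and all names in it are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PrefetchSketch {
    private final Deque<String> ready = new ArrayDeque<>();   // messages waiting in the queue
    private final int prefetchCount;                          // 0 means unlimited
    private int unacked = 0;                                  // delivered but not yet acked
    final List<String> delivered = new ArrayList<>();         // what the consumer has received

    PrefetchSketch(int prefetchCount) { this.prefetchCount = prefetchCount; }

    void publish(String msg) {
        ready.add(msg);
        push();
    }

    // The consumer acknowledges one message, which frees one delivery slot.
    void ack() {
        if (unacked > 0) {
            unacked--;
            push();
        }
    }

    private void push() {
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked < prefetchCount)) {
            delivered.add(ready.poll());
            unacked++;
        }
    }

    public static void main(String[] args) {
        PrefetchSketch queue = new PrefetchSketch(1);
        for (int i = 1; i <= 5; i++) queue.publish("msg-" + i);
        System.out.println("before ack: " + queue.delivered);
        queue.ack();
        System.out.println("after one ack: " + queue.delivered);
    }
}
```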

The queue

Consumer-side ACK and requeue mechanism

  • Manual ACK and NACK on the consumer side:

While consuming, the consumer may call NACK to reject a message because of a business exception. After a certain number of attempts it can ACK the failing message directly, write it to a log, and compensate later. If the consumer fails to finish because of a serious problem such as a server crash, the message is redelivered, so manual ACK is needed to guarantee that the message has really been consumed successfully.

  • Requeueing on the consumer side:

Requeueing returns messages that were not processed successfully to the Broker. In practice, requeueing is usually turned off.

TTL queues

TTL: Time To Live. You can specify an expiration time for an individual message. You can also specify an expiration time on the queue, counted from the moment a message enters the queue. Once a message has been in the queue longer than the queue's timeout, it is removed automatically.

Console demo:

Declare a queue and set its TTL:

Declare an exchange:

Add a binding:

Send a message:

Ten seconds later, the message disappeared because the TTL expired.

Message TTL:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .build();
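
The queue-level TTL from the console demo can also be set in code through the x-message-ttl queue argument. The sketch below only builds the argument map and works out which limit applies when both a queue TTL and a message expiration are set (RabbitMQ uses the lower of the two). The class and method names and the queue name in the comment are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class TtlSketch {

    // Queue arguments that give every message in the queue a TTL in milliseconds.
    // Intended use: channel.queueDeclare("ttl_queue", true, false, false, queueArguments(10000));
    static Map<String, Object> queueArguments(int ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    // Lifetime of a message when the queue has a TTL and the message may carry
    // its own expiration (a string of milliseconds, or null if not set).
    static long effectiveTtlMillis(long queueTtlMillis, String messageExpiration) {
        if (messageExpiration == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, Long.parseLong(messageExpiration));
    }

    public static void main(String[] args) {
        System.out.println(queueArguments(10000));
        System.out.println(effectiveTtlMillis(10000, "3000"));
        System.out.println(effectiveTtlMillis(10000, null));
    }
}
```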

Dead-letter queue

DLX: Dead-Letter-Exchange

When a message in a queue becomes a dead letter, it can be republished to another Exchange, which is the DLX.

A message becomes a dead letter when:

  • The message is rejected (basic.reject / basic.nack) with requeue=false (it is not returned to the queue)
  • The message's TTL expires
  • The queue reaches its maximum length

Dead letter queue setup:

  • Declare the exchange and the queue and bind them as usual, but set one argument on the queue: arguments.put("x-dead-letter-exchange", "dlx.exchange");
  • Declare the dead-letter Exchange and Queue, then bind them:
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
  • When a message expires, is rejected with requeue=false, or the queue reaches its maximum length, the message is sent to the specified dlx.exchange exchange, and consumers listen to the dead-letter queue bound to that exchange.
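
The three dead-letter conditions can be illustrated with a small in-memory model. It is a sketch under simplifying assumptions: one queue, a list in place of the dead-letter queue, and the oldest message being the one dropped when the length limit is exceeded. All names are hypothetical, and the reason prefixes are only labels for the output.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DeadLetterSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxLength;
    // Stand-in for the queue bound to the dead-letter exchange.
    final List<String> deadLetters = new ArrayList<>();

    DeadLetterSketch(int maxLength) { this.maxLength = maxLength; }

    // Condition 3: the queue is over its maximum length, so the oldest message is dead-lettered.
    void publish(String msg) {
        queue.addLast(msg);
        while (queue.size() > maxLength) {
            deadLetters.add("maxlen:" + queue.pollFirst());
        }
    }

    // Condition 1: the consumer rejects the head message; only requeue=false dead-letters it.
    void rejectHead(boolean requeue) {
        String msg = queue.pollFirst();
        if (msg == null) return;
        if (requeue) {
            queue.addFirst(msg);
        } else {
            deadLetters.add("rejected:" + msg);
        }
    }

    // Condition 2: the TTL of the head message has run out.
    void expireHead() {
        String msg = queue.pollFirst();
        if (msg != null) deadLetters.add("expired:" + msg);
    }

    int size() { return queue.size(); }

    public static void main(String[] args) {
        DeadLetterSketch q = new DeadLetterSketch(2);
        q.publish("a");
        q.publish("b");
        q.publish("c");          // exceeds the limit, "a" is dead-lettered
        q.rejectHead(false);     // "b" is rejected without requeue
        q.expireHead();          // "c" expires
        System.out.println(q.deadLetters);
    }
}
```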

Code demo:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DlxConsumer {

    private static final String EXCHANGE_NAME = "dlx_exchange";
    private static final String ROUTING_KEY = "dlx.#";
    private static final String QUEUE_NAME = "dlx_queue";

    // DLX
    private static final String DLX_EXCHANGE = "dlx.exchange";
    private static final String DLX_QUEUE = "dlx.queue";
    private static final String DLX_ROUTING_KEY = "#";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 1. Declare the normal exchange and queue; the queue points to the DLX
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 2. Declare the dead-letter exchange and queue, then bind them
        channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message -- " + new String(body));
                // Manual ack; false = do not acknowledge in batch
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}