RabbitMQ idempotence

Concept

An operation is idempotent if one request and many identical requests produce the same result, so repeated clicks cause no additional side effects. Payment is the simplest example: a user pays for an order and the payment succeeds, but a network error occurs while the result is being returned. The money has already been deducted, yet the user clicks the button again and a second deduction succeeds. When the user checks the balance, too much has been deducted and the transaction history shows two records. In a traditional monolithic application it was enough to wrap the data operations in a transaction and roll back immediately on error, although the response to the client could still be lost to a network interruption or an exception. With message queues, the corresponding problem is that a message may be consumed more than once.

Repeated consumption of messages

When a consumer consumes a message, MQ has already delivered the message and the consumer has processed it, but the network is interrupted while the consumer is returning the ack. Because MQ never receives the confirmation, it redelivers the message to another consumer, or to the same consumer once the network reconnects. The consumer had in fact already processed the message successfully, so the same message ends up being consumed twice.
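
This failure mode can be modelled without a broker. The sketch below is a toy in-memory simulation, not RabbitMQ client code: the class `RedeliverySketch`, the `deliver` method, and the `ackLost` flag are hypothetical names invented for illustration. It shows a non-idempotent handler running twice when the ack for the first delivery never arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of at-least-once delivery; not RabbitMQ client code. */
public class RedeliverySketch {

    /** Delivers a message, and delivers it again if the ack was lost. Returns the delivery count. */
    public static int deliver(String message, Consumer<String> handler, boolean ackLost) {
        int deliveries = 0;
        handler.accept(message); // first delivery: the consumer processes the message
        deliveries++;
        if (ackLost) {
            // The broker never saw the ack, so it treats the message as
            // unprocessed and sends it again.
            handler.accept(message);
            deliveries++;
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<String> ledger = new ArrayList<>();
        // Non-idempotent handler: every delivery appends a ledger entry.
        int deliveries = deliver("deduct order-1001", ledger::add, true);
        // prints "deliveries=2, ledger entries=2"
        System.out.println("deliveries=" + deliveries + ", ledger entries=" + ledger.size());
    }
}
```

The handler itself did nothing wrong on either call; the duplicate comes purely from the lost acknowledgement, which is why the fix has to live in the consumer.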

Solution

Idempotent solutions for MQ consumers generally rely on a unique ID:

  • Write a unique identifier into each message, such as a global ID or a timestamp;
  • Use a UUID, or the message ID that MQ attaches to the message, which the consumer can check when consuming;
  • Or generate a globally unique ID according to your own rules.

Each time a message is consumed, this ID is used to determine whether the message has already been consumed.

Idempotence checks on the consumer side

During business peaks, when a large number of orders are generated, the producer side may well send duplicate messages. The consumer side must therefore be idempotent, meaning that a message is never processed more than once even if the same message is received several times. There are two mainstream approaches in the industry: (a) a unique ID + fingerprint code mechanism, which uses a database primary key to deduplicate; (b) using the atomicity of Redis.

Unique ID + fingerprint code mechanism

A fingerprint code is a unique code built from our own rules, a timestamp, or information supplied by other services. It is not necessarily generated by our system; usually it is spliced together from business fields, but it must be unique. Before processing a message, the consumer queries the database to check whether this ID already exists. The advantage is simplicity: one concatenation, then one query to detect a duplicate. The disadvantage is that under high concurrency a single database becomes a write bottleneck. Splitting databases and tables can improve performance, but this is still not the most recommended approach.
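
As a rough illustration of the idea, the sketch below splices three business fields into a fingerprint and rejects a second message with the same fingerprint. Everything in it is an assumption made for the example: the field names (`merchantId`, `orderId`, `createdAtMillis`), the `:` separator, and the class `FingerprintSketch`. An in-memory `Set` stands in for a database table whose primary key is the fingerprint.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of fingerprint-based deduplication; the Set stands in for a database table. */
public class FingerprintSketch {

    // Stand-in for a table whose primary key is the fingerprint column.
    private final Set<String> primaryKeys = new HashSet<>();

    /** Splices business fields into one fingerprint string. */
    public static String fingerprint(String merchantId, String orderId, long createdAtMillis) {
        return merchantId + ":" + orderId + ":" + createdAtMillis;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    public boolean consume(String merchantId, String orderId, long createdAtMillis) {
        String key = fingerprint(merchantId, orderId, createdAtMillis);
        // Set.add returns false when the key already exists,
        // which models a primary-key violation on INSERT.
        if (!primaryKeys.add(key)) {
            return false;
        }
        // Business logic would run here.
        return true;
    }

    public static void main(String[] args) {
        FingerprintSketch consumer = new FingerprintSketch();
        long createdAt = 1626703200000L;
        System.out.println(consumer.consume("m-1", "order-1001", createdAt)); // prints "true"
        System.out.println(consumer.consume("m-1", "order-1001", createdAt)); // prints "false"
    }
}
```

With a real database, relying on the primary-key constraint at insert time rather than on a separate query avoids a race between two consumers that check at the same moment.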

Redis atomicity

Executing the Redis SETNX command is naturally idempotent: the key is set only if it does not already exist, so a message whose ID has already been recorded is not consumed again.
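
The set-if-absent pattern can be sketched as follows. To keep the example self-contained, it does not talk to Redis: a `ConcurrentHashMap` stands in for the Redis server, and `putIfAbsent` plays the role of SETNX because it is also atomic. The class `SetnxSketch`, the `consumed:` key prefix, and the method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of SETNX-based deduplication; the map stands in for Redis. */
public class SetnxSketch {

    // In-memory stand-in for Redis; putIfAbsent is atomic, like SETNX.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Mirrors SETNX: returns true only for the first caller to set the key. */
    public boolean setIfAbsent(String key, String value) {
        return store.putIfAbsent(key, value) == null;
    }

    /** Runs the business logic only if this message ID has not been claimed yet. */
    public boolean consume(String messageId, Runnable businessLogic) {
        if (!setIfAbsent("consumed:" + messageId, "1")) {
            return false; // duplicate delivery: skip it
        }
        businessLogic.run();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        SetnxSketch consumer = new SetnxSketch();
        AtomicInteger processed = new AtomicInteger();
        int threads = 8;
        CountDownLatch done = new CountDownLatch(threads);
        // Eight concurrent deliveries of the same message ID.
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                consumer.consume("msg-42", processed::incrementAndGet);
                done.countDown();
            }).start();
        }
        done.await();
        System.out.println("processed " + processed.get() + " time(s)"); // prints "processed 1 time(s)"
    }
}
```

One design point to keep in mind: this sketch claims the ID before running the business logic, so a failure in the business logic would leave the ID marked as consumed. Whether to release the key on failure, or to give it an expiry, depends on the application.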

Priority queue

Usage scenarios

Our system has an order payment reminder scenario. When a customer places an order in a Tmall store, Taobao pushes the order to us promptly; if the user has not paid within the configured time, we send the user an SMS reminder. It is a simple feature, but from our point of view Tmall merchants naturally divide into large and small customers. Large merchants such as Apple and Xiaomi bring us considerable profit every year, so it is reasonable for their orders to be handled first. Our back-end system used to store orders in Redis and poll them periodically. However, Redis can only provide a simple message queue based on a List and cannot implement priorities, so once order volume grew we switched to RabbitMQ: an order from a large customer is given a relatively high priority, and all other orders get the default priority.

How to add

  • In the management console: set the x-max-priority argument when adding the queue.

  • Declare the queue with a maximum priority in code:
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
  • Set the priority on the message in code:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().priority(5).build();
  • Notes:

For queue priority to take effect, all of the following are required:

  • The queue must be declared as a priority queue;
  • Each message must have its priority set;
  • The consumer should start consuming only after the messages have been sent to the queue, because only then does the broker have a chance to order them by priority.
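
The third point can be illustrated without a broker. The sketch below is a toy model built on `java.util.PriorityQueue`, not RabbitMQ code; the class `PriorityOrderSketch`, the `Msg` type, and the `drain` method are hypothetical names. It assumes that a message published without a priority counts as priority 0 and that messages of equal priority keep their arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of how a backlog is ordered by priority; not RabbitMQ code. */
public class PriorityOrderSketch {

    /** A queued message: body, priority, and arrival sequence number. */
    public static final class Msg {
        final String body;
        final int priority;
        final long seq;

        public Msg(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    /** Returns message bodies in the order a consumer would receive the backlog. */
    public static List<String> drain(List<Msg> backlog) {
        // Higher priority first; ties are broken by arrival order.
        PriorityQueue<Msg> queue = new PriorityQueue<>(
                Comparator.comparingInt((Msg m) -> -m.priority)
                        .thenComparingLong(m -> m.seq));
        queue.addAll(backlog);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().body);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Msg> backlog = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // Only message 5 carries a priority; the rest default to 0.
            backlog.add(new Msg("info" + i, i == 5 ? 5 : 0, i));
        }
        // prints "[info5, info0, info1, info2, info3, info4, info6, info7, info8, info9]"
        System.out.println(drain(backlog));
    }
}
```

If the consumer were already attached and draining an empty queue, each message would be handed over as it arrived, and there would be no backlog to reorder.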

In practice

  • Message producer:
package com.vleus.rabbitmq.one;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author vleus
 * @date July 19, 2021 22:00
 */
public class Producer {

    // Set the queue name
    public static final String QUEUE_NAME = "hello";

    // Send messages
    public static void main(String[] args) throws IOException, TimeoutException {

        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Set the factory IP to connect the rabbitMQ queue
        connectionFactory.setHost("192.168.37.139");
        // Set the user name and password
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();

        /*
         * Declare the queue:
         * 1. Queue name.
         * 2. Durable: whether the queue is persisted to disk and survives a broker restart.
         * 3. Exclusive: true restricts the queue to the connection that declared it; false lets multiple consumers share it.
         * 4. Auto-delete: true deletes the queue once its last consumer disconnects; false keeps it.
         * 5. Other arguments.
         */
        Map<String, Object> argumentsMap = new HashMap<>();
        // The maximum supported value is 255; avoid large values, which waste CPU
        argumentsMap.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, true, false, false, argumentsMap);

        /*
         * Publish messages:
         * 1. Exchange to publish to ("" is the default exchange).
         * 2. Routing key; here it is the queue name.
         * 3. Other message properties, such as priority.
         * 4. Body of the message.
         */
        for (int i = 0; i < 10; i++) {
            String msg = "info" + i;
            if (i == 5) {
                AMQP.BasicProperties properties = new AMQP.BasicProperties()
                        .builder().priority(5).build();
                channel.basicPublish("",QUEUE_NAME,properties,msg.getBytes());
            }else{
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            }
        }

        System.out.println("Message sent...");
    }
}
  • Message consumer
package com.vleus.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer: receives messages.
 *
 * @author vleus
 */
public class Consumer {

    // Queue name
    public static final String QUEUE_NAME = "hello";

    // Receive the message
    public static void main(String[] args) throws IOException, TimeoutException {

        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Set the factory IP to connect the rabbitMQ queue
        connectionFactory.setHost("192.168.37.139");
        // Set the user name and password
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // Create new connections
        Connection connection = connectionFactory.newConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare a callback for the received message
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption was interrupted");
        };

        /*
         * Consume messages:
         * 1. Queue to consume from.
         * 2. Auto-ack: true acknowledges automatically, false requires a manual ack.
         * 3. Callback invoked when a message is delivered.
         * 4. Callback invoked when the consumer is cancelled.
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

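Results: when the producer runs first and the consumer is started afterwards, info5 is printed first, followed by the remaining messages in the order they were sent.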

Lazy queue

Lazy queues keep their messages on disk. A normal queue keeps messages in memory so that they can be consumed faster; a lazy queue trades some of that speed for a much smaller memory footprint.

Usage scenarios

RabbitMQ introduced lazy queues in version 3.6.0. A lazy queue stores messages on disk as far as possible and loads them into memory only when a consumer is about to consume them. An important design goal is to support longer queues, that is, to hold more messages. Lazy queues are useful when consumers cannot consume messages for a long time, for example because a consumer is offline, has crashed, or is shut down for maintenance.

By default, when a producer sends a message to RabbitMQ, the queue keeps the message in memory as far as possible so that it can be delivered to a consumer more quickly. Even persistent messages keep a copy in memory while they are being written to disk. When RabbitMQ needs to free memory, it pages messages from memory out to disk, which takes a long time and blocks the queue from receiving new messages. The RabbitMQ developers keep improving the relevant algorithms, but the results are still not ideal, especially when message volume is very high.

Two modes

Queues have two modes: default and lazy. The default mode is the behavior of versions earlier than 3.6.0 and requires no changes. Lazy mode is the lazy queue mode. It can be set through the arguments passed to channel.queueDeclare or through a policy; if both are used on the same queue, the policy takes precedence. To change the mode of an existing queue through its declaration, you have to delete the queue and declare it again.

During queue declaration, the mode is set with the x-queue-mode argument, whose value is either default or lazy. The following example declares a lazy queue:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Memory overhead comparison

With 1 million messages of about 1 KB each (roughly 1 GB of payload in total), a normal queue takes up about 1.2 GB of memory, while a lazy queue takes up only about 1.5 MB.