Publisher confirms

How publisher confirms work

The producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting at 1). Once the message has been delivered to all matching queues, the broker sends an acknowledgement (containing the message's unique ID) to the producer, so the producer knows that the message has reached the correct destination queue. If the message and the queue are persistent, the acknowledgement is sent after the message has been written to disk. The delivery-tag field of the acknowledgement sent back to the producer contains the sequence number of the confirmed message. The broker can also set the multiple field of basic.ack to indicate that all messages up to and including this sequence number have been handled.
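
The effect of the multiple field can be illustrated without a broker. The following is a minimal sketch, not RabbitMQ client code: the class AckDemo and its method applyAck are hypothetical names, and a sorted set stands in for the sequence numbers the producer is still waiting on.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of the RabbitMQ client: models which
// sequence numbers remain unconfirmed after an acknowledgement arrives.
final class AckDemo {

    // Removes the sequence numbers covered by one acknowledgement.
    // multiple == true  -> everything up to and including deliveryTag
    // multiple == false -> only deliveryTag itself
    static void applyAck(TreeSet<Long> outstanding, long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headSet(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    public static void main(String[] args) {
        // Sequence numbers start at 1 once the channel is in confirm mode.
        TreeSet<Long> outstanding = new TreeSet<>(List.of(1L, 2L, 3L, 4L, 5L));

        applyAck(outstanding, 3L, true);   // confirms 1, 2 and 3
        applyAck(outstanding, 5L, false);  // confirms 5 only

        System.out.println(outstanding);   // prints [4]
    }
}
```

After both acknowledgements only sequence number 4 is still outstanding, which is the message the producer would have to keep waiting for.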

The main benefit of confirm mode is that it is asynchronous. Once a message is published, the producer application can continue to send the next message while waiting for the channel to return an acknowledgement. When the acknowledgement is finally received, the producer application can process the acknowledgement through the callback method. If RabbitMQ loses the message due to an internal error, it sends a nack message, which the producer application can also process in the callback method.

Publisher confirm strategies

Enabling publisher confirms

Publisher confirms are enabled per channel by calling confirmSelect() on it. Every channel that is to use publisher confirms must call this method before publishing.

Confirming messages individually

This is a simple, synchronous way to confirm: after publishing a message, the producer calls waitForConfirmsOrDie(long), which returns only once the message has been confirmed and throws an exception if it is not confirmed within the given timeout. The biggest drawback of this approach is that publishing is extremely slow: the producer blocks until each message is confirmed before it can publish the next one, which limits throughput to no more than a few hundred published messages per second. For some applications this may be enough. The example below uses waitForConfirms(), which returns a boolean instead of throwing when a message is nacked.

    // Confirm each message individually.
    // Needs com.rabbitmq.client.Channel and java.util.UUID;
    // RabbitMqUtils and MESSAGE_COUNT are defined elsewhere in the project.
    public static void publishMessageIndividually() throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        // Enable publisher confirms on this channel
        channel.confirmSelect();

        // Declare the queue
        String queueName = UUID.randomUUID().toString();

        channel.queueDeclare(queueName, true, false, false, null);

        // Start time
        long beginTime = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // Block until this single message has been confirmed
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("Message sent successfully");
            }
        }

        // End time
        long endTime = System.currentTimeMillis();
        System.out.println("Published " + MESSAGE_COUNT + " individually confirmed messages in " + (endTime - beginTime) + " ms");
    }

Confirming messages in batches

The approach above is very slow. Compared with waiting for the confirmation of every single message, publishing a batch of messages and then waiting for the whole batch to be confirmed greatly improves throughput. The drawback is that when a failure occurs we do not know which message caused it, so the whole batch has to be kept in memory, along with whatever information is needed to republish it. This approach is also still synchronous: it blocks the publishing of further messages while waiting.
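
The batch strategy can be sketched as follows. This is a minimal illustration under stated assumptions, not the author's code: ConfirmingPublisher is a hypothetical interface standing in for the two Channel operations involved (basicPublish and waitForConfirmsOrDie(long)), so the batching logic can run without a broker, and BatchPublisher, publishInBatches and confirmBatch are likewise made-up names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Channel operations the batch strategy needs.
// An adapter over a real Channel would have to wrap its checked exceptions.
interface ConfirmingPublisher {
    void publish(String body);
    // Blocks until everything published so far is confirmed; throws otherwise.
    void waitForConfirmsOrDie(long timeoutMillis) throws Exception;
}

final class BatchPublisher {

    // Publishes all messages, waiting for confirms once per batch.
    // Returns the messages of every batch that could not be confirmed.
    static List<String> publishInBatches(ConfirmingPublisher publisher,
                                         List<String> messages,
                                         int batchSize,
                                         long timeoutMillis) {
        List<String> unconfirmed = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (String message : messages) {
            publisher.publish(message);
            batch.add(message); // keep the batch in memory until it is confirmed
            if (batch.size() == batchSize) {
                confirmBatch(publisher, batch, timeoutMillis, unconfirmed);
            }
        }
        if (!batch.isEmpty()) {
            confirmBatch(publisher, batch, timeoutMillis, unconfirmed);
        }
        return unconfirmed;
    }

    private static void confirmBatch(ConfirmingPublisher publisher, List<String> batch,
                                     long timeoutMillis, List<String> unconfirmed) {
        try {
            publisher.waitForConfirmsOrDie(timeoutMillis);
        } catch (Exception e) {
            // We only learn that something in this batch failed, not which message.
            unconfirmed.addAll(batch);
        }
        batch.clear();
    }

    public static void main(String[] args) {
        // Fake publisher whose second confirm wait fails, to exercise the error path.
        ConfirmingPublisher fake = new ConfirmingPublisher() {
            private int waits = 0;
            @Override public void publish(String body) { }
            @Override public void waitForConfirmsOrDie(long timeoutMillis) throws Exception {
                if (++waits == 2) {
                    throw new IOException("simulated nack");
                }
            }
        };
        List<String> failed = publishInBatches(
                fake, List.of("m0", "m1", "m2", "m3", "m4"), 2, 5_000L);
        System.out.println(failed); // prints [m2, m3]
    }
}
```

The whole second batch is reported as unconfirmed even if only one of its messages failed, which is exactly the drawback described above.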

Confirming messages asynchronously

Asynchronous confirmation is more complex to program than the two approaches above, but it offers the best combination of reliability and efficiency. The producer registers callback functions, and the broker reports through these callbacks whether each message was delivered successfully. Let us look in detail at how asynchronous confirmation is done.

// Publish with asynchronous confirms
public static void publishMessageAsync() throws Exception {

    Channel channel = RabbitMqUtils.getChannel();
    // Enable publisher confirms on this channel
    channel.confirmSelect();

    // Declare the queue
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);

    // Start time
    long beginTime = System.currentTimeMillis();

    // Callback invoked when the broker confirms (acks) a message
    ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
        System.out.println("Confirmed: " + deliveryTag);
    };

    // Callback invoked when the broker fails to confirm (nacks) a message
    /*
     * deliveryTag: sequence number of the message
     * multiple: whether all messages up to deliveryTag are covered
     */
    ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
        System.out.println("Not confirmed: " + deliveryTag);
    };

    // Register the listener that reports which messages succeeded and which failed
    /*
     * 1st argument: callback for confirmed messages
     * 2nd argument: callback for unconfirmed messages
     */
    channel.addConfirmListener(ackCallback, nackCallback); // Asynchronous notification

    // Publish all messages without waiting for confirms
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "News" + i;
        channel.basicPublish("", queueName, null, message.getBytes());
    }

    // End time (measures publishing only; confirms may still be arriving)
    long endTime = System.currentTimeMillis();
    System.out.println("Published " + MESSAGE_COUNT + " asynchronously confirmed messages in " + (endTime - beginTime) + " ms");
}


Handling unconfirmed messages with asynchronous confirms

The best solution is to keep unconfirmed messages in an in-memory collection that both the publishing thread and the confirm callbacks can access, for example a ConcurrentLinkedQueue used to pass messages between the confirm callbacks and the publishing thread. The example below uses a ConcurrentSkipListMap keyed by sequence number, so that confirmed entries can be removed one at a time or in bulk:

    // Publish with asynchronous confirms and track unconfirmed messages
    public static void publishMessageAsync() throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        // Enable publisher confirms on this channel
        channel.confirmSelect();

        /*
         * A thread-safe, sorted map suited to concurrent access:
         * 1. associates each sequence number with its message;
         * 2. allows bulk removal of entries up to a given sequence number;
         * 3. can be shared between the publishing thread and the callbacks.
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms =
                new ConcurrentSkipListMap<>();

        // Declare the queue
        String queueName = UUID.randomUUID().toString();

        channel.queueDeclare(queueName, true, false, false, null);

        // Start time
        long beginTime = System.currentTimeMillis();

        // Callback invoked when the broker confirms (acks) a message
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 2. Remove every entry up to and including deliveryTag;
                //    whatever remains is still unconfirmed
                ConcurrentNavigableMap<Long, String> confirmed
                        = outstandingConfirms.headMap(deliveryTag, true);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("Confirmed: " + deliveryTag);
        };

        // Callback invoked when the broker fails to confirm (nacks) a message
        /*
         * deliveryTag: sequence number of the message
         * multiple: whether all messages up to deliveryTag are covered
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("Unconfirmed message: " + message + ", sequence number: " + deliveryTag);
        };

        // Register the listener that reports which messages succeeded and which failed
        /*
         * 1st argument: callback for confirmed messages
         * 2nd argument: callback for unconfirmed messages
         */
        channel.addConfirmListener(ackCallback, nackCallback); // Asynchronous notification

        // Publish all messages without waiting for confirms
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "News" + i;

            // 1. Record the message before publishing it; the key is the
            //    sequence number the next published message will be assigned
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }

        // End time (measures publishing only; confirms may still be arriving)
        long endTime = System.currentTimeMillis();
        System.out.println("Published " + MESSAGE_COUNT + " asynchronously confirmed messages in " + (endTime - beginTime) + " ms");
    }
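
The hand-off through a ConcurrentLinkedQueue mentioned earlier might look like the following minimal sketch. The class NackHandoff and all of its methods are hypothetical names, not part of the RabbitMQ client; the sketch only models the bookkeeping, under the assumption that the callbacks receive a delivery tag and a multiple flag as in the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical bookkeeping class: tracks outstanding messages and hands
// nacked ones back to the publishing thread through a concurrent queue.
final class NackHandoff {

    private final ConcurrentSkipListMap<Long, String> outstanding =
            new ConcurrentSkipListMap<>();
    private final Queue<String> toRepublish = new ConcurrentLinkedQueue<>();

    // Called by the publishing thread just before a message is published.
    void track(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // Called from the ack callback: confirmed messages are simply forgotten.
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    // Called from the nack callback: failed messages are queued for republishing.
    void onNack(long deliveryTag, boolean multiple) {
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked =
                    outstanding.headMap(deliveryTag, true);
            toRepublish.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) {
                toRepublish.add(body);
            }
        }
    }

    // Called by the publishing thread to collect the messages it should send again.
    List<String> drainForRepublish() {
        List<String> drained = new ArrayList<>();
        String body;
        while ((body = toRepublish.poll()) != null) {
            drained.add(body);
        }
        return drained;
    }

    public static void main(String[] args) {
        NackHandoff handoff = new NackHandoff();
        for (int i = 0; i < 4; i++) {
            handoff.track(i + 1, "News" + i); // sequence numbers start at 1
        }
        handoff.onAck(2, true);   // News0 and News1 confirmed
        handoff.onNack(4, true);  // News2 and News3 rejected
        System.out.println(handoff.drainForRepublish()); // prints [News2, News3]
    }
}
```

Because onAck and onNack take a long and a boolean, they could plausibly be registered with channel.addConfirmListener as method references (handoff::onAck, handoff::onNack); that wiring is an assumption and is not exercised by the sketch.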

Comparison of the three publisher confirm strategies

  • Publishing messages individually: waits synchronously for each confirmation; simple, but with very limited throughput;
  • Publishing messages in batches: waits synchronously for each batch to be confirmed; simple, with reasonable throughput, but when something goes wrong it is hard to tell which message failed;
  • Asynchronous handling: best performance and resource usage and good control in case of error, but slightly harder to implement.