This is the 11th day of my participation in the August More text Challenge. For details, see: August More Text Challenge


Related articles

RabbitMQ series: RabbitMQ series


Preface

  • The producer sets the channel to confirm mode. Once the channel enters confirm mode, every message published on that channel is assigned a unique ID (starting from 1).

  • Once a message has been delivered to all matching queues, the broker sends an acknowledgement (containing the message's unique ID) to the producer, which tells the producer that the message has arrived at the correct destination queue. If the message and queue are persistent, the acknowledgement is sent after the message has been written to disk.

  • The delivery-tag field of the acknowledgement sent by the broker contains the sequence number of the confirmed message. The broker can also set the multiple field of basic.ack to indicate that all messages up to and including this sequence number have been handled.

  • The great benefit of confirm mode is that it is asynchronous. Once a message is published, the producer application can keep sending the next message while it waits for the channel to return the acknowledgement.

  • When a message is finally acknowledged, the producer application can process the acknowledgement in a callback method. If RabbitMQ loses the message because of an internal error, it sends a nack instead, which the producer application can also process in a callback method.

  • Enabling publish confirmation

    • Publish confirmation is not enabled by default. To enable it, call the confirmSelect method on the channel; this must be done on every channel on which you want to use publish confirmation.

    • // Enable publish confirmation
      channel.confirmSelect();
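The delivery-tag and multiple semantics described above can be tried out without a broker. The following is only a minimal sketch, not RabbitMQ client code: ConfirmBookkeeping, publish and handleAck are hypothetical names, and the map simply stands in for the producer's own record of unconfirmed messages.

```java
import java.util.concurrent.ConcurrentSkipListMap;

/** Broker-free sketch of how a producer can interpret delivery-tag and multiple. */
final class ConfirmBookkeeping {

    // Sequence number -> message body for everything published but not yet confirmed.
    final ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // In confirm mode the sequence numbers start at 1.
    private long nextSeqNo = 1;

    /** Records a message as published and returns the sequence number assigned to it. */
    long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    /** Applies an acknowledgement according to its delivery tag and multiple flag. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            // Only the message with exactly this sequence number is confirmed.
            outstanding.remove(deliveryTag);
        }
    }

    public static void main(String[] args) {
        ConfirmBookkeeping producer = new ConfirmBookkeeping();
        for (int i = 0; i < 5; i++) {
            producer.publish("message-" + i);
        }
        producer.handleAck(3, true);   // confirms sequence numbers 1, 2 and 3 at once
        producer.handleAck(5, false);  // confirms sequence number 5 only
        System.out.println(producer.outstanding); // prints {4=message-3}
    }
}
```

A sorted map is used here because a single ack with multiple set to true then removes a whole range of sequence numbers in one call.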

I. Single publish confirmation

  • waitForConfirmsOrDie(long) returns only once the message has been confirmed, and throws an exception if the message is not confirmed within the specified timeout. Publishing this way is synchronous: the next message is sent only after the previous one has been confirmed. The example below uses the related waitForConfirms(), which returns a boolean result.

  • Producer

    • import com.rabbitmq.client.Channel;

      import java.util.UUID;

      /**
       * This is a test producer
       * @author DingYongJun
       * @date 2021/8/1
       */
      public class DyProducerTest_dingyuefabu {

          // Number of messages to publish
          public static final int MESSAGE_COUNT = 888;

          /**
           * For convenience, we use the main function to test
           * @param args
           */
          public static void main(String[] args) throws Exception {
              // Run the single publish confirmation test
              publishMessageIndividually();
          }

          /** Single publish confirmation */
          public static void publishMessageIndividually() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              // Enable publish confirmation
              channel.confirmSelect();
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  // Blocks until the broker responds; if it returns false
                  // (the message was nacked), the producer can resend the message
                  boolean flag = channel.waitForConfirms();
                  if (flag) {
                      System.out.println("Message sent successfully");
                  }
              }
              long end = System.currentTimeMillis();
              System.out.println("Published " + MESSAGE_COUNT
                      + " individually confirmed messages in " + (end - begin) + " ms");
          }
      }
  • The execution result

  • The biggest drawback of this approach is that publishing is extremely slow: until a published message has been confirmed, all subsequent messages are blocked, so throughput is no more than a few hundred published messages per second. That may still be enough for some applications.

  • Simply being told that it is slow may not mean much yet. Compare it with the approaches below and you will see how inefficient it is.

II. Batch publish confirmation

  • Publishing a batch of messages and then confirming them all together can greatly improve throughput compared with waiting for each message to be confirmed individually.

  • Producer

    • /** Batch publish confirmation */
      public static void publishMessageBatch() throws Exception {
          Channel channel = RabbitMqUtils.getChannel();
          // The queue name is a UUID, so it is unique and you don't need to name it yourself
          String queueName = UUID.randomUUID().toString();
          channel.queueDeclare(queueName, false, false, false, null);
          // Enable publish confirmation
          channel.confirmSelect();
          // Number of messages to publish before waiting for confirmation
          int batchSize = 88;
          // Number of messages published but not yet confirmed
          int outstandingMessageCount = 0;
          long begin = System.currentTimeMillis();
          for (int i = 0; i < MESSAGE_COUNT; i++) {
              String message = i + "";
              channel.basicPublish("", queueName, null, message.getBytes());
              outstandingMessageCount++;
              if (outstandingMessageCount == batchSize) {
                  // Wait until the whole batch has been confirmed
                  channel.waitForConfirms();
                  outstandingMessageCount = 0;
              }
          }
          // Confirm whatever is left over from the last, incomplete batch
          if (outstandingMessageCount > 0) {
              channel.waitForConfirms();
          }
          long end = System.currentTimeMillis();
          System.out.println("Published " + MESSAGE_COUNT
                  + " batch-confirmed messages in " + (end - begin) + " ms");
      }
  • The execution result

  • Cons: when a failure occurs during publishing, we don't know which message caused it, so we have to keep the entire batch in memory and republish all of it.

  • This approach is also still synchronous, so it blocks the publishing of further messages while it waits.
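The drawback above can be illustrated without a broker. In this rough sketch, BatchConfirmSketch and publishInBatches are hypothetical names, and the confirmBatch predicate is only a stand-in for waitForConfirms() that reports whether a whole batch was confirmed. Since a failed batch does not say which message went wrong, every message in it is queued for republishing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Broker-free sketch: a failed batch has to be kept in memory and republished as a whole. */
final class BatchConfirmSketch {

    /**
     * "Publishes" messages in batches; confirmBatch stands in for waitForConfirms().
     * Returns every message belonging to a batch whose confirmation failed.
     */
    static List<String> publishInBatches(List<String> messages, int batchSize,
                                         Predicate<List<String>> confirmBatch) {
        List<String> toRepublish = new ArrayList<>();
        List<String> currentBatch = new ArrayList<>();
        for (String message : messages) {
            currentBatch.add(message); // remember the message until its batch is confirmed
            if (currentBatch.size() == batchSize) {
                flush(currentBatch, confirmBatch, toRepublish);
            }
        }
        if (!currentBatch.isEmpty()) { // confirm the final, incomplete batch
            flush(currentBatch, confirmBatch, toRepublish);
        }
        return toRepublish;
    }

    private static void flush(List<String> batch, Predicate<List<String>> confirmBatch,
                              List<String> toRepublish) {
        if (!confirmBatch.test(new ArrayList<>(batch))) {
            // We only know the batch failed, not which message, so keep all of it.
            toRepublish.addAll(batch);
        }
        batch.clear();
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("m" + i);
        }
        // Pretend the broker failed on the batch that contains "m4".
        List<String> retry = publishInBatches(messages, 3, batch -> !batch.contains("m4"));
        System.out.println(retry); // prints [m3, m4, m5]
    }
}
```

Only one message was at fault in this run, yet three have to be resent. That is the price of confirming per batch.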

III. Asynchronous publish confirmation

  • Asynchronous confirmation needs more complex programming logic than the previous two approaches, but it gives the best value: both reliability and efficiency are excellent. It uses callback functions to make message delivery reliable, and the broker also reports through callbacks whether a message was delivered successfully. Let us look in detail at how asynchronous confirmation is done.

  • Producer

    • // Additionally needs: com.rabbitmq.client.ConfirmCallback,
      // java.util.concurrent.ConcurrentNavigableMap, java.util.concurrent.ConcurrentSkipListMap
      /** Asynchronous publish confirmation */
      public static void publishMessageAsync() throws Exception {
          try (Channel channel = RabbitMqUtils.getChannel()) {
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              // Enable publish confirmation
              channel.confirmSelect();
              /**
               * A thread-safe, ordered map, suitable for high concurrency
               * 1. Easily associates sequence numbers with messages
               * 2. Easily deletes entries in bulk, given a sequence number
               * 3. Supports concurrent access
               */
              ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
              /**
               * Callback for confirmed messages
               * 1. sequenceNumber: the message sequence number
               * 2. multiple: true confirms all messages with a sequence number less than
               *    or equal to this one; false confirms only this sequence number
               */
              ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                  if (multiple) {
                      // View of the unconfirmed messages up to and including this sequence number
                      ConcurrentNavigableMap<Long, String> confirmed =
                              outstandingConfirms.headMap(sequenceNumber, true);
                      // Remove them from the unconfirmed messages
                      confirmed.clear();
                  } else {
                      // Remove only the message with this sequence number
                      outstandingConfirms.remove(sequenceNumber);
                  }
              };
              // Callback for messages that were not confirmed (nack)
              ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                  String message = outstandingConfirms.get(sequenceNumber);
                  System.out.println("Message " + message
                          + " was not confirmed, sequence number " + sequenceNumber);
              };
              /**
               * Add a listener for asynchronous confirmation
               * 1. Callback for confirmed messages
               * 2. Callback for messages that were not confirmed
               */
              channel.addConfirmListener(ackCallback, nackCallback);
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = "message" + i;
                  // channel.getNextPublishSeqNo() returns the sequence number of the next message;
                  // storing the body under it keeps track of all unconfirmed message bodies
                  outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                  channel.basicPublish("", queueName, null, message.getBytes());
              }
              long end = System.currentTimeMillis();
              System.out.println("Published " + MESSAGE_COUNT
                      + " asynchronously confirmed messages in " + (end - begin) + " ms");
          }
      }
  • The execution result

  • It is easy to see that this approach is very fast!

  • How do I handle unacknowledged messages?

    • The best solution is to put the unconfirmed messages in an in-memory queue that the publishing thread can access, for example a ConcurrentLinkedQueue that passes messages between the confirm callbacks and the publishing thread.
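One way to picture this hand-off, without a broker, is the sketch below. It is an illustration under assumptions rather than the author's implementation: NackRetryQueueSketch and handleNack are hypothetical names, and handleNack only mirrors the (sequenceNumber, multiple) shape of a confirm callback.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Broker-free sketch: nacked messages are handed to the publishing thread through a queue. */
final class NackRetryQueueSketch {

    // Published but not yet confirmed: sequence number -> message body.
    final ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Messages that were nacked and are waiting for the publishing thread to resend them.
    final Queue<String> retryQueue = new ConcurrentLinkedQueue<>();

    /** Mirrors the (sequenceNumber, multiple) parameters of a confirm callback. */
    void handleNack(long sequenceNumber, boolean multiple) {
        if (multiple) {
            // Every message up to and including sequenceNumber was not confirmed.
            ConcurrentNavigableMap<Long, String> nacked =
                    outstanding.headMap(sequenceNumber, true);
            Map.Entry<Long, String> entry;
            while ((entry = nacked.pollFirstEntry()) != null) {
                retryQueue.add(entry.getValue());
            }
        } else {
            String body = outstanding.remove(sequenceNumber);
            if (body != null) {
                retryQueue.add(body);
            }
        }
    }

    public static void main(String[] args) {
        NackRetryQueueSketch sketch = new NackRetryQueueSketch();
        for (long seqNo = 1; seqNo <= 4; seqNo++) {
            sketch.outstanding.put(seqNo, "message-" + seqNo);
        }
        sketch.handleNack(2, true);   // messages 1 and 2 were not confirmed
        sketch.handleNack(4, false);  // message 4 was not confirmed
        System.out.println(sketch.retryQueue);  // prints [message-1, message-2, message-4]
        System.out.println(sketch.outstanding); // prints {3=message-3}
    }
}
```

The publishing thread can then poll retryQueue and publish those bodies again. A ConcurrentLinkedQueue fits because the callbacks and the publishing thread touch it from different threads without extra locking.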

IV. Summary

  • Publishing messages individually

    • Time: 21210 ms
    • Waits synchronously for each confirmation. Simple, but throughput is very limited.
  • Batch publishing

    • Time: 525 ms
    • Waits synchronously for each batch to be confirmed. Simple, with reasonable throughput, but once something goes wrong it is hard to tell which message caused the problem.
  • Asynchronous processing

    • Time: 45 ms
    • Best performance and resource usage, with good control when an error occurs, but slightly more difficult to implement.
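To put the three timings on the same scale, they can be converted into messages per second for the 888 messages published in each test. The helper below is only a small worked calculation (ConfirmThroughput and messagesPerSecond are hypothetical names). The figures come from a single run each, so treat them as rough indications rather than benchmarks.

```java
/** Converts the measured times above into approximate throughput. */
final class ConfirmThroughput {

    /** Messages per second for messageCount messages published in elapsedMillis. */
    static double messagesPerSecond(int messageCount, long elapsedMillis) {
        return messageCount * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        int messageCount = 888; // MESSAGE_COUNT in the examples above
        System.out.println("individual:   " + Math.round(messagesPerSecond(messageCount, 21210)) + " msg/s"); // 42
        System.out.println("batch:        " + Math.round(messagesPerSecond(messageCount, 525)) + " msg/s");   // 1691
        System.out.println("asynchronous: " + Math.round(messagesPerSecond(messageCount, 45)) + " msg/s");    // 19733
    }
}
```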

I see no ending, but I will search high and low

If you think this post is well written, please like, follow, and comment to encourage the blogger. Writing is not easy ~hahah