Website: www.rabbitmq.com/tutorials/t…

Enable publisher confirms on a Channel

Publisher confirms are a RabbitMQ extension to the AMQP 0.9.1 protocol, so they are not enabled by default. Enable publisher confirms at the Channel level using the confirmSelect method:

Channel channel = connection.createChannel();
channel.confirmSelect();

This method must be called on every Channel on which you want to use publisher confirms. Confirms should be enabled only once per Channel, not for every published message.

Strategy #1: Publish messages individually

Let’s start with the simplest way to publish with confirms, which is to publish a message and wait synchronously for its confirmation:

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.waitForConfirmsOrDie(5_000);
}

In the previous example, we publish a message as usual and wait for its confirmation using the Channel#waitForConfirmsOrDie(long) method. The method returns as soon as the message has been confirmed. If the message is not confirmed within the timeout period, or if it is nack-ed (meaning that the broker could not take care of it for some reason), the method throws an exception. Handling the exception typically involves logging an error message and/or retrying to send the message.

This technique is very simple, but has one major drawback: it slows down publishing significantly, since the confirmation of a message blocks all subsequent messages from being published. This approach does not provide throughput of more than a few hundred published messages per second. However, this is sufficient for some applications.
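The retry handling mentioned above could be sketched roughly as follows. This is a minimal illustration, not the client's API: the PublishAndWait interface and publishWithRetry method are hypothetical stand-ins for "call basicPublish, then waitForConfirmsOrDie", and interruption handling is left out. In a real client the channel may no longer be usable after a failed wait, so the step would probably also need to obtain a fresh channel.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Sketch: retry a publish-and-wait step a bounded number of times. */
class RetryingPublishSketch {

    /** Hypothetical stand-in for "basicPublish, then waitForConfirmsOrDie". */
    @FunctionalInterface
    interface PublishAndWait {
        void run(byte[] body) throws IOException, TimeoutException;
    }

    /**
     * Tries to publish the body up to maxAttempts times.
     * Returns the number of attempts used, or -1 if every attempt failed.
     */
    static int publishWithRetry(PublishAndWait step, byte[] body, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                step.run(body);
                return attempt; // the step completed, so the message was confirmed
            } catch (IOException | TimeoutException e) {
                // nack-ed or timed out: log the failure and try again
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake step that fails on the first two calls and then succeeds
        PublishAndWait flaky = body -> {
            if (++calls[0] < 3) {
                throw new TimeoutException("no confirm within timeout");
            }
        };
        System.out.println("Attempts used: " + publishWithRetry(flaky, "hello".getBytes(), 5));
    }
}
```

Bounding the number of attempts keeps a persistently failing broker from stalling the publisher forever; what to do with a message that exhausts its attempts is left to the application.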

Strategy #2: Publish messages in batches

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    // wait for the whole batch to be confirmed once it is full
    if (outstandingMessageCount == batchSize) {
        channel.waitForConfirmsOrDie(5_000);
        outstandingMessageCount = 0;
    }
}
// wait for the last, possibly incomplete, batch
if (outstandingMessageCount > 0) {
    channel.waitForConfirmsOrDie(5_000);
}

Waiting for a batch of messages to be confirmed improves throughput significantly (up to 20-30 times with a remote RabbitMQ node) compared to waiting for each message to be confirmed individually. One drawback is that we do not know exactly what went wrong in the event of a failure, so we may have to keep a whole batch in memory to log something meaningful or to republish the messages. This approach is also still synchronous, so it blocks the publishing of messages.
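Keeping a whole batch in memory, as described above, might look like the sketch below. The ConfirmingChannel interface is a hypothetical abstraction over the two channel calls in the batch loop (it is not part of the RabbitMQ client), and reporting a failed batch as a boolean instead of an exception is a simplification made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: keep each batch in memory so a failed batch can be logged or republished. */
class BatchPublishSketch {

    /** Hypothetical stand-in for the two channel calls used in the batch loop. */
    interface ConfirmingChannel {
        void publish(String body);

        /** Returns false when the batch was nack-ed or the wait timed out. */
        boolean waitForBatchConfirm();
    }

    /** Publishes messages in batches and returns the bodies of every batch that failed. */
    static List<String> publishInBatches(ConfirmingChannel channel, List<String> messages, int batchSize) {
        List<String> failed = new ArrayList<>();
        List<String> currentBatch = new ArrayList<>();
        for (String body : messages) {
            channel.publish(body);
            currentBatch.add(body);
            if (currentBatch.size() == batchSize) {
                flush(channel, currentBatch, failed);
            }
        }
        // last, possibly incomplete, batch
        if (!currentBatch.isEmpty()) {
            flush(channel, currentBatch, failed);
        }
        return failed;
    }

    private static void flush(ConfirmingChannel channel, List<String> batch, List<String> failed) {
        if (!channel.waitForBatchConfirm()) {
            // We cannot tell which message failed, so the whole batch is kept
            failed.addAll(batch);
        }
        batch.clear();
    }

    public static void main(String[] args) {
        // Fake channel whose second batch fails
        ConfirmingChannel fake = new ConfirmingChannel() {
            private int batches = 0;
            public void publish(String body) { }
            public boolean waitForBatchConfirm() { return ++batches != 2; }
        };
        System.out.println(publishInBatches(fake, List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

The memory cost grows with the batch size, which is the trade-off against the throughput gain of larger batches.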

Strategy #3: Handle publisher confirms asynchronously

The broker confirms published messages asynchronously. We only need to register a callback on the client side to be notified of these confirms:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed
});

There are two callbacks: one for confirmed messages and one for nack-ed messages (messages that can be considered lost by the broker). Each callback takes two arguments:

  • sequenceNumber: a number that identifies the confirmed or nack-ed message.
  • multiple: a boolean value. If it is false, only one message is confirmed/nack-ed. If it is true, all messages with a lower or equal sequence number are confirmed/nack-ed.
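The effect of the multiple flag can be illustrated with a small, self-contained sketch. The settledBy helper below is hypothetical (it is not part of the RabbitMQ client); it only computes which outstanding sequence numbers one callback invocation would settle.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Sketch: which outstanding sequence numbers a single confirm or nack settles. */
class MultipleFlagSketch {

    /** Returns the outstanding sequence numbers settled by a callback with these arguments. */
    static NavigableSet<Long> settledBy(NavigableSet<Long> outstanding, long sequenceNumber, boolean multiple) {
        NavigableSet<Long> settled = new TreeSet<>();
        if (multiple) {
            // every outstanding number lower than or equal to sequenceNumber
            settled.addAll(outstanding.headSet(sequenceNumber, true));
        } else if (outstanding.contains(sequenceNumber)) {
            // only the message with exactly this number
            settled.add(sequenceNumber);
        }
        return settled;
    }

    public static void main(String[] args) {
        NavigableSet<Long> outstanding = new TreeSet<>(List.of(1L, 2L, 3L, 4L));
        System.out.println(settledBy(outstanding, 3L, false)); // prints [3]
        System.out.println(settledBy(outstanding, 3L, true));  // prints [1, 2, 3]
    }
}
```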

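The broker also reports unroutable messages asynchronously. When a message is published with the mandatory flag set and cannot be routed to any queue, the broker returns it to the publisher. We only need to register a callback on the client side to be notified of these returns: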

channel.addReturnListener(r -> {
    // r describes the returned message: reply code and text, exchange, routing key, body
    System.err.println("===========================");
    System.err.println("Return code: " + r.getReplyCode() + " - Return description: " + r.getReplyText());
    System.err.println("Exchange: " + r.getExchange() + " - Routing key: " + r.getRoutingKey());
    System.err.println("Return body: " + new String(r.getBody()));
    System.err.println("===========================");
});

The r argument is a Return object. The callback is invoked when the exchange on the broker has received the message correctly but could not route it to any queue.

The sequence number can be obtained with Channel#getNextPublishSeqNo() before publishing:

long sequenceNumber = channel.getNextPublishSeqNo();
channel.basicPublish(exchange, queue, properties, body);

An easy way to associate messages with sequence numbers is to use a map. Suppose we want to publish strings, because they are easy to turn into byte arrays for publishing. Here is a code example that uses a map to associate the publishing sequence number with the string body of the message:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

The publishing code now uses a map to track outbound messages. We need to clean up this map when confirms arrive, and do something like logging a warning when messages are nack-ed:

// ConcurrentNavigableMap keeps its entries sorted by key in ascending order.
// headMap(key, true) returns a view of the entries whose key is less than or equal to key.
// The view (confirmed) is backed by the original map (outstandingConfirms),
// so confirmed.clear() also removes those entries from outstandingConfirms.
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
    if (multiple) {
        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
          sequenceNumber, true
        );
        // Clearing the view removes every entry up to and including sequenceNumber
        confirmed.clear();
    } else {
        outstandingConfirms.remove(sequenceNumber);
    }
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
    String body = outstandingConfirms.get(sequenceNumber);
    System.err.format(
      "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
      body, sequenceNumber, multiple
    );
    cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

The previous example contains a callback that cleans up the map when confirms arrive. Note that this callback handles both single and multiple confirms. It is used when a confirm arrives (as the first argument of Channel#addConfirmListener). The callback for nack-ed messages retrieves the message body and issues a warning. It then reuses the previous callback to clean up the map of outstanding confirms (whether a message is confirmed or nack-ed, its corresponding entry in the map must be removed).
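The same bookkeeping can be tried without a broker. The sketch below is a hypothetical variation, not the tutorial's code: the class and method names are made up, and unlike the nack callback above (which looks up only the body stored under sequenceNumber) it collects the body of every message covered by a nack so that they could be republished later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Sketch: track outstanding confirms and remember nack-ed bodies for a later retry. */
class OutstandingConfirmsSketch {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private final List<String> nacked = new CopyOnWriteArrayList<>();

    /** Call before publishing, with the value of getNextPublishSeqNo(). */
    void track(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    void onAck(long sequenceNumber, boolean multiple) {
        settle(sequenceNumber, multiple, false);
    }

    void onNack(long sequenceNumber, boolean multiple) {
        settle(sequenceNumber, multiple, true);
    }

    private void settle(long sequenceNumber, boolean multiple, boolean isNack) {
        if (multiple) {
            // View of all entries with a key lower than or equal to sequenceNumber
            ConcurrentNavigableMap<Long, String> settled = outstanding.headMap(sequenceNumber, true);
            Map.Entry<Long, String> entry;
            // Polling from the view also removes the entry from the backing map
            while ((entry = settled.pollFirstEntry()) != null) {
                if (isNack) {
                    nacked.add(entry.getValue());
                }
            }
        } else {
            String body = outstanding.remove(sequenceNumber);
            if (isNack && body != null) {
                nacked.add(body);
            }
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    List<String> nackedBodies() {
        return new ArrayList<>(nacked);
    }

    public static void main(String[] args) {
        OutstandingConfirmsSketch tracker = new OutstandingConfirmsSketch();
        tracker.track(1, "a");
        tracker.track(2, "b");
        tracker.track(3, "c");
        tracker.track(4, "d");
        tracker.onAck(2, true);   // settles 1 and 2
        tracker.onNack(3, false); // settles 3 and remembers its body
        System.out.println("nack-ed: " + tracker.nackedBodies()
            + ", outstanding: " + tracker.outstandingCount());
    }
}
```

In a real publisher, onAck and onNack would be passed to Channel#addConfirmListener as the two callbacks.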