Message acknowledgement mechanism

  • Sender confirmation lets the producer know whether its message was successfully delivered to the exchange or to a queue
  • Consumer confirmation: once the consumer has successfully processed a message, it sends an acknowledgement so that RabbitMQ can remove the message from the queue

How do I determine the success or failure of sending a message?

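  • When a message cannot be routed to any queue, the broker confirms it as soon as the exchange has determined that no queue matches. If the message was published as mandatory, it is also returned to the producer before the confirmation, which is how the producer learns that routing failed
  • When a message can be routed, it is confirmed once all target queues have accepted it. For persistent messages routed to durable queues, this means the message has been written to disk; for mirrored queues, it means all mirrors have accepted it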
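
The possible outcomes can be summarised as a small decision model. The following is a plain-Java sketch, not RabbitMQ or Spring AMQP code: `PublishOutcome` and `ConfirmModel` are hypothetical names used only for illustration. It assumes the behaviour shown in the test results later in this article, where a publish to a missing exchange is reported as a failed confirmation and an unroutable message is confirmed and then returned.

```java
// Simplified model of publisher confirm/return outcomes; not RabbitMQ source code.
final class PublishOutcome {
    final boolean acked;     // what a confirm callback would receive as `ack`
    final boolean returned;  // whether a return callback would fire

    PublishOutcome(boolean acked, boolean returned) {
        this.acked = acked;
        this.returned = returned;
    }

    @Override
    public String toString() {
        return "acked=" + acked + ", returned=" + returned;
    }
}

final class ConfirmModel {
    /**
     * @param exchangeExists whether the target exchange exists
     * @param routable       whether at least one queue matches the routing key
     * @param mandatory      whether the message was published with the mandatory flag
     */
    static PublishOutcome publish(boolean exchangeExists, boolean routable, boolean mandatory) {
        if (!exchangeExists) {
            // The broker closes the channel (404 NOT_FOUND); the producer sees a failed confirmation.
            return new PublishOutcome(false, false);
        }
        if (!routable) {
            // Still confirmed; returned to the producer only when mandatory is set.
            return new PublishOutcome(true, mandatory);
        }
        // Confirmed once every target queue has accepted the message.
        return new PublishOutcome(true, false);
    }

    public static void main(String[] args) {
        System.out.println(publish(true, true, true));   // existing exchange, matching queue
        System.out.println(publish(false, true, true));  // missing exchange
        System.out.println(publish(true, false, true));  // existing exchange, no matching queue
    }
}
```

The three calls in `main` correspond to the three test cases used later: a successful send, a send to a non-existent exchange, and a send with a routing key that matches no queue.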

How does a consumer inform RabbitMQ of a successful or failed message consumption?

  • With automatic acknowledgement, a message is considered acknowledged as soon as it has been sent to the consumer, so it can be lost. If the consumer's processing logic throws an exception, that is, the consumer fails to process the message, the message is effectively gone
  • With manual acknowledgement, the consumer confirms each message by calling ack, nack or reject, which lets it take action when business processing fails. A message that is rejected with requeue, or is still unacknowledged when the consumer's channel closes, goes back to the queue and is delivered again, possibly to another consumer
  • If a consumer forgets to ack, RabbitMQ stops sending it more messages once its unacknowledged messages reach the prefetch limit, because RabbitMQ assumes the consumer has limited processing capacity
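
The interaction between manual acknowledgement, requeueing and the prefetch limit can be illustrated with a toy in-memory queue. This is a minimal sketch under stated assumptions, not how RabbitMQ is implemented: `ToyQueue` and its methods are hypothetical names, there is a single consumer, and messages are identified by their (unique) text.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of manual acknowledgement with a prefetch limit; not RabbitMQ code.
final class ToyQueue {
    private final Deque<String> ready = new ArrayDeque<>();      // waiting for delivery
    private final Set<String> unacked = new LinkedHashSet<>();   // delivered, not yet acknowledged
    private final int prefetch;

    ToyQueue(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the next message, or returns null when none is ready or the prefetch limit is reached. */
    String deliver() {
        if (ready.isEmpty() || unacked.size() >= prefetch) {
            return null;
        }
        String message = ready.pollFirst();
        unacked.add(message);
        return message;
    }

    /** Positive acknowledgement: the message is removed for good. */
    void ack(String message) {
        unacked.remove(message);
    }

    /** Negative acknowledgement: with requeue, the message goes back to the head of the queue. */
    void nack(String message, boolean requeue) {
        if (unacked.remove(message) && requeue) {
            ready.addFirst(message);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}

final class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(2);
        queue.publish("a");
        queue.publish("b");
        queue.publish("c");
        queue.deliver();                       // "a"
        queue.deliver();                       // "b"
        System.out.println(queue.deliver());   // nothing delivered: two messages are unacknowledged
        queue.ack("a");
        System.out.println(queue.deliver());   // a slot is free again, so "c" is delivered
    }
}
```

A consumer that never calls `ack` keeps the unacknowledged set full, so `deliver` keeps returning nothing; this is the stall described in the last bullet.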

Sender confirmation example

  • Add the configuration
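# Confirm that the message reached the exchange
# (Spring Boot 2.2 and later use spring.rabbitmq.publisher-confirm-type=correlated instead)
spring.rabbitmq.publisher-confirms=true
# Return the message to the producer when it cannot be routed to a queue
spring.rabbitmq.publisher-returns=true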
  • Implement two RabbitTemplate interfaces: ConfirmCallback, which is called back when the message is sent to the exchange, and ReturnCallback, which is called back when the message is not routed to the specified queue
/**
 * Listener for messages sent to the exchange
 */
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Success... Message sent to switch successfully! correlationData:{}", correlationData);
        } else {
            log.info("Fail... Message sent to switch failed! correlationData:{}", correlationData);
        }
    }
}

/**
 * Listener for messages that are not routed to a queue
 * @author by peng
 * @date in 2019-06-01 21:32
 */
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Fail... message:{}, from exchange:{}, to routingKey:{}, " +
                "replyCode:{}, replyText:{}", message, exchange, routingKey, replyCode, replyText);
    }
}
  • Redefine the RabbitTemplate bean and register the two callback classes
@Configuration
public class RabbitConfig {
    
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // mandatory=true makes the broker return unroutable messages instead of silently dropping them
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new SendConfirmCallback());
        rabbitTemplate.setReturnCallback(new SendReturnCallback());
        return rabbitTemplate;
    }
}
  • Define producers and consumers
// Producer
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Existing exchange, routing key bound to a queue
    public void sendConfirmSuccess() {
        String content = "Message sent to exist exchange!";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
        System.out.println("########### SendConfirmSuccess : " + content);
    }

    // Exchange that does not exist
    public void sendConfirmError() {
        String content = "Message sent to not exist exchange!";
        this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
        System.out.println("########### SendConfirmError : " + content);
    }

    // Existing exchange, routing key that matches no queue
    public void sendReturn() {
        String content = "Message sent to exist exchange! But no queue to routing to";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
        System.out.println("########### SendWReturn : " + content);
    }
}

// Consumer
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### Receiver Msg:" + message);
    }
}
  • The test class
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
    @Autowired
    private Sender sender;
    @Test
    public void sendConfirmSuccess() {
        sender.sendConfirmSuccess();
        // Result: the message is sent and consumed successfully, and the confirm listener logs:
        // ########### SendConfirmSuccess : Message sent to exist exchange!
        // Success... Message sent to switch successfully! correlationData:null
        // ########### Receiver Msg:Message sent to exist exchange!
    }

    @Test
    public void sendConfirmError() {
        sender.sendConfirmError();
        // Result: the message fails to be sent, and the confirm listener logs:
        // ########### SendConfirmError : Message sent to not exist exchange!
        // Fail... Message sent to switch failed! correlationData:null
        // Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40)
    }

    @Test
    public void sendReturn() {
        sender.sendReturn();
        // Result: the message reaches the exchange but is not routed to a queue
        // (no queue matches the routing key):
        // ########### SendWReturn : Message sent to exist exchange! But no queue to routing to
        // Success... Message sent to switch successfully! correlationData:null
        // Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), from exchange:directConfirmExchange, to routingKey:not-exist, no matching queue found, replyCode:312, replyText:NO_ROUTE
    }
}
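
The correlationData:null in these results appears because the messages are sent without a CorrelationData argument, so the confirm callback cannot tell which message it refers to. A producer that needs this usually attaches an id to each message and tracks the ones that are still unconfirmed. The sketch below shows only that bookkeeping in plain Java; `PendingConfirms` and its methods are hypothetical names, and wiring it to RabbitTemplate (for example through the convertAndSend overload that accepts a CorrelationData) is assumed rather than shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Bookkeeping for unconfirmed messages, keyed by a correlation id; hypothetical helper.
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final List<String> failed = new ArrayList<>();

    /** Call just before publishing: remember the payload under its correlation id. */
    void track(String id, String payload) {
        pending.put(id, payload);
    }

    /** Call from the confirm callback with the id carried by the correlation data. */
    synchronized void onConfirm(String id, boolean ack) {
        String payload = pending.remove(id);
        if (payload != null && !ack) {
            // Not confirmed by the broker: keep the payload so it can be retried or reported.
            failed.add(payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    /** Returns a copy of the payloads whose confirmation failed. */
    synchronized List<String> failedPayloads() {
        return new ArrayList<>(failed);
    }
}

final class PendingConfirmsDemo {
    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("1", "first message");
        confirms.track("2", "second message");
        confirms.onConfirm("1", true);    // confirmed by the broker
        confirms.onConfirm("2", false);   // confirmation failed
        System.out.println("still pending: " + confirms.pendingCount());
        System.out.println("to retry: " + confirms.failedPayloads());
    }
}
```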

Consumer confirmation

  • Add the configuration
# Consumer message acknowledgement - manual ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • Consumer code
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
    
    @RabbitHandler
    public void process(String content, Channel channel, Message message) {
        try {
            System.out.println("########### message:" + message);
            // Call after business processing succeeds; the message is confirmed as consumed
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // Call after business processing fails
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("########### Receiver Msg:" + content);
    }
}
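
The second argument of basicAck and basicNack in the consumer code is the `multiple` flag. With false, only the delivery identified by the tag is affected; with true, the call covers every unacknowledged delivery on the channel up to and including that tag. The following plain-Java sketch models just that bookkeeping. `UnackedTags` is a hypothetical name and this is not the RabbitMQ client implementation.

```java
import java.util.TreeSet;

// Toy model of the delivery tags a channel is still waiting to have acknowledged.
final class UnackedTags {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records a delivery that has been handed to the consumer. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the (deliveryTag, multiple) parameters of basicAck. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this delivery.
            unacked.remove(deliveryTag);
        }
    }

    int count() {
        return unacked.size();
    }

    boolean contains(long deliveryTag) {
        return unacked.contains(deliveryTag);
    }
}

final class UnackedTagsDemo {
    public static void main(String[] args) {
        UnackedTags tags = new UnackedTags();
        for (long tag = 1; tag <= 5; tag++) {
            tags.delivered(tag);
        }
        tags.ack(2, false);   // only tag 2
        tags.ack(4, true);    // tags 1, 3 and 4
        System.out.println("outstanding deliveries: " + tags.count());
    }
}
```

Acknowledging one message at a time, as the consumer above does, keeps the handling of each message independent; batching with `multiple` set to true reduces network traffic but acknowledges earlier deliveries as well, whether or not they were processed successfully.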