A message is consumed successfully only after it has travelled producer -> MQ -> consumer, so it can be lost at any of these three steps.

1. The producer fails to deliver the message to MQ

1.1 Transaction Mechanism

The AMQP protocol provides a transaction mechanism: the channel is put into transaction mode when a message is published, and the transaction is rolled back if the message fails to be delivered.

Custom transaction manager

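import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTranscation {

    // Transaction manager referenced by @Transactional on the producer
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}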

Modify the YAML configuration

spring:
  rabbitmq:
    # Return the message when it is not received by a queue
    publisher-returns: true

Enable transaction support

rabbitTemplate.setChannelTransacted(true);

Invoke ReturnCallback when the message cannot be routed to a queue

rabbitTemplate.setMandatory(true);

The producer publishes the message

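import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // Put the channel into transaction mode
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setReturnCallback(this);
    }

    // Called when the broker returns a message it could not route to a queue
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("This message failed to be sent: "+message+", please handle");
    }

    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void publishMessage(String message) throws Exception {
        // mandatory = true makes the broker return unroutable messages
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}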

However, transactions are rarely used in practice. Publishing becomes synchronous: the sender blocks until the RabbitMQ server responds before it can send the next message, which reduces the producer's throughput.

1.2 Sender confirmation mechanism

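With this mechanism the channel is switched to confirm mode before publishing. Every message published on the channel is assigned a unique ID, and RabbitMQ sends an acknowledgement to the producer once the message has reached the matching queue. If the broker cannot handle the message, it sends a negative acknowledgement instead.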
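The bookkeeping a producer needs for confirms can be sketched in plain Java. This is only an in-memory illustration with no broker involved: the class `OutstandingConfirms` and its method names are hypothetical, and it assumes confirms arrive as a sequence number plus a `multiple` flag, where `multiple = true` covers every message up to and including that number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical in-memory bookkeeping for publisher confirms (no broker involved). */
final class OutstandingConfirms {

    // Sequence number -> message body, for every message not yet confirmed.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    /** Record a message just before it is published; returns its sequence number. */
    synchronized long track(String body) {
        long seqNo = nextSeqNo++;
        pending.put(seqNo, body);
        return seqNo;
    }

    /** Ack: the message (or, if multiple, every message up to seqNo) is safe. */
    void onAck(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Nack: hand back the affected messages so the caller can republish them. */
    List<String> onNack(long seqNo, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> affected = pending.headMap(seqNo, true);
            toResend.addAll(affected.values());
            affected.clear();
        } else {
            String body = pending.remove(seqNo);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    /** Number of published messages still waiting for a confirm. */
    int outstanding() {
        return pending.size();
    }
}
```

Anything still counted by `outstanding()` after a timeout has not been confirmed and is a candidate for redelivery.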

Enable the message confirmation mechanism

spring:
  rabbitmq:
    # Return the message when it is not received by a queue
    publisher-returns: true
    # Enable message confirmation
    publisher-confirm-type: correlated

Invoke ReturnCallback when the message cannot be routed to a queue

rabbitTemplate.setMandatory(true);

The producer publishes the message

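import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    // Called with ack = true when the broker confirms the message, false otherwise
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("Confirmed this message: "+correlationData);
        }else{
            System.out.println("Confirmation failed: "+correlationData+"; Exception: "+cause);
        }
    }

    // Called when the broker returns a message it could not route to a queue
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("This message failed to be sent: "+message+", please handle");
    }

    public void publishMessage(String message){
        // mandatory = true makes the broker return unroutable messages
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}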

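If confirmation fails, we can compensate with a message retry mechanism: redeliver the message when no acknowledgement is received. The complete configuration is as follows. Note that the listener.simple.retry settings control retries inside the consumer's listener container; retries of RabbitTemplate sends are configured separately under spring.rabbitmq.template.retry.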

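spring:
  rabbitmq:
    # Return the message when it is not received by a queue
    publisher-returns: true
    # Enable the message confirmation mechanism
    publisher-confirm-type: correlated
    listener:
      simple:
        retry:
          enabled: true
          # Maximum number of attempts
          max-attempts: 5
          # Interval before the first retry, in milliseconds
          initial-interval: 3000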
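The retry policy in this configuration (up to 5 attempts, 3000 ms between them) can be sketched in plain Java as follows. It is a minimal illustration of the idea and not how Spring implements it: the class `SimpleRetry` and its `call` method are hypothetical, and the interval is assumed to stay fixed between attempts.

```java
import java.util.concurrent.Callable;

/** Hypothetical fixed-interval retry helper; not part of Spring or RabbitMQ. */
final class SimpleRetry {

    /**
     * Runs the action until it succeeds or maxAttempts is reached,
     * sleeping intervalMillis between failed attempts.
     * Rethrows the last failure when every attempt has failed.
     */
    static <T> T call(Callable<T> action, int maxAttempts, long intervalMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last;
    }
}
```

A call such as `SimpleRetry.call(() -> send(message), 5, 3000L)` would mirror the values above, where `send` stands for whatever publishing method the application uses. A message that still fails after the last attempt should be logged or stored for manual handling rather than dropped silently.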

2. MQ goes down after receiving the message, and the message held in memory is lost

Messages can be lost inside MQ itself, so both queues and messages need to be persisted.

The @Queue annotation gives us some queue-specific attributes, as follows:

  1. name: the queue name.
  2. durable: whether the queue is durable, that is, whether it survives a broker restart.
  3. exclusive: whether the queue is exclusive to the connection that declared it.
  4. autoDelete: whether the queue is deleted automatically.
  5. arguments: additional attribute parameters for the queue, with the following options, see Arguments in Figure 2:
    • x-message-ttl: the expiration time of a message, in milliseconds.
    • x-expires: how long the queue may go unused before it is deleted, in milliseconds.
    • x-max-length: the maximum number of messages in the queue. If it is exceeded, messages are removed from the head of the queue.
    • x-max-length-bytes: the maximum total size of the message bodies in the queue. If it is exceeded, messages are removed from the head of the queue.
    • x-overflow: the queue overflow behavior. This determines what happens to messages when the maximum length of the queue is reached. Valid values are drop-head, reject-publish, or reject-publish-dlx. The quorum queue type supports only drop-head.
    • x-dead-letter-exchange: the name of the dead-letter exchange to which expired or dropped messages (dropped because the queue is too long or its size exceeds the threshold) are republished.
    • x-dead-letter-routing-key: the routing key used when a message is sent to the dead-letter exchange. If it is not set, the message's original routing key is used.
    • x-single-active-consumer: whether the queue has a single active consumer. True means that only one of the registered consumers receives messages and the others are held in reserve. False means that messages are distributed among all consumers in turn (default false).
    • x-max-priority: the maximum priority the queue supports. If it is not set, the queue does not support message priority.
    • x-queue-mode (lazy mode): sets the queue to lazy mode, which keeps as many messages as possible on disk to reduce RAM usage. If it is not set, the queue keeps an in-memory cache to deliver messages as quickly as possible.
    • x-queue-master-locator: sets how the master node of a mirrored queue is chosen in cluster mode.
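As an illustration of how several of these arguments fit together, the sketch below builds an arguments map in plain Java. The class name, the exchange name `javatrip.dlx`, the routing key `javatrip.dead`, and all numeric values are made-up examples, not values from this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles optional queue arguments. */
final class QueueArguments {

    /** A bounded queue whose expired or dropped messages go to a dead-letter exchange. */
    static Map<String, Object> boundedQueueWithDeadLettering() {
        Map<String, Object> arguments = new HashMap<>();
        // Messages expire 60 seconds after they are enqueued.
        arguments.put("x-message-ttl", 60000);
        // Keep at most 10000 messages in the queue.
        arguments.put("x-max-length", 10000);
        // When the queue is full, drop the oldest message from the head.
        arguments.put("x-overflow", "drop-head");
        // Expired or dropped messages are republished to this exchange (example name).
        arguments.put("x-dead-letter-exchange", "javatrip.dlx");
        // Routing key used for dead-lettered messages (example name).
        arguments.put("x-dead-letter-routing-key", "javatrip.dead");
        return arguments;
    }
}
```

In Spring AMQP a map like this can be passed as the last argument of `new Queue(name, durable, exclusive, autoDelete, arguments)`.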

Persistent queue

When creating a queue, set the durable attribute to true and autoDelete to false

@Queue(value = "javatrip",durable = "true",autoDelete = "false")

Persistent message

When sending a message, set the deliveryMode of the message to 2 (persistent). In Spring Boot, messages sent through RabbitTemplate are persistent by default.

3. The consumer throws an exception before it finishes processing the message

The consumer has just received the message and throws an exception before the business logic finishes. With automatic acknowledgment the broker already considers the message delivered, so it is lost. To prevent this, turn off automatic acknowledgment and acknowledge messages manually.

Change the YAML configuration to manual acknowledgment

spring:
  rabbitmq:
    listener:
      simple:
        # Acknowledge messages manually
        acknowledge-mode: manual
        # Maximum number of unacknowledged messages per consumer
        prefetch: 1

The consumer acknowledges manually

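import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println(message);
        // Delivery tag that identifies this delivery on the channel
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Acknowledge on success; "..." stands for the application's own success check
        if(...){
            channel.basicAck(deliverTag,false);
        }else{
            // Consumption failed: return the message to the queue (requeue = true)
            channel.basicNack(deliverTag,false,true);
        }
    }
}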
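To make the effect of manual acknowledgment concrete, here is a toy in-memory model of a queue in plain Java. It is not the RabbitMQ API: the class `ToyQueue` and all of its methods are hypothetical and only mimic the idea that a delivered message stays "unacknowledged" until the consumer acks it, and returns to the queue on a nack with requeue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of manual acknowledgment; not the RabbitMQ client API. */
final class ToyQueue {

    private final Deque<String> ready = new ArrayDeque<>();     // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();  // delivered, not yet acked
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the next ready message and returns its delivery tag, or -1 if none. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Processing succeeded: the message can be forgotten. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** Processing failed: optionally put the message back at the head of the queue. */
    void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

In this model a message only disappears after `ack`. If the consumer fails and calls `nack(tag, true)`, the message becomes ready again and is delivered once more, which is why consumers that requeue should tolerate seeing the same message twice.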

4. Summary

Why is the message lost?

Message loss can happen at the producer, inside MQ, or at the consumer.

How to ensure the reliability of messages?

  • The sender uses the sender confirmation mode
  • MQ persists both queues and messages
  • The consumer acknowledges the message manually after it has been processed successfully