- The example code in this post can be downloaded from GitHub or Gitee:
GitHub: Code links
Gitee: Code links
- The previous two posts in this series:
Springboot RabbitMQ for beginners
Springboot RabbitMQ exchange types
The high availability of RabbitMQ shows mainly in how messages are sent, transmitted, and received: it ensures that messages are sent successfully, are not lost, and are confirmed as consumed without being consumed repeatedly.
- Whether a message is sent successfully is handled by the message production confirmation mechanism on the producer side.
- Messages are kept from being lost mainly by RabbitMQ message persistence.
- Confirmed consumption and non-repeated consumption are handled by the consumption confirmation mechanism on the consumer side.
I. Message production confirmation mechanism
To confirm that a message was sent successfully, the production confirmation logic can be set in one place, in the custom RabbitMQ configuration component, through rabbitTemplate.setConfirmCallback and rabbitTemplate.setReturnCallback.
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    public class RabbitmqConfig {

        // Injected connection factory; CachingConnectionFactory exposes the confirm/return setters
        @Autowired
        private CachingConnectionFactory connectionFactory;

        // Customize the RabbitTemplate component used to send RabbitMQ messages
        @Bean
        public RabbitTemplate rabbitTemplate() {
            // Enable "confirm after a message is sent"
            connectionFactory.setPublisherConfirms(true);
            // Enable "return acknowledgement information after a message is sent"
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            // Confirm callback: logs the send feedback, including the ack flag and the cause
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                    log.info("message is sent successfully: correlationData({}), ack({}), cause({})",
                            correlationData, ack, cause));
            // Return callback: logs "message lost" feedback when the message could not be delivered
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                    log.info("message loss: exchange({}), route({}), replyCode({}), replyText({}), message: {}",
                            exchange, routingKey, replyCode, replyText, message));
            // Transmit messages as JSON strings
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }
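The confirm callback receives an ack flag, so a producer can do more than log: it can remember what it sent and pick out the messages the broker did not accept. The following is a minimal sketch of that bookkeeping in plain Java, assuming the producer attaches its own correlation id to each message. `PendingConfirms` and its methods are hypothetical names for illustration, not part of Spring AMQP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers sent messages until the broker confirms them. */
class PendingConfirms {
    // correlation id -> message body still waiting for a broker confirmation
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before publishing, with the correlation id attached to the message. */
    void track(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    /**
     * Call from the confirm callback.
     * Returns the body that should be re-sent, or null if nothing needs re-sending.
     */
    String onConfirm(String correlationId, boolean ack) {
        if (ack) {
            pending.remove(correlationId);   // accepted by the broker: forget it
            return null;
        }
        return pending.get(correlationId);   // not accepted: keep it and hand it back for a retry
    }

    /** Number of messages that have not been positively confirmed yet. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("order-1", "{\"id\":1}");
        confirms.track("order-2", "{\"id\":2}");
        confirms.onConfirm("order-1", true);
        String retry = confirms.onConfirm("order-2", false);
        System.out.println("still pending: " + confirms.size() + ", retry body: " + retry);
    }
}
```

In a real producer, `onConfirm` would be called from inside the confirm callback with the id carried by correlationData and the ack flag; how and when to resend is left to the application.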
II. Message persistence
- Exchanges and queues can be made durable. If the durable parameter is set to true, the exchanges and queues are not lost when the RabbitMQ server restarts.
- When sending a message, you can choose to make the message itself persistent by setting deliveryMode in the message properties to MessageDeliveryMode.PERSISTENT. If the RabbitMQ server restarts before the message is consumed, the message is still there. If every message is persisted, performance suffers, because disk reads and writes are much slower than memory.
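The two settings work together: a message is still there after a restart only if its queue is durable and the message itself is persistent. The toy model below illustrates that rule in plain Java. It is a simplified sketch, not RabbitMQ client code, and `ToyQueue` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one queue, only to illustrate which messages survive a broker restart. */
class ToyQueue {
    /** A message body plus its persistence flag (the analogue of deliveryMode = PERSISTENT). */
    private static final class Msg {
        final String body;
        final boolean persistent;

        Msg(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private final boolean durable;
    private final List<Msg> messages = new ArrayList<>();
    private boolean exists = true;

    ToyQueue(boolean durable) {
        this.durable = durable;
    }

    void publish(String body, boolean persistent) {
        messages.add(new Msg(body, persistent));
    }

    /** Simulates a restart: a non-durable queue disappears, transient messages are dropped. */
    void restart() {
        if (!durable) {
            exists = false;
            messages.clear();
            return;
        }
        messages.removeIf(m -> !m.persistent);
    }

    boolean exists() {
        return exists;
    }

    List<String> bodies() {
        List<String> out = new ArrayList<>();
        for (Msg m : messages) {
            out.add(m.body);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyQueue durableQueue = new ToyQueue(true);
        durableQueue.publish("persistent order", true);
        durableQueue.publish("transient order", false);
        durableQueue.restart();
        System.out.println("durable queue after restart: " + durableQueue.bodies());

        ToyQueue transientQueue = new ToyQueue(false);
        transientQueue.publish("persistent order", true);
        transientQueue.restart();
        System.out.println("non-durable queue still exists: " + transientQueue.exists());
    }
}
```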
III. Message consumption confirmation mechanism
- RabbitMQ provides a message acknowledgement mechanism, called ACK mode, to ensure that messages are confirmed as consumed and are not consumed repeatedly. There are three acknowledgement modes: NONE (no confirmation), AUTO (automatic confirmation), and MANUAL (manual confirmation).
- The flow for the no-confirmation mode is shown in the following figure. In this mode the producer side does not know whether the message was consumed successfully, so messages may be consumed repeatedly or fail to be consumed:
- The code directory is shown in the figure; it demonstrates automatic confirmation and manual confirmation:
The ACK mode can be set in the YAML configuration file with spring.rabbitmq.listener.simple.acknowledge-mode: XXX, or in the listener container factory bean. For a simple listener, SimpleRabbitListenerContainerFactory is enough:
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    public class RabbitmqConfig {

        // Injected connection factory shared by both container factories
        @Autowired
        private CachingConnectionFactory connectionFactory;

        /**
         * Confirmation mode AUTO, using the directExchange message model
         */
        @Bean
        public SimpleRabbitListenerContainerFactory singleListenerContainerAuto() {
            // Define the message listener container factory
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            // Set the connection factory used by the container factory
            factory.setConnectionFactory(connectionFactory);
            // Set the message format used in transmission; JSON here
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            // Initial number of concurrent consumer instances; 1 here
            factory.setConcurrentConsumers(1);
            // Maximum number of concurrent consumer instances; 1 here
            factory.setMaxConcurrentConsumers(1);
            // Number of messages each consumer instance pulls at a time; 1 here
            factory.setPrefetchCount(1);
            // Use the automatic confirmation mechanism
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return factory;
        }

        /**
         * Confirmation mode MANUAL, using the directExchange message model
         */
        @Bean
        public SimpleRabbitListenerContainerFactory singleListenerContainerManual() {
            // Define the message listener container factory
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            // Set the connection factory used by the container factory
            factory.setConnectionFactory(connectionFactory);
            // Set the message format used in transmission; JSON here
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            // Initial number of concurrent consumer instances; 1 here
            factory.setConcurrentConsumers(1);
            // Maximum number of concurrent consumer instances; 1 here
            factory.setMaxConcurrentConsumers(1);
            // Number of messages each consumer instance pulls at a time; 1 here
            factory.setPrefetchCount(1);
            // Use the manual confirmation mechanism
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    }
In automatic confirmation mode, the RabbitMQ component sends the acknowledgement on the consumer's behalf, both when a message is consumed successfully and when consumption fails. From the consumer side, automatic confirmation looks like an ordinary message queue, while manual confirmation is more flexible.
- Confirmation mode AUTO, using the directExchange message model: producer
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class AutoAckPublisher {

        // RabbitMQ message operation component
        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Send a message
         */
        public void sendMsg(Order order) {
            try {
                // Set the exchange
                rabbitTemplate.setExchange(RabbitMqConstants.AUTO_ACKNOWLEDGE_EXCHANGE);
                // Set the routing key
                rabbitTemplate.setRoutingKey(RabbitMqConstants.AUTO_ACKNOWLEDGE_ROUTING_KEY);
                // Send the message
                rabbitTemplate.convertAndSend(order);
                log.info("AUTO confirmation mode - directExchange message model - producer - sent message: {}", order);
            } catch (Exception e) {
                log.error("AUTO confirmation mode - directExchange message model - producer - failed to send message: {}, error: ", order, e);
            }
        }
    }
- Confirmation mode AUTO, using the directExchange message model: consumer
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class AutoAckConsumer {

        @RabbitListener(queues = RabbitMqConstants.AUTO_ACKNOWLEDGE_QUEUE,
                containerFactory = "singleListenerContainerAuto")
        public void consumeMsg(Order order) {
            try {
                log.info("AUTO confirmation mode - consumer listening - consumed message content: {}", order);
            } catch (Exception e) {
                log.error("AUTO confirmation mode - consumer listening - failed to consume message: {}, exception: ", order, e);
            }
        }
    }
(2) The manual confirmation flow is shown in the figure. When an exception occurs while a message is being processed, the consumer has to confirm manually how the failed message is handled and whether it is put back in the queue.
- Confirmation mode MANUAL, using the directExchange message model: producer
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class ManualAckPublisher {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void sendMsg(Order order) {
            try {
                rabbitTemplate.setExchange(RabbitMqConstants.MANUAL_ACKNOWLEDGE_EXCHANGE);
                rabbitTemplate.setRoutingKey(RabbitMqConstants.MANUAL_ACKNOWLEDGE_ROUTING_KEY);
                rabbitTemplate.convertAndSend(order);
                log.info("MANUAL confirmation mode - directExchange message model - producer - sent message: {}", order);
            } catch (Exception e) {
                log.error("MANUAL confirmation mode - directExchange message model - producer - failed to send message: {}, error: ", order, e);
            }
        }
    }
- Confirmation mode MANUAL, using the directExchange message model: consumer
After the listener receives a message and processes it successfully, it confirms the consumption with basicAck. When processing fails and an exception is caught, there are two options: reject the message and put it back in the queue, or reject the message and discard it. If the message is put back in the queue, the same exception occurs again and the message still cannot be consumed until the cause is fixed; in the meantime, subsequent messages are blocked behind it. Requeuing is therefore often not a good choice. In general, you can record the message content, discard the message, and resend it later. Alternatively, the message can be placed in a dead letter queue that has no listener, then retrieved from the RabbitMQ management console (or by listening on that queue again) and resent to the original queue for consumption. Once the cause of the exception is fixed, the message can be processed successfully.
    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class ManualAckConsumer {

        @RabbitListener(queues = RabbitMqConstants.MANUAL_ACKNOWLEDGE_QUEUE,
                containerFactory = "singleListenerContainerManual")
        public void consumeMsg(Order order, Channel channel,
                               @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            try {
                log.info("MANUAL confirmation mode - consumer listening - delivery tag: {}, content: {}", tag, order);
                // int num = 1 / 0;   // uncomment to simulate a processing failure
                // After the business logic has run, confirm consumption manually.
                // First parameter: delivery tag (unique within the channel).
                // Second parameter: whether to acknowledge multiple messages at once.
                channel.basicAck(tag, false);
            } catch (Exception e) {
                // The second parameter, requeue, controls whether the message is put back in the queue.
                // If it is true, the failed message is redelivered and has to be dealt with manually.
                channel.basicReject(tag, true);
                log.error("MANUAL confirmation mode - consumer listening - failed message: {}, delivery tag: {}, exception: ", order, tag, e);
            }
        }
    }
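Why rejecting with requeue set to true can block a queue is easier to see in a small simulation. The sketch below is a toy model in plain Java, not RabbitMQ client code. It assumes a single consumer that takes one message at a time, a rejected message that returns to the head of the queue, and a message body "bad" that always fails (standing in for an unfixed bug). `RequeueDemo` and `run` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of a single consumer that handles one message at a time. */
class RequeueDemo {

    /**
     * Delivers at most maxDeliveries messages and returns a log of what happened.
     * A message whose body is "bad" always fails processing.
     */
    static List<String> run(List<String> queued, boolean requeueOnFailure, int maxDeliveries) {
        Deque<String> ready = new ArrayDeque<>(queued);
        List<String> log = new ArrayList<>();
        for (int i = 0; i < maxDeliveries && !ready.isEmpty(); i++) {
            String msg = ready.pollFirst();
            if (!msg.equals("bad")) {
                log.add("ack " + msg);            // processed successfully
            } else if (requeueOnFailure) {
                ready.addFirst(msg);              // rejected and put back at the head
                log.add("requeue " + msg);
            } else {
                log.add("discard " + msg);        // rejected and dropped
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("bad", "good");
        // With requeue, every delivery is the same failing message; "good" is never reached.
        System.out.println(run(queue, true, 4));
        // Without requeue, the failing message is dropped and "good" is processed.
        System.out.println(run(queue, false, 4));
    }
}
```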
As shown in the figure, one message is in the unacked state, and there is a Get messages section below. Clicking it reports that the queue is empty: there are 0 messages ready to be consumed, and the message being consumed stays in the unacked state and cannot be retrieved.
This is not a good situation in production. After the listener is stopped, the message returns to the ready state and can be retrieved. The page warns that retrieving messages is a destructive operation.
There are four modes for retrieving a message: nack with requeue, ack without requeue, reject with requeue, and reject without requeue. The content of the message is shown when it is retrieved.
To avoid the situation above when confirming consumption, you can use a dead letter queue: catch the exception, send the failed message to a dead letter queue that has no listener, and, once the cause of the exception is fixed, send the message back to the original queue for consumption. See the next blog post.
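The dead letter approach can be sketched as a toy model in plain Java. This is an illustration under simplifying assumptions, not RabbitMQ client code: the two queues are in-memory deques, a failed message goes straight to the dead letter queue, and `DeadLetterDemo` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Toy model: failed messages are parked in a dead letter queue and replayed after a fix. */
class DeadLetterDemo {
    private final Deque<String> workQueue = new ArrayDeque<>();
    private final Deque<String> deadLetterQueue = new ArrayDeque<>();   // nothing listens here

    void publish(String msg) {
        workQueue.addLast(msg);
    }

    /** Consumes the whole work queue; the handler returns false when processing fails. */
    int consumeAll(Predicate<String> handler) {
        int acked = 0;
        while (!workQueue.isEmpty()) {
            String msg = workQueue.pollFirst();
            if (handler.test(msg)) {
                acked++;                          // processed and acknowledged
            } else {
                deadLetterQueue.addLast(msg);     // parked instead of requeued
            }
        }
        return acked;
    }

    /** Once the cause of the failure is fixed, move parked messages back to the work queue. */
    int replayDeadLetters() {
        int moved = 0;
        while (!deadLetterQueue.isEmpty()) {
            workQueue.addLast(deadLetterQueue.pollFirst());
            moved++;
        }
        return moved;
    }

    int deadLetterCount() {
        return deadLetterQueue.size();
    }

    public static void main(String[] args) {
        DeadLetterDemo broker = new DeadLetterDemo();
        broker.publish("order-1");
        broker.publish("order-2");
        broker.publish("order-3");

        Predicate<String> buggyHandler = msg -> !msg.equals("order-2");   // fails on order-2
        System.out.println("acked while buggy: " + broker.consumeAll(buggyHandler));
        System.out.println("parked in dead letter queue: " + broker.deadLetterCount());

        broker.replayDeadLetters();
        Predicate<String> fixedHandler = msg -> true;                     // bug fixed
        System.out.println("acked after fix: " + broker.consumeAll(fixedHandler));
    }
}
```

Because the failed message is parked rather than requeued, the messages behind it are still consumed while the bug is being fixed.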
References: Distributed Middleware Combat rabbitMQ Combat Guide