One, still use three chicken feathers as introduction

  • Really! I do not lie to you ~ I believe that we have encountered similar: order 30 minutes after the payment is automatically canceled development task
  • thenToday aJust to understand how to use itRabbitMQdoneUnder the distributedTime out order

  • Yes, as a pursuit of programmers, we went on to an article on the actual combat | I suggest you use DelayQueue fix timeout orders – (1) Go on.

Two, MQ delay message implementation principle

The RabbitMQ website boasts:

RabbitMQ is the most widely deployed open source message broker.
Copy the code

RabbitMQ is a message middleware that producers generate and consumers consume. It follows the AMQP (Advanced Message Queuing Protocol) and is the most widely deployed open source message broker. So today I’m going to play around with delay queues with RabbitMQ.

To implement deferred tasks using RabbitMQ you must first understand the two concepts of RabbitMQ: TTL for messages and dead-letter Exchange.

  • TTL (Time To Live) of messages

The TTL of a message is the lifetime of the message. RabbitMQ can set TTL for queues and messages separately. Setting the queue is the retention time that the queue has no consumer attached, or it can be set separately for each individual message. After that, we consider the message dead. We call it a dead letter. If the queue is set and the message is set, the smaller one will be chosen. So if a message is routed to a different queue, it may die at a different time (different queue Settings). I’ll focus on TTL for a single message, because that’s the key to implementing deferred tasks.

So, how do you set this TTL value? There are two ways to do this. The first way is to set the “x-message-TTL” property of the queue when it is created, as follows:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl".6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
Copy the code

All messages sent to the queue will not survive more than 6 seconds at most.

The alternative is to set the TTL for each message as follows:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
Copy the code

The expiration time of this message is set to 6s.

But there is a difference between these two approaches, if set the TTL attribute of the queue, so once the message expired, will be the queue, and the second way, message even if expired, also will not necessarily be discarded immediately, because the news is overdue in the delivered to the consumer decision before, if the current queue has serious backlog of messages, Messages that are out of date may survive for a long time. It is also important to note that if the TTL is not set, the message will never expire, and if the TTL is set to 0, the message will be discarded unless it can be delivered directly to the consumer.

A Dead Letter alone is not enough to delay a task, but a Dead Letter Exchange.

  • Dead Letter Exchanges

The concept of Exchage will not be repeated here. A message will enter a dead-letter route if it meets the following conditions. Remember that this is a route, not a queue. A route can correspond to many queues.

  1. A message was sentConsumerIt was rejected andrejectMethod argumentrequeueisfalse. That means it won’t be put back in the queue and used by other consumers.
  2. The above messageTTLWhen we get there, the message is out of date.
  3. The queue length limit is full. The first messages are discarded or dropped on a dead-letter route.

A Dead Letter Exchange is just a normal Exchange, just like creating any other Exchange. If a message expires in a queue that is configured with Dead Letter Exchange, the message is automatically forwarded and sent to the Dead Letter Exchange.

  • Schematic diagram

The deferred task is implemented through TTL and Dead Letter Exchange of messages. We need to set up two queues, one for sending messages and one for forwarding target queues after messages expire.

The producer produces a delayed message and uses different Routingkeys to route the message to different delay queues according to the required delay time. Each queue is set with different TTL attributes and bound to the same dead-letter switch. After the message expires, according to the different Routingkeys, It will be routed to different dead-letter queues, and consumers only need to listen to the corresponding dead-letter queues for processing.

Iii. Actual combat exercise

  • Download the Windows sample installation
  1. To download RabbitMQ, the ErLang environment is required
  2. Run the command
rabbitmq-plugins enable rabbitmq_management
Copy the code

Start the Web management plugin, then start rabbitmq-server and go to http://localhost:15672/#/. Enter the secret command and you can see it.

  • Plug-in installation

Before RabbitMQ 3.6.x we usually use dead letter queue (DLX)+TTL expiration time to implement delay queue.

Starting with RabbitMQ 3.6.x (now 3.8.+), RabbitMQ provides a delay queue plugin that can be downloaded and placed under plugins in RabbitMQ’s root directory. Delay queue plug-in

  1. The official address 2. JFrog Bintray addressI couldn’t find it on the official website when I installed itX 3.7., but3.8.0It’s backwards compatibleX 3.7.And then I was inBintrayTo find theX 3.7., we trust to find the corresponding version of the plug-in ha….

plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Copy the code
  • Set up the SpringBoot environment
  1. The YML configuration is as follows
# integration the rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 150000
    publisher-confirms: true    # Enable the confirmation mechanism to use message confirmation mode,
    publisher-returns: true     # Enable return validation
    template:                   After the message is sent, wait for the response asynchronously
      mandatory: true           # set to true, consumers will be listened for by return if messages are not routed to the appropriate queue and will not be deleted automatically
Copy the code
  1. The launch configuration declares several beans
@Configuration
public class MQConfig {
    @Bean
    publicRabbitListenerContainerFactory<? > rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory =new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    public static final String DELAY_EXCHANGE = "Ex.DelayExchange";
    public static final String DELAY_QUEUE = "MQ.DelayQueue";
    public static final String DELAY_KEY = "delay.#";

    /** * delay switch **@return TopicExchange
     */
    @Bean
    public TopicExchange delayExchange(a) {
        Map<String, Object> pros = new HashMap<>(3);
        // Set the switch to support delayed message push
        pros.put("x-delayed-message"."topic");
        TopicExchange exchange = new TopicExchange(DELAY_EXCHANGE, true.false, pros);
        // We can also set exchange.setdelayed (true) in the Exchange declaration to enable delayed queuing
        exchange.setDelayed(true);
        return exchange;
    }

    /** * delay queue **@return Queue
     */
    @Bean
    public Queue delayQueue(a) {
        return new Queue(DELAY_QUEUE, true);
    }

    /** * Bind queues and switches, and set routing rules key **@return Binding
     */
    @Bean
    public Binding delayBinding(a) {
        returnBindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY); }}Copy the code
  1. Create a producer
/ * * *@author LiJing
 * @ClassName: MQSender
 * @Description: MQ send producer *@date2019/10/9 11:50 * /
@Component
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData: " + correlationData);
            System.out.println("ack: " + ack);
            if(! ack) { System.out.println("Exception Handling...."); }}};final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange
        , String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: "+ replyText); }};public void sendDelay(Object message, int delayTime) {
        After the message is sent, wait for the response asynchronously
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + timestamp globally unique
        CorrelationData correlationData = new CorrelationData("delay" + System.nanoTime());
        // Specify header delay when sending messages
        rabbitTemplate.convertAndSend(MQConfig.DELAY_EXCHANGE, "delay.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // Set message persistence
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        // Either way
                        //message.getMessageProperties().setHeader("x-delay", "6000");
                        message.getMessageProperties().setDelay(delayTime);
                        returnmessage; } }, correlationData); }}Copy the code
  1. Create a consumer
/ * * *@author LiJing
 * @ClassName: MQReceiver
 * @Description: Consumer *@date2019/10/9 11:51 * /
@Component
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = MQConfig.DELAY_QUEUE)
    @RabbitHandler
    public void onDelayMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("Delay queue is in" + LocalDateTime.now()+"Time," + Message received after delay: + newString(msg.getBody())); }}Copy the code

5. Create an MQ test controller

@RestController
@RequestMapping("/mq")
public class MqController extends AbstractController {

    @Autowired
    private MQSender mqSender;

    @GetMapping(value = "/send/delay")
    public void sendDelay(int delayTime) {
        String msg = "hello delay";
        System.out.println("Start time to send :" + LocalDateTime.now() + "Test sending delay messages ====>"+ msg); mqSender.sendDelay(msg, delayTime); }}Copy the code
  1. Turn it on, test it out
 http://localhost:8080/api/mq/send/delay?delayTime=6000
 http://localhost:8080/api/mq/send/delay?delayTime=10000
Copy the code

Four, the summary is coming

Delayed queues are useful in delayed processing scenarios, and RabbitMQ can take advantage of RabbitMQ features such as reliable message delivery, reliable message delivery, and dead letter queues to ensure that messages are consumed at least once and not discarded if they are not properly processed.

In addition, RabbitMQ cluster features can be used to solve the single point of failure problem, a single node failure will not cause delayed queue unavailable or message loss.

Of course, there are many other options for delayed queues, such as Using Redis’s ZSet, Quartz, or Kafka’s time wheel, all of which have their own characteristics, but just like Hearthstone, this knowledge is like the cards in your hand. The more you know, the more cards you can use, and when you have a problem, the more cards you can use. Therefore, it takes a lot of knowledge and experience accumulation to create a better card combination, so that their problem-solving ability is better improved.

V. Concluding remarks

Fat chao told me: there is a way to learn successively, there is a specialty, the teacher.

That’s the end of today’s explanation, please visit my gitHub mybot project 888 branch to check out the specific code,fork a experience, or leave a comment section to discuss, write bad, please give more advice ~~