This is the 16th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge


Related articles

RabbitMQ series: RabbitMQ series


preface

  • Create a SpringBoot project. For details, see the two ways to create a SpringBoot project

  • The directory structure is as follows

  • Code architecture diagram

  • Create two queues QA and QB with the TTL set to 10S and 40S respectively.

  • Then create a switch X and a dead-letter switch Y, both of type Direct.

  • Create a dead letter queue QD.

1. Delay queue

  • Pom is introduced into the jar

    • <! <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.247.</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <! --swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.92.</version>
             </dependency>
             <dependency>
                 <groupId>io.springfox</groupId>
                 <artifactId>springfox-swagger-ui</artifactId>
                 <version>2.92.</version> </dependency> <! <groupId> <artifactId>spring-rabbit-test</artifactId> </artifactId> <scope>test</scope> </dependency>Copy the code
  • Change the suffix application.proprties to yml

    • Spring: rabbitmq: host: IP address port:5672
          username: admin
          password: 111111
      Copy the code
  • Add the Swagger Config class

    • import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import springfox.documentation.builders.ApiInfoBuilder;
      import springfox.documentation.service.ApiInfo;
      import springfox.documentation.service.Contact;
      import springfox.documentation.spi.DocumentationType;
      import springfox.documentation.spring.web.plugins.Docket;
      import springfox.documentation.swagger2.annotations.EnableSwagger2;
      
      /** * Swagger class *@author DingYongJun
       * @date2021/8/6 * /
      @Configuration
      @EnableSwagger2
      public class SwaggerConfig {
          @Bean
          public Docket webApiConfig(a) {
              return new Docket(DocumentationType.SWAGGER_2)
                      .groupName("webApi")
                      .apiInfo(webApiInfo())
                      .select()
                      .build();
          }
      
          private ApiInfo webApiInfo(a) {
              return new ApiInfoBuilder()
                      .title("Rabbitmq Interface Documentation")
                      .description("This document describes the RabbitMQ microservice interface definition.")
                      .version("1.0")
                      .contact(new Contact("dayu"."https://juejin.cn/user/2084329779387864/posts"."[email protected]")) .build(); }}Copy the code
  • MQ config

    • import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      /** * MQ configuration class *@author DingYongJun
       * @date2021/8/6 * /
      @Configuration
      public class TtlQueueConfig {
          public static final String X_EXCHANGE = "X";
          public static final String QUEUE_A = "QA";
          public static final String QUEUE_B = "QB";
          public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
          public static final String DEAD_LETTER_QUEUE = "QD";
      
          / / declare xExchange
          @Bean("xExchange")
          public DirectExchange xExchange(a) {
              return new DirectExchange(X_EXCHANGE);
          }
      
          / / declare xExchange
          @Bean("yExchange")
          public DirectExchange yExchange(a) {
              return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
          }
      
          Declare queue A with A TTL of 10s and bind it to the corresponding dead-letter switch
          @Bean("queueA")
          public Queue queueA(a) {
              Map<String, Object> args = new HashMap<>(3);
              // Declare the dead-letter switch bound to the current queue
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              // Declare the dead-letter routing key for the current queue
              args.put("x-dead-letter-routing-key"."YD");
              // Declare the TTL of the queue
              args.put("x-message-ttl".10000);
              return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
          }
      
          // Bind queue A to switch X
          @Bean
          public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queueA).to(xExchange).with("XA");
          }
      
          // Declare queue B with TTL 40s and bind to the corresponding dead-letter switch
          @Bean("queueB")
          public Queue queueB(a) {
              Map<String, Object> args = new HashMap<>(3);
              // Declare the dead-letter switch bound to the current queue
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              // Declare the dead-letter routing key for the current queue
              args.put("x-dead-letter-routing-key"."YD");
              // Declare the TTL of the queue
              args.put("x-message-ttl".40000);
              return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
          }
      
          Declare queue B bound to switch X
          @Bean
          public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
          }
      
          // Declare the dead letter queue QD
          @Bean("queueD")
          public Queue queueD(a) {
              return new Queue(DEAD_LETTER_QUEUE);
          }
      
          // Declare the QD binding of the dead letter queue
          @Bean
          public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                              @Qualifier("yExchange") DirectExchange yExchange) {
              return BindingBuilder.bind(queueD).to(yExchange).with("YD"); }}Copy the code
  • Controller simulates the producer

    • import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.PathVariable;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      import java.util.Date;
      
      /** * controller analog producer *@author DingYongJun
       * @date2021/8/6 * /
      @Slf4j
      @RequestMapping("ttl")
      @RestController
      public class SendMsgController {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @GetMapping("sendMsg/{message}")
          public void sendMsg(@PathVariable String message) {
              log.info("Current time :{}, send a message to both TTL queues :{}".new Date(), message);
              rabbitTemplate.convertAndSend("X"."XA"."Message from queue with TTL of 10S:" + message);
              rabbitTemplate.convertAndSend("X"."XB"."Message from queue with TTL 40S:"+ message); }}Copy the code
  • consumers

    • import com.rabbitmq.client.Channel;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      import java.io.IOException;
      import java.util.Date;
      
      /** * Consumer *@author DingYongJun
       * @date2021/8/6 * /
      @Slf4j
      @Component
      public class DeadLetterQueueConsumer {
          @RabbitListener(queues = "QD")
          public void receiveD(Message message, Channel channel) throws IOException {
              String msg = new String(message.getBody());
              log.info("Current time: {}, received dead-letter queue message {}".newDate().toString(), msg); }}Copy the code
  • So, is it easier with SpringBoot integrated?

  • Ready, let’s launch the project and type it in the browser

    • http:/ / localhost: 8080 / TTL/sendMsg/hello! The big fish
      Copy the code
  • The execution result

    • Calculate the time. The first interval is 10 seconds.
    • The second is 40s.
    • Perfect for our design expectations!
    • The first message becomes a dead-letter message after 10S and is consumed by the consumer, and the second message becomes a dead-letter message after 40S and is consumed, thus creating a delayed queue.

Optimize the delay queue

  • With the above design, what if we now need to add a message with a 50 second delay?

  • Are we going to add another MQ config? That’s not smart!

  • If the demand is to book a conference room, wouldn’t there be an infinite number of queues?

  • So we need to optimize the above structure!

  • Code architecture diagram

  • The new MQ config

    • import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;
      
      import java.util.HashMap;
      import java.util.Map;
      /** * New MQ configuration class *@author DingYongJun
       * @date2021/8/6 * /
      @Component
      public class MsgTtlQueueConfig {
          public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
          public static final String QUEUE_C = "QC";
      
          // Declare queue C dead letter switch
          @Bean("queueC")
          public Queue queueB(a) {
              Map<String, Object> args = new HashMap<>(3);
              // Declare the dead-letter switch bound to the current queue
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              // Declare the dead-letter routing key for the current queue
              args.put("x-dead-letter-routing-key"."YD");
              // No TTL attribute is declared
              return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
          }
      
          Declare queue B bound to switch X
          @Bean
          public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }}Copy the code
  • controller

    •     @RequestMapping(value = "sendExpirationMsg", method = RequestMethod.GET)
          public void sendMsg(@RequestParam Map<String,Object> parmsMap) {
              String message = parmsMap.get("message").toString();
              String ttlTime = parmsMap.get("ttlTime").toString();
              rabbitTemplate.convertAndSend("X"."XC", message, correlationData ->{
                  correlationData.getMessageProperties().setExpiration(ttlTime);
                  return correlationData;
              });
              log.info("Current time :{}, send a TTL of {} milliseconds to queue C:{}".new Date(),ttlTime, message);
          }
      Copy the code
    • In this case, we use Map to accept parameters. After all, Map is very useful

  • Browser input

    • http://localhost:8080/ttl/sendExpirationMsg? Message = Hello Mr. Big Fish &ttlTime=50000
      http://localhost:8080/ttl/sendExpirationMsg? Message = Hello Mr. Big Fish &ttlTime=5000
      Copy the code
  • The execution result

    • Perfect to meet the requirements! Expiration time custom! Now that’s intelligence!

The MQ plug-in implements the delay queue

  • That doesn’t seem like a problem, does it?

  • But know that MQ will only check if the first message is out of date if the first message is too late

  • Will cause our second message not to be executed first

  • Because the queue is first in, first out. Is that all right?

  • Seeing is believing

  • It is obvious that

    • What we want is a second 5-second delay for the first consumption, followed by a 50-second delay for the message.
    • In effect, MQ waits until the first one is executed before executing the second one.
    • That’s why it’s first-in, first-out.
  • The MQ plugin solves this problem for us

  • First download rabbitmq_delayed_message_Exchange

  • Must look at the version oh ~ will report version error otherwise.

  • And then uploaded to the server in the/usr/lib/rabbitmq/lib/rabbitmq_server – 3.7.18 / plugins directory

  • Run rabbitmq-plugins enable rabbitmq_delayed_message_exchange to install rabbitmq-plugins

  • Restart MQ systemctl restart rabbitmq-server

  • Take a look at our MQ backend page

  • The switch type for x-delayed-message appeared. Congratulations on enabling the plug-in!

  • MQ config

    • import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.CustomExchange;
      import org.springframework.amqp.core.Queue;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      @Configuration
      public class DelayedQueueConfig {
          public static final String DELAYED_QUEUE_NAME = "delayed.queue";
          public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
          public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
      
          @Bean
          public Queue delayedQueue(a) {
              return new Queue(DELAYED_QUEUE_NAME);
          }
      
          // Custom switch What we have defined here is a delayed switch
          @Bean
          public CustomExchange delayedExchange(a) {
              Map<String, Object> args = new HashMap<>();
              // Customize the switch type
              args.put("x-delayed-type"."direct");
              return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message".true.false,
                      args);
          }
      
          @Bean
          public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                             @Qualifier("delayedExchange") CustomExchange
                                                     delayedExchange) {
              returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }}Copy the code
  • controller

    •     public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
          public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
      
          @GetMapping("sendDelayMsg/{message}/{delayTime}")
          public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
              rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                      correlationData -> {
                          correlationData.getMessageProperties().setDelay(delayTime);
                          return correlationData;
                      });
              log.info(Queue :{} send a message with a delay of {} milliseconds to the delayed queue. Queue :{}".new
                      Date(), delayTime, message);
          }
      Copy the code
  • perform

    • http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
      http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
      Copy the code
  • The execution result

  • Solution!!


I see no ending, but I will search high and low

If you think I blogger writes good! Writing is not easy, please like, follow, comment to encourage the blogger ~hahah