This is the 25th day of my participation in Gwen Challenge

Accumulate over a long period; constant dripping wears away a stone 😄

What is a delay queue

A delay queue stores delayed messages. A "delayed message" is one that consumers should not receive as soon as it is sent; it becomes available for consumption only after a specified time has passed.
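To make the idea concrete outside RabbitMQ, here is a minimal in-process sketch that uses the JDK's java.util.concurrent.DelayQueue. The class name DelayedMessageDemo and the message bodies are made up for illustration; this only shows the "not visible until the delay has passed" behaviour, not anything RabbitMQ does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedMessageDemo {

    /** A message that consumers can take only after its delay has elapsed. */
    static final class DelayedMessage implements Delayed {
        final String body;
        final long readyAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the message becomes available
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Publishes three messages, then returns their bodies in the order a consumer receives them. */
    static List<String> consumeAll() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("delay-300ms", 300));
        queue.put(new DelayedMessage("delay-100ms", 100));
        queue.put(new DelayedMessage("delay-200ms", 200));

        List<String> consumed = new ArrayList<>();
        try {
            while (consumed.size() < 3) {
                // take() blocks until the head message's delay has elapsed
                consumed.add(queue.take().body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a delayed message", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll());
    }
}
```

The messages come out ordered by when their delay ends (100 ms, 200 ms, 300 ms), not by the order in which they were published.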

How does RabbitMQ implement delay queuing

Neither the AMQP protocol nor RabbitMQ itself supports delay queues directly, but one can be simulated with a dead letter exchange (DLX) and a message time-to-live (TTL).

Implementing delay queuing

You need to create two queues: one that receives the messages, and one that messages are forwarded to after they expire.

The producer sends a message to Queue1, which has a TTL set, such as 60s. The message waits in Queue1 for 60 seconds; since nothing consumes it, it expires and is forwarded to Queue2, which has a consumer bound to it that handles the delayed messages.

Note: Queue1 has no consumers.
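The flow above can be sketched as a tiny in-memory simulation. This is not RabbitMQ code: the class TtlDeadLetterSketch, its methods, and the explicit "now" parameter (a logical clock in milliseconds) are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TtlDeadLetterSketch {

    /** A message body plus the logical time at which it entered Queue1. */
    static final class Entry {
        final String body;
        final long enqueuedAtMillis;

        Entry(String body, long enqueuedAtMillis) {
            this.body = body;
            this.enqueuedAtMillis = enqueuedAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> queue1 = new ArrayDeque<>();  // TTL queue, no consumers
    private final Deque<String> queue2 = new ArrayDeque<>(); // dead-letter target, has the consumer

    TtlDeadLetterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** The producer publishes into Queue1 only. */
    void publish(String body, long nowMillis) {
        queue1.addLast(new Entry(body, nowMillis));
    }

    /**
     * Forwards expired messages from Queue1 to Queue2. Every message shares the
     * same TTL, so the head of Queue1 is always the next one to expire.
     */
    void expire(long nowMillis) {
        while (!queue1.isEmpty() && nowMillis - queue1.peekFirst().enqueuedAtMillis >= ttlMillis) {
            queue2.addLast(queue1.pollFirst().body);
        }
    }

    /** Returns and removes everything the Queue2 consumer can receive right now. */
    List<String> drainQueue2() {
        List<String> delivered = new ArrayList<>(queue2);
        queue2.clear();
        return delivered;
    }

    public static void main(String[] args) {
        TtlDeadLetterSketch broker = new TtlDeadLetterSketch(60_000);
        broker.publish("order-1", 0);
        broker.expire(59_999);
        System.out.println(broker.drainQueue2()); // nothing yet: the TTL has not elapsed
        broker.expire(60_000);
        System.out.println(broker.drainQueue2()); // order-1 has been dead-lettered to Queue2
    }
}
```

The consumer only ever reads from Queue2, so from its point of view each message simply arrives 60 seconds after it was published.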

Add the dependencies

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.0</version>
</dependency>

Configuration

spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 47.105.198.54
    port: 5672
    virtual-host: /test-1
    username: 11
    password: 111
mq:
  queueBinding:
    queue: prod_queue_pay
    dlQueue: dl-queue
    exchange:
      name: exchang_prod_pay
      dlTopicExchange: dl-topic-exchange
      type: topic
    key: prod_pay
    dlRoutingKey: dl-routing-key

Create a service queue and a dead letter queue

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    // Service queue configuration
    @Value("${mq.queueBinding.queue}")
    private String queueName;
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;

    // Dead letter queue configuration
    @Value("${mq.queueBinding.exchange.dlTopicExchange}")
    private String dlTopicExchange;
    @Value("${mq.queueBinding.dlRoutingKey}")
    private String dlRoutingKey;
    @Value("${mq.queueBinding.dlQueue}")
    private String dlQueue;

    // Create the dead-letter exchange (durable, not auto-deleted)
    @Bean
    public TopicExchange dlTopicExchange() {
        return new TopicExchange(dlTopicExchange, true, false);
    }

    // Create the dead letter queue (durable)
    @Bean
    public Queue dlQueue() {
        return new Queue(dlQueue, true);
    }

    // Bind the dead letter queue to the dead-letter exchange
    @Bean
    public Binding bindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange) {
        return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
    }

    private final String dle = "x-dead-letter-exchange";
    private final String dlk = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";

    // Create the service queue
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        // Messages in this queue expire after 10s (x-message-ttl is in milliseconds)
        params.put(ttl, 10000);
        // Dead-letter exchange that expired messages are forwarded to
        params.put(dle, dlTopicExchange);
        // Routing key used when forwarding dead letters
        params.put(dlk, dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

    // Create the service exchange (durable, not auto-deleted)
    @Bean
    public TopicExchange payTopicExchange() {
        return new TopicExchange(exchangeName, true, false);
    }

    // Bind the service queue to the service exchange
    @Bean
    public Binding bindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange) {
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }
}

Creating a producer

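import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Producer */
@Component
@Slf4j
public class RabbitSender {

    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;

    @Value("${mq.queueBinding.key}")
    private String key;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Publish to the service exchange; the message then waits in the TTL queue
    public void send(String msg) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(exchangeName, key, msg);
    }
}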

Creating a consumer

The consumer consumes the messages in the dead-letter queue.

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
@Slf4j
public class RabbitReceiver {

    // Consume dead-letter queue messages
    @RabbitListener(queues = "${mq.queueBinding.dlQueue}")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message :{}", data);
        log.info("Then a series of logical processes are performed. Thanks (·ω·)");
    }
}

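External interface

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Requires spring-boot-starter-web, which is not in the dependency list above
@RestController
public class TestController {

    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg) {
        rabbitSender.send(msg);
    }
}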

After starting the service, you can see the exchanges and queues that were created. Then call the interface: http://localhost:8080/?msg=hahahahahaha

2020-11-15 00:33:02.991 RabbitSender.send() msg = hahahahahaha
2020-11-15 00:33:12.060 Received message :hahahahahaha
2020-11-15 00:33:12.060 Then a series of logical processes are performed. Thanks (·ω·)

We set the TTL to 10s. The message expired, became a dead letter, and was forwarded from the prod_queue_pay queue to the dl-queue queue. It looks like everything worked perfectly.

There is one problem, though. The TTL is fixed on the queue, so if I have several scenarios, such as delayed consumption after 5s, 10s, and 15s, I need to create three queues. Every new delay value requires yet another queue, which is clearly not workable.
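A rough sketch of what "one queue per delay" would mean in practice. The three argument keys are the same ones used in RabbitConfig above; the class PerDelayQueues, its methods, and the queue-name pattern prod_queue_pay_<n>s are hypothetical and only illustrate how the number of queues grows with the number of distinct delays.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PerDelayQueues {

    /** Declaration arguments for one TTL queue that dead-letters into the given exchange. */
    static Map<String, Object> ttlQueueArguments(int delaySeconds, String dlExchange, String dlRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", delaySeconds * 1000); // TTL is given in milliseconds
        args.put("x-dead-letter-exchange", dlExchange);
        args.put("x-dead-letter-routing-key", dlRoutingKey);
        return args;
    }

    /** One queue definition per distinct delay; the queue names are hypothetical. */
    static Map<String, Map<String, Object>> queuesFor(int... delaysSeconds) {
        Map<String, Map<String, Object>> queues = new LinkedHashMap<>();
        for (int delay : delaysSeconds) {
            queues.put("prod_queue_pay_" + delay + "s",
                    ttlQueueArguments(delay, "dl-topic-exchange", "dl-routing-key"));
        }
        return queues;
    }

    public static void main(String[] args) {
        // Three delays lead to three separate queue declarations
        queuesFor(5, 10, 15).forEach((name, arguments) -> System.out.println(name + " -> " + arguments));
    }
}
```

Each entry would still have to be declared as its own Queue bean and bound to the service exchange with its own routing key, which is the overhead the plug-in approach avoids.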

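Implementing a delay queue with a RabbitMQ plug-in

If there is a problem, solve it! Start by installing the delayed message plug-in (rabbitmq_delayed_message_exchange), which provides the x-delayed-message exchange type.

Quick link: juejin.cn/post/697714…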

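Create the queue and exchange

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig2 {

    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTE_KEY = "delayed_key";

    /** Exchange */
    @Bean
    CustomExchange exchange() {
        // x-delayed-type selects the routing behaviour: direct, fanout, topic, or headers
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "topic");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /** Queue (durable, not exclusive, not auto-deleted) */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    /** Bind the queue to the exchange */
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}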

Creating a producer

Set the x-delay header on each message to control how long its delivery is delayed.

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Producer */
@Component
@Slf4j
public class RabbitSender {
    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";

    // Omitted in the original listing; injected the same way as in the first producer
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param msg   the message
     * @param delay the delay, in seconds
     */
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // Message persistence
            message.getMessageProperties().setDelay(delay * 1000); // setDelay takes milliseconds
            return message;
        });
    }
}

// Source of the setDelay method, as quoted by the author
public void setDelay(Integer delay) {
    if (delay != null && delay >= 0) {
        this.headers.put("x-delay", delay);
    } else {
        this.headers.remove("x-delay");
    }
}

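Creating a consumer

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
@Slf4j
public class RabbitReceiver {

    // Consume messages once the delayed exchange has routed them to the queue
    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message :{}", data);
        log.info("Then a series of logical processes are performed. Thanks (·ω·)");
    }
}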

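External interface

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private RabbitSender rabbitSender;

    // delay is given in seconds
    @GetMapping("/test2/{msg}/{delay}")
    public void test2(@PathVariable("msg") String msg, @PathVariable("delay") int delay) {
        rabbitSender.send2(msg, delay);
    }
}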

Start the service and log in to the RabbitMQ management interface. You can see that the exchange and queue have been created successfully. Then send three messages with delays of 60s, 30s, and 5s, in that order, and check the consumer's log.

Request 1: http://localhost:8080/test2/msg=Expiration time of 60s/60

Request 2: http://localhost:8080/test2/msg=Expiration time of 30s/30

Request 3: http://localhost:8080/test2/msg=Expiration time of 5s/5

The log shows that the messages were sent in the order 60s, 30s, 5s, but consumed in the order 5s, 30s, 60s.

2020-11-15 16:13:58.783 RabbitSender.send() msg = msg=Expiration time of 60s
2020-11-15 16:14:02.653 RabbitSender.send() msg = msg=Expiration time of 30s
2020-11-15 16:14:08.880 RabbitSender.send() msg = msg=Expiration time of 5s
2020-11-15 16:14:13.924 Received message :msg=Expiration time of 5s
2020-11-15 16:14:13.925 Then a series of logical processes are performed. Thanks (·ω·)
2020-11-15 16:14:32.685 Received message :msg=Expiration time of 30s
2020-11-15 16:14:32.687 Then a series of logical processes are performed. Thanks (·ω·)
2020-11-15 16:14:58.814 Received message :msg=Expiration time of 60s
2020-11-15 16:14:58.814 Then a series of logical processes are performed. Thanks (·ω·)
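The consumption order can be checked by hand from the send timestamps in the log. The sketch below assumes a message becomes deliverable at its send time plus its x-delay; the class DelayedDeliveryOrder and its methods are made up for this calculation, and the send offsets are taken from the log relative to the first send at 16:13:58.783.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class DelayedDeliveryOrder {

    /** One published message: a label, its send offset, and its x-delay, both in milliseconds. */
    static final class Sent {
        final String label;
        final long sentAtMillis;
        final long delayMillis;

        Sent(String label, long sentAtMillis, long delayMillis) {
            this.label = label;
            this.sentAtMillis = sentAtMillis;
            this.delayMillis = delayMillis;
        }

        /** Offset at which the message becomes deliverable. */
        long dueAtMillis() {
            return sentAtMillis + delayMillis;
        }
    }

    /** Send offsets from the log, relative to the first send at 16:13:58.783. */
    static List<Sent> fromLog() {
        return Arrays.asList(
                new Sent("60s", 0, 60_000),      // sent 16:13:58.783
                new Sent("30s", 3_870, 30_000),  // sent 16:14:02.653
                new Sent("5s", 10_097, 5_000));  // sent 16:14:08.880
    }

    /** Labels sorted by the time each message becomes deliverable. */
    static List<String> deliveryOrder(List<Sent> sent) {
        List<Sent> byDueTime = new ArrayList<>(sent);
        byDueTime.sort(Comparator.comparingLong(Sent::dueAtMillis));
        List<String> labels = new ArrayList<>();
        for (Sent s : byDueTime) {
            labels.add(s.label);
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(deliveryOrder(fromLog()));
    }
}
```

The computed due offsets are 15.097s, 33.870s, and 60.000s after the first send, which gives the order 5s, 30s, 60s. That matches the receive timestamps in the log (16:14:13.924, 16:14:32.685, 16:14:58.814) to within a few tens of milliseconds.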

  • If you have questions or spot any errors in this article, please feel free to comment. If you found this article helpful, please like it and follow.