This is the 16th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge
Related articles
RabbitMQ series: RabbitMQ series
Create a SpringBoot project. For details, see the two ways to create a SpringBoot project
The directory structure is as follows
Code architecture diagram
Create two queues QA and QB with the TTL set to 10S and 40S respectively.
Then create a switch X and a dead-letter switch Y, both of type Direct.
Create a dead letter queue QD.
1. Delay queue
Pom is introduced into the jar
<! <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId></groupId> <artifactId>fastjson</artifactId> <version>1.247.</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <! --swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.92.</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.92.</version> </dependency> <! <groupId> <artifactId>spring-rabbit-test</artifactId> </artifactId> <scope>test</scope> </dependency>Copy the code
Change the suffix application.proprties to yml
Spring: rabbitmq: host: IP address port:5672 username: admin password: 111111 Copy the code
Add the Swagger Config class
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * Swagger class *@author DingYongJun * @date2021/8/6 * / @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(a) { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(a) { return new ApiInfoBuilder() .title("Rabbitmq Interface Documentation") .description("This document describes the RabbitMQ microservice interface definition.") .version("1.0") .contact(new Contact("dayu".""."[email protected]")) .build(); }}Copy the code
MQ config
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * MQ configuration class *@author DingYongJun * @date2021/8/6 * / @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; / / declare xExchange @Bean("xExchange") public DirectExchange xExchange(a) { return new DirectExchange(X_EXCHANGE); } / / declare xExchange @Bean("yExchange") public DirectExchange yExchange(a) { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } Declare queue A with A TTL of 10s and bind it to the corresponding dead-letter switch @Bean("queueA") public Queue queueA(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue args.put("x-dead-letter-routing-key"."YD"); // Declare the TTL of the queue args.put("x-message-ttl".10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // Bind queue A to switch X @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } // Declare queue B with TTL 40s and bind to the corresponding dead-letter switch @Bean("queueB") public Queue queueB(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue args.put("x-dead-letter-routing-key"."YD"); // Declare the TTL of the queue args.put("x-message-ttl".40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } Declare queue B bound to switch X @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } // Declare the dead letter queue QD @Bean("queueD") public Queue queueD(a) { return new Queue(DEAD_LETTER_QUEUE); } // Declare the QD binding of the dead letter queue @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); }}Copy the code
Controller simulates the producer
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * controller analog producer *@author DingYongJun * @date2021/8/6 * / @Slf4j @RequestMapping("ttl") @RestController public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMsg/{message}") public void sendMsg(@PathVariable String message) {"Current time :{}, send a message to both TTL queues :{}".new Date(), message); rabbitTemplate.convertAndSend("X"."XA"."Message from queue with TTL of 10S:" + message); rabbitTemplate.convertAndSend("X"."XB"."Message from queue with TTL 40S:"+ message); }}Copy the code
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import; import java.util.Date; /** * Consumer *@author DingYongJun * @date2021/8/6 * / @Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody());"Current time: {}, received dead-letter queue message {}".newDate().toString(), msg); }}Copy the code
So, is it easier with SpringBoot integrated?
Ready, let’s launch the project and type it in the browser
http:/ / localhost: 8080 / TTL/sendMsg/hello! The big fish Copy the code
The execution result
- Calculate the time. The first interval is 10 seconds.
- The second is 40s.
- Perfect for our design expectations!
- The first message becomes a dead-letter message after 10S and is consumed by the consumer, and the second message becomes a dead-letter message after 40S and is consumed, thus creating a delayed queue.
Optimize the delay queue
With the above design, what if we now need to add a message with a 50 second delay?
Are we going to add another MQ config? That’s not smart!
If the demand is to book a conference room, wouldn’t there be an infinite number of queues?
So we need to optimize the above structure!
Code architecture diagram
The new MQ config
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * New MQ configuration class *@author DingYongJun * @date2021/8/6 * / @Component public class MsgTtlQueueConfig { public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC"; // Declare queue C dead letter switch @Bean("queueC") public Queue queueB(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue args.put("x-dead-letter-routing-key"."YD"); // No TTL attribute is declared return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } Declare queue B bound to switch X @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }}Copy the code
@RequestMapping(value = "sendExpirationMsg", method = RequestMethod.GET) public void sendMsg(@RequestParam Map<String,Object> parmsMap) { String message = parmsMap.get("message").toString(); String ttlTime = parmsMap.get("ttlTime").toString(); rabbitTemplate.convertAndSend("X"."XC", message, correlationData ->{ correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; });"Current time :{}, send a TTL of {} milliseconds to queue C:{}".new Date(),ttlTime, message); } Copy the code
In this case, we use Map to accept parameters. After all, Map is very useful
Browser input
http://localhost:8080/ttl/sendExpirationMsg? Message = Hello Mr. Big Fish &ttlTime=50000 http://localhost:8080/ttl/sendExpirationMsg? Message = Hello Mr. Big Fish &ttlTime=5000 Copy the code
The execution result
- Perfect to meet the requirements! Expiration time custom! Now that’s intelligence!
The MQ plug-in implements the delay queue
That doesn’t seem like a problem, does it?
But know that MQ will only check if the first message is out of date if the first message is too late
Will cause our second message not to be executed first
Because the queue is first in, first out. Is that all right?
Seeing is believing
It is obvious that
- What we want is a second 5-second delay for the first consumption, followed by a 50-second delay for the message.
- In effect, MQ waits until the first one is executed before executing the second one.
- That’s why it’s first-in, first-out.
The MQ plugin solves this problem for us
First download rabbitmq_delayed_message_Exchange
Must look at the version oh ~ will report version error otherwise.
And then uploaded to the server in the/usr/lib/rabbitmq/lib/rabbitmq_server – 3.7.18 / plugins directory
Run rabbitmq-plugins enable rabbitmq_delayed_message_exchange to install rabbitmq-plugins
Restart MQ systemctl restart rabbitmq-server
Take a look at our MQ backend page
The switch type for x-delayed-message appeared. Congratulations on enabling the plug-in!
MQ config
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = ""; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public Queue delayedQueue(a) { return new Queue(DELAYED_QUEUE_NAME); } // Custom switch What we have defined here is a delayed switch @Bean public CustomExchange delayedExchange(a) { Map<String, Object> args = new HashMap<>(); // Customize the switch type args.put("x-delayed-type"."direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message".true.false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }}Copy the code
public static final String DELAYED_EXCHANGE_NAME = ""; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> { correlationData.getMessageProperties().setDelay(delayTime); return correlationData; }); :{} send a message with a delay of {} milliseconds to the delayed queue. Queue :{}".new Date(), delayTime, message); } Copy the code
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000 Copy the code
The execution result
I see no ending, but I will search high and low
If you think I blogger writes good! Writing is not easy, please like, follow, comment to encourage the blogger ~hahah