This is the 16th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge
Related articles
RabbitMQ series: RabbitMQ series
preface
-
Create a SpringBoot project. For details, see the two ways to create a SpringBoot project
-
The directory structure is as follows
-
Code architecture diagram
-
Create two queues, QA and QB, with TTLs of 10 s and 40 s respectively.
-
Then create an exchange X and a dead-letter exchange Y, both of type direct.
-
Create a dead letter queue QD.
1. Delay queue
-
Add the required dependencies to the pom file
-
<!-- RabbitMQ dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- swagger -->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.9.2</version>
</dependency>
<!-- RabbitMQ test dependency -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
-
-
Rename application.properties to application.yml
-
spring:
  rabbitmq:
    host: IP address   # replace with the IP address of your RabbitMQ server
    port: 5672
    username: admin
    password: 111111
-
-
Add the Swagger Config class
-
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * Swagger class *@author DingYongJun * @date2021/8/6 * / @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(a) { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(a) { return new ApiInfoBuilder() .title("Rabbitmq Interface Documentation") .description("This document describes the RabbitMQ microservice interface definition.") .version("1.0") .contact(new Contact("dayu"."https://juejin.cn/user/2084329779387864/posts"."[email protected]")) .build(); }}Copy the code
-
-
MQ config
-
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * MQ configuration class *@author DingYongJun * @date2021/8/6 * / @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; / / declare xExchange @Bean("xExchange") public DirectExchange xExchange(a) { return new DirectExchange(X_EXCHANGE); } / / declare xExchange @Bean("yExchange") public DirectExchange yExchange(a) { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } Declare queue A with A TTL of 10s and bind it to the corresponding dead-letter switch @Bean("queueA") public Queue queueA(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue args.put("x-dead-letter-routing-key"."YD"); // Declare the TTL of the queue args.put("x-message-ttl".10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // Bind queue A to switch X @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } // Declare queue B with TTL 40s and bind to the corresponding dead-letter switch @Bean("queueB") public Queue queueB(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue 
args.put("x-dead-letter-routing-key"."YD"); // Declare the TTL of the queue args.put("x-message-ttl".40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } Declare queue B bound to switch X @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } // Declare the dead letter queue QD @Bean("queueD") public Queue queueD(a) { return new Queue(DEAD_LETTER_QUEUE); } // Declare the QD binding of the dead letter queue @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); }}Copy the code
-
-
Controller simulates the producer
-
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * controller analog producer *@author DingYongJun * @date2021/8/6 * / @Slf4j @RequestMapping("ttl") @RestController public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("Current time :{}, send a message to both TTL queues :{}".new Date(), message); rabbitTemplate.convertAndSend("X"."XA"."Message from queue with TTL of 10S:" + message); rabbitTemplate.convertAndSend("X"."XB"."Message from queue with TTL 40S:"+ message); }}Copy the code
-
-
consumers
-
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; /** * Consumer *@author DingYongJun * @date2021/8/6 * / @Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("Current time: {}, received dead-letter queue message {}".newDate().toString(), msg); }}Copy the code
-
-
So, is it easier with SpringBoot integrated?
-
Ready — let's start the project and enter the following URL in the browser
-
http://localhost:8080/ttl/sendMsg/hello! The big fish
-
-
The execution result
- Calculate the time. The first interval is 10 seconds.
- The second is 40s.
- This matches our design expectations perfectly!
- The first message becomes a dead-letter message after 10 s and is then consumed by the consumer; the second message becomes a dead-letter message after 40 s and is then consumed. That is how a delay queue is built.
Optimize the delay queue
-
With the above design, what if we now need to add a message with a 50 second delay?
-
Are we going to add another MQ config? That’s not smart!
-
If the demand is to book a conference room, wouldn’t there be an infinite number of queues?
-
So we need to optimize the above structure!
-
Code architecture diagram
-
The new MQ config
-
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * New MQ configuration class *@author DingYongJun * @date2021/8/6 * / @Component public class MsgTtlQueueConfig { public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC"; // Declare queue C dead letter switch @Bean("queueC") public Queue queueB(a) { Map<String, Object> args = new HashMap<>(3); // Declare the dead-letter switch bound to the current queue args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // Declare the dead-letter routing key for the current queue args.put("x-dead-letter-routing-key"."YD"); // No TTL attribute is declared return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } Declare queue B bound to switch X @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }}Copy the code
-
-
controller
-
@RequestMapping(value = "sendExpirationMsg", method = RequestMethod.GET) public void sendMsg(@RequestParam Map<String,Object> parmsMap) { String message = parmsMap.get("message").toString(); String ttlTime = parmsMap.get("ttlTime").toString(); rabbitTemplate.convertAndSend("X"."XC", message, correlationData ->{ correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; }); log.info("Current time :{}, send a TTL of {} milliseconds to queue C:{}".new Date(),ttlTime, message); } Copy the code
-
Here we use a Map to receive the request parameters, since a Map is convenient for this
-
-
Browser input
-
http://localhost:8080/ttl/sendExpirationMsg?message=Hello Mr. Big Fish&ttlTime=50000
http://localhost:8080/ttl/sendExpirationMsg?message=Hello Mr. Big Fish&ttlTime=5000
-
-
The execution result
- This meets the requirement perfectly! The expiration time is now customizable, which is much smarter!
The MQ plug-in implements the delay queue
-
That doesn’t seem like a problem, does it?
-
But be aware that RabbitMQ only checks whether the message at the head of the queue has expired, so a first message with a long TTL blocks everything queued behind it
-
As a result, a second message with a shorter TTL is not delivered first
-
This is because a queue is first in, first out. Makes sense, right?
-
Seeing is believing
-
It is obvious that
- What we want is for the second message (5-second delay) to be consumed first, followed by the message with the 50-second delay.
- In practice, MQ waits until the first message has expired before it handles the second one.
- That is the consequence of first in, first out.
-
The MQ plugin solves this problem for us
-
First download the rabbitmq_delayed_message_exchange plugin
-
Make sure the plugin version matches your RabbitMQ version; otherwise you will get a version error.
-
Then upload it to the /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins directory on the server
-
Run rabbitmq-plugins enable rabbitmq_delayed_message_exchange to enable the plugin
-
Restart MQ: systemctl restart rabbitmq-server
-
Take a look at our MQ backend page
-
The x-delayed-message exchange type now appears. Congratulations, the plugin is enabled!
-
MQ config
-
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public Queue delayedQueue(a) { return new Queue(DELAYED_QUEUE_NAME); } // Custom switch What we have defined here is a delayed switch @Bean public CustomExchange delayedExchange(a) { Map<String, Object> args = new HashMap<>(); // Customize the switch type args.put("x-delayed-type"."direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message".true.false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }}Copy the code
-
-
controller
-
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> { correlationData.getMessageProperties().setDelay(delayTime); return correlationData; }); log.info(Queue :{} send a message with a delay of {} milliseconds to the delayed queue. Queue :{}".new Date(), delayTime, message); } Copy the code
-
-
perform
-
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
-
-
The execution result
-
Problem solved!!
I see no ending, but I will search high and low
If you think this blogger writes well: writing is not easy, so please like, follow, and comment to encourage the blogger ~hahah