This is the 25th day of my participation in Gwen Challenge
Accumulate over a long period, constant dripping wears away a stone 😄
What is a delay queue
A delay queue stores delayed messages. A "delayed message" is one that consumers should not receive immediately after it is sent; it only becomes available for consumption after a specified time has passed.
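To make the idea concrete, here is a minimal in-memory sketch using the JDK's `java.util.concurrent.DelayQueue`. It has nothing to do with RabbitMQ; the class names `DelayedMessage` and `DelayQueueDemo` are made up for illustration, and the delays are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory message that only becomes available once its delay has elapsed. */
final class DelayedMessage implements Delayed {
    private final String body;
    private final long readyAtNanos;

    DelayedMessage(String body, long delayMillis) {
        this.body = body;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String body() {
        return body;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the message may be consumed; zero or negative means "ready".
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    /** Sends "slow" (300 ms) before "fast" (100 ms) and returns the order in which they are consumed. */
    static List<String> consumeInOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 300));
        queue.put(new DelayedMessage("fast", 100));
        List<String> consumed = new ArrayList<>();
        try {
            consumed.add(queue.take().body()); // blocks until the earliest delay has elapsed
            consumed.add(queue.take().body());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a delayed message", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumeInOrder());
    }
}
```

Although "slow" is sent first, "fast" is consumed first, because availability depends on each message's delay rather than on the send order.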
How does RabbitMQ implement delay queuing
Neither the AMQP protocol nor RabbitMQ itself supports delay queues directly, but one can be simulated by combining a DLX (dead-letter exchange) with a TTL (time to live, i.e. message expiration).
Implementing delay queuing
You need to create two queues: one that receives the messages from the producer and holds them until they expire, and one that the expired messages are forwarded to.
The producer sends a message to Queue1, which has a TTL set, for example 60s. The message waits in Queue1 for 60 seconds; because no consumer takes it, it expires and is forwarded (dead-lettered) to Queue2, which has a consumer bound to it that handles the delayed messages.
Note: Queue1 has no consumers.
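The flow can be pictured with a toy model in plain Java. This is only a sketch of the idea with an explicit clock, not how RabbitMQ is implemented; `TtlDeadLetterModel` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the TTL + dead-letter flow; it does not talk to RabbitMQ. */
final class TtlDeadLetterModel {
    /** A message waiting in Queue1 together with the time at which it expires. */
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> queue1 = new ArrayDeque<>(); // TTL queue, no consumers
    private final List<String> queue2 = new ArrayList<>();  // dead-letter queue, read by the consumer

    TtlDeadLetterModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Producer side: the message enters Queue1 at the given time. */
    void publish(String body, long nowMillis) {
        queue1.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Broker side: every expired message at the head of Queue1 is moved to Queue2. */
    void expire(long nowMillis) {
        while (!queue1.isEmpty() && queue1.peekFirst().expiresAtMillis <= nowMillis) {
            queue2.add(queue1.pollFirst().body);
        }
    }

    /** Messages that the consumer of Queue2 can see. */
    List<String> deadLettered() {
        return new ArrayList<>(queue2);
    }

    public static void main(String[] args) {
        TtlDeadLetterModel model = new TtlDeadLetterModel(60_000);
        model.publish("order-1", 0);
        model.expire(59_999);
        System.out.println(model.deadLettered()); // TTL not reached yet, nothing forwarded
        model.expire(60_000);
        System.out.println(model.deadLettered()); // the message has been forwarded to Queue2
    }
}
```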
Add the dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.0</version>
</dependency>
Configuration
spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 47.105.198.54
    port: 5672
    virtual-host: /test-1
    username: 11
    password: 111
mq:
  queueBinding:
    queue: prod_queue_pay
    dlQueue: dl-queue
    exchange:
      name: exchang_prod_pay
      dlTopicExchange: dl-topic-exchange
      type: topic
    key: prod_pay
    dlRoutingKey: dl-routing-key
Create a service queue and a dead letter queue
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    // Service queue configuration
    @Value("${mq.queueBinding.queue}")
    private String queueName;
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;
    // Dead-letter queue configuration
    @Value("${mq.queueBinding.exchange.dlTopicExchange}")
    private String dlTopicExchange;
    @Value("${mq.queueBinding.dlRoutingKey}")
    private String dlRoutingKey;
    @Value("${mq.queueBinding.dlQueue}")
    private String dlQueue;

    // Create the dead-letter exchange (durable, not auto-deleted)
    @Bean
    public TopicExchange dlTopicExchange() {
        return new TopicExchange(dlTopicExchange, true, false);
    }

    // Create the dead-letter queue (durable)
    @Bean
    public Queue dlQueue() {
        return new Queue(dlQueue, true);
    }

    // Bind the dead-letter queue to the dead-letter exchange
    @Bean
    public Binding bindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange) {
        return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
    }

    private final String dle = "x-dead-letter-exchange";
    private final String dlk = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";

    // Create the service queue
    @Bean
    public Queue payQueue() {
        Map<String, Object> params = new HashMap<>();
        // Messages in this queue expire after 10s (x-message-ttl is in milliseconds)
        params.put(ttl, 10000);
        // Dead-letter exchange that expired messages of this queue are sent to
        params.put(dle, dlTopicExchange);
        // Routing key used when dead-lettering messages from this queue
        params.put(dlk, dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

    // Create the service exchange
    @Bean
    public TopicExchange payTopicExchange() {
        return new TopicExchange(exchangeName, true, false);
    }

    // Bind the service queue to the service exchange
    @Bean
    public Binding bindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange) {
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }
}
Creating a producer
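import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Producer */
@Component
@Slf4j
public class RabbitSender {
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(exchangeName, key, msg);
    }
}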
Create consumers
The consumer consumes the messages in the dead-letter queue.
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
@Slf4j
public class RabbitReceiver {
    // Consume messages from the dead-letter queue
    @RabbitListener(queues = "${mq.queueBinding.dlQueue}")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message :{}", data);
        log.info("Then a series of logical processes are performed. Thanks (·ω·)");
    }
}
External interface
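import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class TestController {
    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg) {
        rabbitSender.send(msg);
    }
}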
After starting the service, you can see the exchanges and queues that were created. Then call the interface: http://localhost:8080/?msg=hahahahahaha
2020-11-15 00:33:02.991 RabbitSender.send() msg = hahahahahaha
2020-11-15 00:33:12.060 Received message :hahahahahaha
2020-11-15 00:33:12.060 Then a series of logical processes are performed. Thanks (·ω·)
We set the expiration time to 10s. The message expired, became a dead letter, and was forwarded from the prod_queue_pay queue to the dl-queue queue, where the consumer received it. It works as intended.
There is one problem, though: the TTL is set on the queue, so every message in that queue gets the same delay. If I have different scenarios, such as delayed consumption after 5s, 10s, and 15s, I need to create three queues. Every new delay requires yet another queue, which clearly does not scale.
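A small sketch in plain Java shows how the queue definitions multiply. The argument keys are the ones used in the configuration above; the queue names with the `_5s`, `_10s`, `_15s` suffixes and the class `PerDelayQueues` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PerDelayQueues {
    /** Arguments a TTL queue needs so that expired messages are dead-lettered. */
    static Map<String, Object> ttlQueueArguments(long ttlMillis, String deadLetterExchange,
                                                 String deadLetterRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    /** One queue definition per distinct delay; the queue names are made up for illustration. */
    static Map<String, Map<String, Object>> queuesFor(int... delaySeconds) {
        Map<String, Map<String, Object>> queues = new LinkedHashMap<>();
        for (int seconds : delaySeconds) {
            queues.put("prod_queue_pay_" + seconds + "s",
                    ttlQueueArguments(seconds * 1000L, "dl-topic-exchange", "dl-routing-key"));
        }
        return queues;
    }

    public static void main(String[] args) {
        // Three delays already mean three queues that must be declared and bound.
        queuesFor(5, 10, 15).forEach((name, arguments) -> System.out.println(name + " -> " + arguments));
    }
}
```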
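Implementing delay queuing with a RabbitMQ plugin
If you have a problem, solve it! Start by installing the delayed-message plugin (rabbitmq_delayed_message_exchange), which provides the x-delayed-message exchange type.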
Quick entry: juejin.cn/post/697714…
Create the queue and the exchange
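import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig2 {
    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTE_KEY = "delayed_key";

    /** Exchange */
    @Bean
    CustomExchange exchange() {
        // The x-delayed-type argument selects the routing behaviour: fanout/direct/topic/headers
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "topic");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /** Queue (durable, not exclusive, not auto-deleted) */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    /** Bind the queue to the exchange */
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}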
Creating a producer
Add the x-delay parameter to the message header to control how long delivery of the message is delayed; the value is in milliseconds.
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Producer */
@Component
@Slf4j
public class RabbitSender {
    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * @param msg   message
     * @param delay delay time, in seconds
     */
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // Message persistence
            message.getMessageProperties().setDelay(delay * 1000); // x-delay is in milliseconds
            return message;
        });
    }
}
// Source of the setDelay method (Spring AMQP MessageProperties), for reference
public void setDelay(Integer delay) {
    if (delay != null && delay >= 0) {
        this.headers.put("x-delay", delay);
    } else {
        this.headers.remove("x-delay");
    }
}
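One detail worth noting about the seconds-to-milliseconds conversion: `delay * 1000` is an `int` multiplication, so a very large delay wraps to a negative value, which the setDelay logic quoted above treats as "remove the header". The following sketch in plain Java (no Spring types; `DelayHeaders` and its methods are hypothetical helpers) guards the conversion and mirrors the set-or-remove behaviour on an ordinary map.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayHeaders {
    /** Converts seconds to the millisecond value carried in x-delay; fails loudly on int overflow. */
    static int toDelayMillis(int delaySeconds) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return Math.multiplyExact(delaySeconds, 1000); // throws ArithmeticException on overflow
    }

    /** Mirrors the set-or-remove behaviour of the quoted setDelay method on a plain map. */
    static void applyDelay(Map<String, Object> headers, Integer delayMillis) {
        if (delayMillis != null && delayMillis >= 0) {
            headers.put("x-delay", delayMillis);
        } else {
            headers.remove("x-delay");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        applyDelay(headers, toDelayMillis(5));
        System.out.println(headers); // the header now carries the delay in milliseconds
        applyDelay(headers, null);
        System.out.println(headers); // a null delay removes the header again
    }
}
```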
Create consumers
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer */
@Component
@Slf4j
public class RabbitReceiver {
    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("Received message :{}", data);
        log.info("Then a series of logical processes are performed. Thanks (·ω·)");
    }
}
Provide external methods
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class TestController {
    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping("/test2/{msg}/{delay}")
    public void test2(@PathVariable("msg") String msg, @PathVariable("delay") int delay) {
        rabbitSender.send2(msg, delay);
    }
}
Start the service and log in to the RabbitMQ management interface. You can see that the exchange and queue have been created successfully. Then send three messages with delays of 60s, 30s, and 5s, in that order, and check the consumer's log.
Request 1: http://localhost:8080/test2/msg=expiration time of 60s/60
Request 2: http://localhost:8080/test2/msg=expiration time of 30s/30
Request 3: http://localhost:8080/test2/msg=expiration time of 5s/5
The log shows that the messages are sent in the order 60s, 30s, 5s, but consumed in the order 5s, 30s, 60s: each message is delivered once its own delay has elapsed.
2020-11-15 16:13:58.783 RabbitSender.send() msg = msg=expiration time of 60s
2020-11-15 16:14:02.653 RabbitSender.send() msg = msg=expiration time of 30s
2020-11-15 16:14:08.880 RabbitSender.send() msg = msg=expiration time of 5s
2020-11-15 16:14:13.924 Received message :msg=expiration time of 5s
2020-11-15 16:14:13.925 Then a series of logical processes are performed. Thanks (·ω·)
2020-11-15 16:14:32.685 Received message :msg=expiration time of 30s
2020-11-15 16:14:32.687 Then a series of logical processes are performed. Thanks (·ω·)
2020-11-15 16:14:58.814 Received message :msg=expiration time of 60s
2020-11-15 16:14:58.814 Then a series of logical processes are performed. Thanks (·ω·)
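The consumption order follows directly from the send timestamps in the log: adding each message's delay to its send time gives the earliest moment it can be delivered. A small sketch in plain Java, using the three send times from the log (the class `DeliveryOrder` and its methods are hypothetical):

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DeliveryOrder {
    /** Earliest time at which a message sent at sentAt with the given delay can be delivered. */
    static LocalTime dueAt(LocalTime sentAt, int delaySeconds) {
        return sentAt.plusSeconds(delaySeconds);
    }

    /** Labels of the three messages from the log, sorted by the time they become due. */
    static List<String> expectedOrder() {
        Map<String, LocalTime> due = new LinkedHashMap<>();
        due.put("60s", dueAt(LocalTime.parse("16:13:58.783"), 60));
        due.put("30s", dueAt(LocalTime.parse("16:14:02.653"), 30));
        due.put("5s", dueAt(LocalTime.parse("16:14:08.880"), 5));
        List<String> labels = new ArrayList<>(due.keySet());
        labels.sort((a, b) -> due.get(a).compareTo(due.get(b)));
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(expectedOrder());
    }
}
```

The computed due times (16:14:13.880, 16:14:32.653, 16:14:58.783) each fall a few tens of milliseconds before the corresponding "Received message" entries in the log.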
- If you have any questions or errors in this article, please feel free to comment. If you find this article helpful, please like it and follow it.