This article introduces several ways to implement a delay queue; the GitHub address of the implementations is given at the end. There is no absolutely good or bad approach. It all depends on the business scenario: there is no best technology, only the most appropriate one.

First, the application of delay queue

What is a delay queue? As the name suggests, it is first of all a queue, and on top of that it can delay the consumption of its messages: you can specify the point in time at which a message in the queue will be consumed.

Delay queue is widely used in projects, especially in e-commerce platforms:

1. If the payment is not made within 30 minutes after the order is successful, the order will be cancelled automatically

2. The delivery platform will send a food order notice and push a short message to the user 60 seconds after the order is successfully placed.

3. If an order remains in an unfinished state, close it in time and return the inventory

4. New merchants on Taobao will be frozen if they fail to upload product information within one month

.

All of the above scenarios can be solved using delay queues.

Second, the implementation of delay queue

My personal view has always been: do not lightly reinvent the wheel or introduce third-party middleware for functions that can be implemented with JDK APIs. On the one hand, home-grown wrappers are very prone to problems (unless you are an expert), and debugging and verifying them creates a lot of unnecessary work. On the other hand, once third-party middleware is connected, system complexity rises sharply and so does the maintenance cost.

1. DelayQueue

The JDK provides an API for implementing delay queues: DelayQueue, in the java.util.concurrent package.

DelayQueue is a BlockingQueue that essentially wraps a PriorityQueue. PriorityQueue sorts its elements with a binary heap. When we add an element to a DelayQueue, the element's delay serves as the sort key, so the element with the smallest remaining delay sits at the head of the queue. An element cannot be taken from the queue until its delay has expired. A plain PriorityQueue can store boxed basic types or custom entity classes: boxed values are arranged in ascending order by default, while custom entity classes must define their ordering from their own attribute values. Elements of a DelayQueue, however, must implement the Delayed interface.
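
The ordering behaviour of the underlying PriorityQueue can be illustrated with a minimal sketch. The class PriorityQueueOrderingSketch and the Task entity are hypothetical names used only for this illustration; they are not part of the article's demo project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityQueueOrderingSketch {

    // Hypothetical entity used only for this illustration.
    static final class Task {
        final String name;
        final long expireAtMillis;

        Task(String name, long expireAtMillis) {
            this.name = name;
            this.expireAtMillis = expireAtMillis;
        }
    }

    // Boxed values are Comparable, so they are polled in ascending natural order.
    static List<Integer> drainNumbers() {
        PriorityQueue<Integer> numbers = new PriorityQueue<>(Arrays.asList(30, 10, 20));
        List<Integer> polled = new ArrayList<>();
        while (!numbers.isEmpty()) {
            polled.add(numbers.poll());
        }
        return polled;
    }

    // A custom class needs an explicit ordering, here by its expireAtMillis attribute.
    static List<String> drainTasks() {
        PriorityQueue<Task> tasks =
                new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.expireAtMillis));
        tasks.add(new Task("order3", 3_000L));
        tasks.add(new Task("order1", 1_000L));
        tasks.add(new Task("order2", 2_000L));
        List<String> polled = new ArrayList<>();
        while (!tasks.isEmpty()) {
            polled.add(tasks.poll().name);
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainNumbers());
        System.out.println(drainTasks());
    }
}
```

In both cases the head of the queue is always the smallest element, which is the property DelayQueue relies on to find the element that expires first.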

Add three orders to the DelayQueue and set the order to be cancelled after 5 seconds, 10 seconds and 15 seconds of the current time respectively.

To use DelayQueue, the elements in the queue must implement the Delayed interface. It declares a single method, getDelay, which returns the remaining delay; because Delayed also extends Comparable, the element must implement compareTo as well. The compareTo method in the Order class is responsible for sorting the elements in the queue.

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Order implements Delayed {
    /** Expiration time, in epoch milliseconds */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;

    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay, converted to the unit requested by the caller
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // The order that expires earlier sorts first; equal times compare as 0
        Order order = (Order) o;
        return Long.compare(this.time, order.time);
    }
}

DelayQueue's put method is thread-safe because it uses a ReentrantLock internally for synchronization. DelayQueue also provides two ways to dequeue: poll() and take(). poll() is non-blocking and returns null if no element has expired. take() blocks: the calling thread waits until an element has expired.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        Order order1 = new Order("Order1", 5, TimeUnit.SECONDS);
        Order order2 = new Order("Order2", 10, TimeUnit.SECONDS);
        Order order3 = new Order("Order3", 15, TimeUnit.SECONDS);
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        delayQueue.put(order1);
        delayQueue.put(order2);
        delayQueue.put(order3);

        System.out.println("Order delay queue start time :" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        while (delayQueue.size() != 0) {
            // poll() returns null until the head element has expired
            Order task = delayQueue.poll();
            if (task != null) {
                System.out.format("Order :{%s} was cancelled at {%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            }
            Thread.sleep(1000);
        }
    }
}

The above is only a simple enqueue and dequeue demonstration. In real development there would be a dedicated thread responsible for enqueuing and consuming messages.
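
As a rough illustration of the dedicated consumer thread mentioned above, the sketch below uses take() instead of polling with sleep. The class DelayQueueConsumerSketch, the DelayedOrder element and the thread name are assumptions made for this example, and the delays are shortened to milliseconds so it finishes quickly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueConsumerSketch {

    // Minimal Delayed element; the name and fields are illustrative only.
    static final class DelayedOrder implements Delayed {
        final String name;
        final long expireAtNanos;

        DelayedOrder(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Starts a consumer thread, enqueues three orders, and returns the order of consumption.
    static List<String> runDemo() {
        DelayQueue<DelayedOrder> queue = new DelayQueue<>();
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    // take() blocks until the head element's delay has expired
                    consumed.add(queue.take().name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "order-consumer");
        consumer.start();

        // Producer side: enqueue out of order; the queue sorts by remaining delay
        queue.put(new DelayedOrder("order3", 300));
        queue.put(new DelayedOrder("order1", 100));
        queue.put(new DelayedOrder("order2", 200));

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(consumed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because take() parks the consumer until the head element expires, no polling interval is needed and an expired order is handed over as soon as it is due.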

Order1, Order2, and Order3 are cancelled 5, 10, and 15 seconds after the start time, respectively.

Order :{Order1} was cancelled at {2020-05-06 14:59:14}
Order :{Order2} was cancelled at {...}
Order :{Order3} was cancelled at {2020-05-06 14:59:24}

2. Quartz scheduled tasks

In the days before Redis and RabbitMQ were widely used, cancelling orders that had timed out without payment was implemented with scheduled tasks. A scheduled task runs periodically, so an order may already have timed out but will not be handled until the next trigger point; as a result, order processing is not timely enough.
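
To make the timeliness problem concrete, here is a small arithmetic sketch. It assumes scans run at fixed times 0, period, 2 × period, and so on; the class ScanLatenessSketch and the numbers are made up for illustration.

```java
class ScanLatenessSketch {

    // Returns the time of the first scan at or after the deadline, assuming scans
    // run at t = 0, period, 2 * period, ... (all values in seconds).
    static long processedAt(long deadlineSeconds, long periodSeconds) {
        long scans = (deadlineSeconds + periodSeconds - 1) / periodSeconds; // ceiling division
        return scans * periodSeconds;
    }

    public static void main(String[] args) {
        long deadline = 1801; // the order times out 1801 s after the first scan
        long period = 60;     // the task scans once a minute
        long handled = processedAt(deadline, period);
        System.out.println("handled at " + handled + " s, late by " + (handled - deadline) + " s");
    }
}
```

Under these assumptions the order is handled up to almost one full period late. Shortening the period reduces the lateness but means scanning the orders more often.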

Introduce the Quartz framework dependencies

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

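Add the @EnableScheduling annotation to the startup class to enable scheduled tasks. (Strictly speaking, @EnableScheduling and @Scheduled come from Spring's built-in scheduling support rather than from Quartz itself.)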

@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayqueueApplication.class, args);
    }
}

Write a scheduled task that executes every 5 seconds.

@Component
public class QuartzDemo {

    // Runs every five seconds
    @Scheduled(cron = "0/5 * * * * ?")
    public void process() {
        System.out.println("I'm a scheduled task!");
    }
}

3. Redis sorted set

The Redis sorted set (zset) data structure can also be used as a delay queue. It relies mainly on the score attribute: Redis sorts the members of the set by score in ascending order.


The zadd command adds elements to the sorted set delayqueue, with the score of each element set to its expiration time. Here we add three orders, order1, order2, and order3, which expire after 10, 20, and 30 seconds respectively.

 zadd delayqueue 3 order3

The consumer polls the sorted set and compares the smallest score with the current time. If the score is less than or equal to the current time, the element has expired and is removed.
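
The polling logic can be tried without a Redis server using an in-memory stand-in for the sorted set. This is only a simulation of the idea, not Redis or Jedis code: the class SortedSetDelayQueueSketch and its methods zadd and pollExpired are hypothetical names that mirror the ZADD step and the "read the smallest score, then remove if expired" step.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class SortedSetDelayQueueSketch {

    // One member of the simulated sorted set.
    static final class ScoredMember {
        final long score;
        final String member;

        ScoredMember(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Members ordered by score, then by name, like a Redis sorted set.
    private final TreeSet<ScoredMember> entries = new TreeSet<>(
            Comparator.comparingLong((ScoredMember e) -> e.score).thenComparing(e -> e.member));
    private final Map<String, ScoredMember> byMember = new HashMap<>();

    // Mirrors ZADD: insert the member, or update its score if it already exists.
    void zadd(long expireAtSeconds, String member) {
        ScoredMember old = byMember.remove(member);
        if (old != null) {
            entries.remove(old);
        }
        ScoredMember added = new ScoredMember(expireAtSeconds, member);
        byMember.put(member, added);
        entries.add(added);
    }

    // One polling step: look at the smallest score and remove the member if it is due.
    // Returns the expired member, or null if the set is empty or nothing is due yet.
    String pollExpired(long nowSeconds) {
        if (entries.isEmpty()) {
            return null;
        }
        ScoredMember head = entries.first();
        if (head.score > nowSeconds) {
            return null;
        }
        entries.remove(head);
        byMember.remove(head.member);
        return head.member;
    }

    boolean isEmpty() {
        return entries.isEmpty();
    }

    public static void main(String[] args) {
        SortedSetDelayQueueSketch queue = new SortedSetDelayQueueSketch();
        long now = 1_000; // logical clock in seconds, so the demo needs no sleeping
        queue.zadd(now + 10, "order1");
        queue.zadd(now + 20, "order2");
        queue.zadd(now + 30, "order3");
        for (long t = now; !queue.isEmpty(); t++) {
            String expired = queue.pollExpired(t);
            if (expired != null) {
                System.out.println("t+" + (t - now) + "s removed key:" + expired);
            }
        }
    }
}
```

With a real Redis instance and several consumers, reading the head and removing it are two separate commands, so a consumer would typically treat the message as its own only if its ZREM call reports that one member was removed.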

/** Consume messages */
public void pollOrderQueue() throws InterruptedException {

    while (true) {
        // Fetch the member with the smallest score, i.e. the earliest expiration time
        Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

        if (!set.isEmpty()) {
            Tuple head = set.iterator().next();
            String value = head.getElement();
            long score = (long) head.getScore();

            // Current time in seconds, the same unit as the score
            long nowSecond = System.currentTimeMillis() / 1000;
            if (nowSecond >= score) {
                jedis.zrem(DELAY_QUEUE, value);
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
            }
        }

        if (jedis.zcard(DELAY_QUEUE) <= 0) {
            System.out.println(sdf.format(new Date()) + " zset empty ");
            return;
        }
        Thread.sleep(1000);
    }
}

The execution results match expectations:

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty

4. Redis expired callback

Redis can publish a notification when a key expires. By listening for this event, we get a callback whenever a key expires.

Modify the redis.conf file to enable notify-keyspace-events Ex

notify-keyspace-events Ex

Configure the Redis listener by registering a RedisMessageListenerContainer bean.

@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

Write the listener for the Redis expiration callback. It must extend KeyExpirationEventMessageListener, and works somewhat like an MQ message listener.

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body is the name of the key that expired
        String expiredKey = message.toString();
        System.out.println("Listened for key: " + expiredKey + " expired");
    }
}

That is all the code, and it is very simple. Next, test the effect: in the redis-cli client, add a key with a 3-second expiration time.

 set xiaofu 123 ex 3

The console shows that the expired key was detected:

The expired key monitored is xiaofu

5. RabbitMQ delay queue

RabbitMQ does not support delay queues directly. The most common way to implement one is to combine two features of RabbitMQ message queues: TTL and DLX.

Let's start with TTL and DLX:

Time To Live (TTL):

TTL is exactly what its name implies: the lifetime of a message. RabbitMQ can set the lifetime on a queue with the x-message-ttl argument, or on an individual message; the value is a non-negative integer in milliseconds.

RabbitMQ can set message expiration at two levels, the queue and the message itself:

  • Set the expiration time on the queue, so that all messages in the queue have the same expiration time.
  • Set the expiration time on the message, so that the TTL of each message in the queue can be different.

If you set the TTL of the queue and messages in the queue at the same time, the smaller of the two values prevails. If the TTL expires, the message in the queue becomes a Dead Letter.

Dead Letter Exchanges (DLX)

DLX refers to a dead letter exchange, together with the dead letter queue bound to it. A RabbitMQ queue can be configured with two arguments: x-dead-letter-exchange and x-dead-letter-routing-key (optional). Once a dead letter appears in the queue, these two arguments allow the message to be rerouted to another exchange so that it can be consumed again.

x-dead-letter-exchange: after a dead letter appears in the queue, it is rerouted to the specified exchange.

x-dead-letter-routing-key: the routing key used when forwarding, usually the name of the queue to forward to.

A message in a queue becomes a dead letter when:

  • The TTL of the message or the queue expires.
  • The queue reaches its maximum length.
  • The consumer rejects the message with basic.reject or basic.nack (with requeue set to false).

We send the order message A0001 to the delay queue order.delay.queue and set its x-message-ttl to 30 minutes. After 30 minutes the message A0001 becomes a dead letter. Because the delay queue is configured with x-dead-letter-exchange, the dead letter is forwarded to the order-closing queue, which is consumed normally. We only need to listen on the order-closing queue and run the order-closing logic there.
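
As a rough sketch of the queue arguments this flow relies on, the snippet below builds them as a plain Map using only the JDK. The class DelayQueueArgumentsSketch is a hypothetical helper; the exchange name and routing key are the ones used in this article's example. A map like this is what a client API that accepts queue arguments would receive, for instance the arguments parameter of Channel.queueDeclare in the RabbitMQ Java client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class DelayQueueArgumentsSketch {

    // Builds the optional arguments for the delay queue described above.
    static Map<String, Object> delayQueueArguments(long ttlMinutes) {
        Map<String, Object> arguments = new HashMap<>();
        // x-message-ttl is expressed in milliseconds
        arguments.put("x-message-ttl", TimeUnit.MINUTES.toMillis(ttlMinutes));
        // Where expired (dead-lettered) messages are forwarded
        arguments.put("x-dead-letter-exchange", "order.close.exchange");
        arguments.put("x-dead-letter-routing-key", "order.close.queue");
        return arguments;
    }

    public static void main(String[] args) {
        // 30 minutes corresponds to 1,800,000 milliseconds
        System.out.println(delayQueueArguments(30));
    }
}
```

Setting the TTL on the queue gives every message the same delay. When messages need different delays, the expiration is set per message instead.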


Specify how long the message is delayed when sending it:

public void send(String delayTimes) {
    amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue", "Hi, I'm delayed data.", message -> {
        // Set the delay in milliseconds
        message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
        return message;
    });
}

Set the forwarding rule for dead letter in delay queue

/** Delay queue */
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
    return QueueBuilder
            .durable(RabbitConstant.DEAD_LETTER_QUEUE)
            // Exchange to forward to after expiration
            .withArgument("x-dead-letter-exchange", "order.close.exchange")
            // Routing key used when forwarding after expiration
            .withArgument("x-dead-letter-routing-key", "order.close.queue")
            .build();
}

6. Time wheel

The delay queue implementations above are relatively simple and easy to understand, while the time wheel algorithm is a little more abstract. Kafka and Netty both implement delay queues based on the time wheel algorithm. Below, we use Netty's delay queue to explain how the time wheel works.

Let's start with a schematic of the time wheel and explain some of its basic concepts.

The disk in the picture can be seen as a clock face. For example, if one round takes 24 seconds and there are 8 slots, then each slot represents 3 seconds, so the time precision is 3 seconds. The shorter the duration of one slot (round length divided by the number of slots), the finer the precision.

Suppose a delayed task A is added that should execute after 25 seconds, while one round of the wheel is only 24 seconds long. From the round length and the slot duration, the wheel computes a round number (round) and a pointer position (index) for the task. Task A has to go around the wheel once and then lands on slot 0, so the wheel records round=1, index=0 for it. When the pointer is at slot 0 during round 0, task A is not executed, because round 0 does not match the round recorded for the task; it executes the next time the pointer reaches slot 0.

So each slot represents several points in time: for example, tasks due after 1 second and after 25 seconds both point to slot 0. The tasks in a slot are kept in a linked list belonging to that slot, similar to the way a HashMap stores entries in buckets.
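
The round and index calculation can be sketched in a few lines. This is a simplified model that assumes the pointer starts at slot 0 and rounds the delay down to whole ticks; the class TimeWheelSlotSketch and the method locate are hypothetical, and real implementations such as Netty's differ in the details.

```java
class TimeWheelSlotSketch {

    // Returns {round, index} for a task due after delaySeconds,
    // assuming the pointer currently sits at slot 0.
    static long[] locate(long delaySeconds, long tickSeconds, int slots) {
        long ticks = delaySeconds / tickSeconds; // whole ticks until the task is due
        long round = ticks / slots;              // full revolutions to wait
        long index = ticks % slots;              // slot the task is stored in
        return new long[] {round, index};
    }

    public static void main(String[] args) {
        long[] a = locate(25, 3, 8);
        System.out.println("25 s -> round=" + a[0] + ", index=" + a[1]); // round=1, index=0
        long[] b = locate(1, 3, 8);
        System.out.println("1 s  -> round=" + b[0] + ", index=" + b[1]); // round=0, index=0
    }
}
```

Both tasks share slot 0 and are told apart only by their round value, which is why each slot needs a list of tasks rather than a single entry.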

Netty mainly uses HashedWheelTimer to build the delay queue. Rather than keeping every task in one sorted queue as DelayQueue does, HashedWheelTimer places tasks into the slots of the wheel, and a worker thread advances the pointer one slot per tick.

Below, we use Netty to implement a simple delay queue. HashedWheelTimer has several constructors; the meaning of each parameter is explained here.

  • threadFactory: used to create the worker thread.
  • tickDuration and unit: the time interval of each slot, 100 ms by default.
  • ticksPerWheel: the number of slots in one round. If the value passed in is not a power of 2, it is adjusted to a power of 2 greater than or equal to it, which helps optimize the hash calculation.

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, true);
    }

  • TimerTask: the interface a scheduled task implements; its run method wraps the logic of the task.
  • Timeout: the handle returned when a scheduled task is submitted to the Timer. It can be used to cancel the task and to check its status.
  • Timer: the parent interface implemented by HashedWheelTimer. It only defines how to submit a scheduled task and how to stop the whole timer.
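
The power-of-two adjustment described for ticksPerWheel can be sketched as follows. This is an illustration of the idea, not Netty's source code; the class WheelSizeSketch and its methods are hypothetical names.

```java
class WheelSizeSketch {

    // Rounds ticksPerWheel up to the nearest power of two.
    // Assumes 1 <= ticksPerWheel <= 2^30 so the shift cannot overflow.
    static int normalize(int ticksPerWheel) {
        int size = 1;
        while (size < ticksPerWheel) {
            size <<= 1;
        }
        return size;
    }

    // With a power-of-two size, tick % size can be computed with a bit mask.
    static int slotIndex(long tick, int size) {
        return (int) (tick & (size - 1));
    }

    public static void main(String[] args) {
        int size = normalize(5);
        System.out.println("ticksPerWheel 5 -> wheel size " + size);          // 8
        System.out.println("tick 13 -> slot " + slotIndex(13, size));         // 5
    }
}
```

The bit mask gives the same result as the modulo operation for non-negative ticks, but avoids a division on every tick.
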
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NettyDelayQueue {

    public static void main(String[] args) {
        final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);

        // Timed task: runs every 5s
        TimerTask task1 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order1 executes after 5s");
                timer.newTimeout(this, 5, TimeUnit.SECONDS); // Register again at the end
            }
        };
        timer.newTimeout(task1, 5, TimeUnit.SECONDS);

        // Timed task: runs every 10s
        TimerTask task2 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order2 executes after 10s");
                timer.newTimeout(this, 10, TimeUnit.SECONDS); // Register again at the end
            }
        };
        timer.newTimeout(task2, 10, TimeUnit.SECONDS);

        // Delayed task: runs only once
        timer.newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order3 executes once after 15s");
            }
        }, 15, TimeUnit.SECONDS);
    }
}

According to the execution result, the delayed task order3 is executed only once, while order1 and order2 are timed tasks that are executed repeatedly at different intervals.

order1 executes after 5s
order2 executes after 10s
order3 executes once after 15s
order1 executes after 5s
order2 executes after 10s

Conclusion

To make things easier to understand, the code above is deliberately simple and crude. Demos of the implementations have been pushed to GitHub: https://github.com/chengxy-nds/delayqueue. Interested readers can download and run them.

Some parts may be imperfect. If anything is wrong or unclear, corrections are welcome!

Finally

Before I knew it, I had been doing development for several years. I remember that when I first started working I thought I was really something; looking back now, I was simply ignorant. The more you know, the more you realize how little you know.

For programmers, there is too much knowledge and technology to learn.

Many people run into problems when they first enter this industry or when they hit a bottleneck. For example, after studying for a while they lose their sense of direction and do not know where to start. You can follow me: I post technical content every day and share the hottest topics in the programmer community.