The introduction

In development, it is common to encounter requirements for delayed tasks. For example,

  • If the order is not paid within 30 minutes, it will be cancelled automatically
  • 60 seconds after the order is generated, text message is sent to the user

We have a technical name for these tasks: delayed tasks. So this raises the question, what is the difference between a delayed task and a timed task? There are several differences

  1. Scheduled tasks have a specific trigger time, while delayed tasks do not
  2. A scheduled task has an execution period, while a delayed task is executed within a period of time after an event is triggered
  3. Scheduled tasks generally perform batch operations of multiple tasks, while delayed tasks generally perform single tasks

Next, we take judging whether the order times out as an example to conduct scheme analysis

Project analysis

(1) Database polling

Train of thought

This scheme is usually used in small projects, where a thread periodically scans the database, determines whether there are timed orders by order time, and then performs operations such as UPDATE or DELETE

implementation

The blogger used Quartz earlier in the year (during my internship). A brief introduction to the Maven project introduces a dependency as shown below

< the dependency > < groupId > org. Quartz - the scheduler < / groupId > < artifactId > quartz < / artifactId > < version > 2.2.2 < / version > </dependency>Copy the code

Call the Demo class MyJob as shown below

package com.rjzheng.delay1;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job {
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        System.out.println("Gotta run a database scan..."); } public static void main(String[] args) throws Exception {// Create a task JobDetail JobDetail = jobBuilder.newJob (myjob.class)  .withIdentity("job1"."group1").build(); // Create Trigger every 3 seconds Trigger Trigger = triggerBuilder.newtrigger ().withidentity ("trigger1"."group3") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); // Place tasks and their triggers into the scheduler scheduler.scheduleJob(jobDetail, trigger); // The scheduler starts scheduling tasks. Start (); }}Copy the code

Run the code and you can see that every 3 seconds, the output is as follows

It's time to run a database scan...Copy the code

The advantages and disadvantages

Advantages: Simple and supports cluster operations

Disadvantages:

(1) Large server memory consumption

(2) There is a delay. For example, if you scan every 3 minutes, the worst delay is 3 minutes

(3) Suppose you have tens of millions of orders, every few minutes such a scan, the loss of the database is huge

(2)JDK delay queue

Train of thought

This scheme is implemented using the JDK’s DelayQueue, which is an unbounded blocking queue. The queue can only get elements from it when the delay expires. The object in the DelayQueue must implement the Delayed interface. The DelayedQueue implementation workflow is shown below

Poll(): retrieves and removes the timeout element from the queue, or returns null if there is none

Take (): retrieves and removes the timeout element from the queue. If none exists, wait for the current thread until any element meets the timeout condition.

implementation

Define a class OrderDelay to implement Delayed as follows

package com.rjzheng.delay2;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class OrderDelay implements Delayed {

    private String orderId;
    private long timeout;

    OrderDelay(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        OrderDelay t = (OrderDelay) other;
        long d = (getDelay(TimeUnit.NANOSECONDS) - t
                .getDelay(TimeUnit.NANOSECONDS));
        return(d == 0) ? 0 : ((d < 0) ? 1:1); } public long getDelay(TimeUnit unit) {public long getDelay(TimeUnit unit) {return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println(orderId+"Order number will be deleted..."); }}Copy the code

Running the test Demo, we set the delay to 3 seconds

package com.rjzheng.delay2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
     public static void main(String[] args) {  
            // TODO Auto-generated method stub  
            List<String> list = new ArrayList<String>();  
            list.add("00000001");  
            list.add("00000002");  
            list.add("00000003");  
            list.add("00000004");  
            list.add("00000005");  
            DelayQueue<OrderDelay> queue = new DelayQueue<OrderDelay>();  
            long start = System.currentTimeMillis();  
            for(int i = 0; i<5; I++) {/ / delay three SECONDS out queue. The put (new OrderDelay (list. Get (I), TimeUnit NANOSECONDS. Convert (3, TimeUnit. SECONDS))); try { queue.take().print(); System.out.println("After " +   
                                 (System.currentTimeMillis()-start) + " MilliSeconds"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}Copy the code

The output is as follows

Order 00000001 will be deleted... After 3003 MilliSeconds 00000002 After 6006 MilliSeconds 00000003 After 9006 MilliSeconds 00000004 After 12008 MilliSeconds 00000005 After 15009 MilliSecondsCopy the code

You can see that the delay is 3 seconds and the order is deleted

The advantages and disadvantages

Advantages: High efficiency, low task trigger time delay. Disadvantages :(1) after the server restarts, all the data disappear, afraid of downtime; (2) cluster expansion is quite troublesome; (3) due to memory constraints, such as too many unpaid orders, it is easy to appear OOM exception; (4) high code complexity

(3) Time round algorithm

Train of thought

Let’s start with a picture of a time wheel (it’s everywhere)

The time wheel algorithm can be analogous to a clock. For example, in the figure above, the arrow (pointer) rotates at a fixed frequency in a certain direction, and each beat is called a tick. It can be seen that the timing wheel consists of three important attribute parameters, ticksPerWheel (the number of ticks in a cycle), tickDuration (the duration of a tick), and timeUnit (timeUnit). For example, when ticksPerWheel=60, TickDuration =1, timeUnit= second, which is exactly like the constant second hand movement in real life.

If the current pointer is above 1 and I have a task that needs to be executed in 4 seconds, the thread callback or message for that execution will be placed on 5. What if you need to execute it after 20 seconds? Because the number of slots in the loop is only up to 8, if it takes 20 seconds, the pointer needs to make two more turns. Position is above 5 after 2 turns (20% 8 + 1)

implementation

We use Netty’s HashedWheelTimer to add the following dependencies to Pom

Ty < dependency > < groupId > io.net < / groupId > < artifactId > netty -all < / artifactId > < version > 4.1.24. The Final < / version > </dependency>Copy the code

The test code HashedWheelTimerTest is shown below

package com.rjzheng.delay3;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask{
        boolean flag;
        public MyTimerTask(boolean flag){
            this.flag = flag;
        }
        public void run(Timeout timeout) throws Exception {
            // TODO Auto-generated method stub
             System.out.println("I'm going to delete the order from the database...");
             this.flag =false;
        }
    }
    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        int i = 1;
        while(timerTask.flag){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(i+"The second passed."); i++; }}}Copy the code

The output is as follows

1 second has passed 2 seconds have passed 3 seconds have passed 4 seconds have passed 5 seconds have gone to the database to delete the order... Six seconds passedCopy the code

The advantages and disadvantages

Advantages: High efficiency, lower delay time of task triggering and lower code complexity than delayQueue.

Disadvantages:

(1) After the server restarts, all the data disappears, so as to avoid downtime

(2) Cluster expansion is quite troublesome

(3) Due to limited memory conditions, such as too many unpaid orders, it is easy to appear OOM exception

(4) the redis cache

Thinking a

Using Redis’ zset, which is an ordered set, Zset add element :ZADD key score member [[score member] [score member]… ZREM key member [member…] ZREM key member [member…] ZREM key member Test the following

Add a single element

redis> ZADD page_rank 10 google.com
(integer1)# Add multiple elements

redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

Select score from element
redis> ZSCORE page_rank bing.com
"8"

Remove a single element

redis> ZREM page_rank google.com
(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
Copy the code

So how do you do that? We set the order timeout timestamp and order number to score and member respectively, and the system scans the first element to determine whether it has timeout, as shown in the figure below

Implement a

package com.rjzheng.delay4;

import java.util.Calendar;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

public class AppTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

    public static Jedis getJedis() {
       returnjedisPool.getResource(); } // Generate 5 orders and put them in public voidproductionDelayMessage() {for(int i=0; i<5; Calendar cal1 = calendar.getInstance (); Calendar cal1 = calendar.getInstance (); cal1.add(Calendar.SECOND, 3); int second3later = (int) (cal1.getTimeInMillis() / 1000); AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);
            System.out.println(System.currentTimeMillis()+"Ms: Redis generated an order task: order ID is"+"OID0000001"+i); }} // get the order public voidconsumerDelayMessage(){
        Jedis jedis = AppTest.getJedis();
        while(true){
            Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
            if(items == null || items.isEmpty()){
                System.out.println("Currently no waiting task");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }
            int  score = (int) ((Tuple)items.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if(nowSecond >= score){
                String orderId = ((Tuple)items.toArray()[0]).getElement();
                jedis.zrem("OrderId", orderId);
                System.out.println(System.currentTimeMillis() +"Ms: Redis consumes a task: consumes an order OrderId as"+orderId); } } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); appTest.consumerDelayMessage(); }}Copy the code

The corresponding output is as follows

1525086085261ms: Redis generates an order task: OID00000010 1525086085263ms Order ID: OID00000011 1525086085266ms:redis generates an order task: OID00000012 1525086085268ms: Redis generates an order task: Order ID OID00000013 1525086085270ms: Redis generates an order task: order ID OID00000014 1525086088000MS: Redis consumes a task: Order OrderId: OID00000010 1525086088001ms: Redis has consumed a task: OrderId: OID00000012 1525086088003ms: Redis consumes a task: OID00000013 1525086088004ms: Redis consumes a task: Consumed order OrderId is OID00000014 Currently not waiting task Currently not waiting task Currently not waiting taskCopy the code

As you can see, it’s almost always 3 seconds after the consumption order.

However, there is a fatal flaw in this version. Under high concurrency conditions, multiple consumers will fetch the same order number, and we will test the code ThreadTest

package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {
    private static final int threadNum = 10;
    private static CountDownLatch cdl = new CountDownLatch(threadNum);
    static class DelayMessage implements Runnable{
        public void run() {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            AppTest appTest =new AppTest();
            appTest.consumerDelayMessage();
        }
    }
    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        for(int i=0;i<threadNum;i++){
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

Copy the code

The output is shown below

1525087157727ms: Redis generates an order task: OID00000010 1525087157734ms: Redis generates an order task: Order ID: OID00000011 1525087157738ms:redis generates an order task: OID00000012 1525087157747ms: Redis generates an order task: OID00000012 1525087157747ms: Redis generates an order task: OID00000011 1525087157738ms Order ID OID00000013 1525087157753ms: Redis generates an order task: order ID OID00000014 1525087160009ms: Redis consumes a task: Order OrderId: OID00000010 1525087160011MS: Redis has consumed a task: Order OrderId: OID00000010 1525087160022MS: Redis has consumed a task: Order OrderId: OID00000011 1525087160029MS: Redis has consumed a task: Order OrderId: OID00000012 1525087160045MS: Redis has consumed a task: OrderId: OID00000012 1525087160053MS: Redis consumes a task: OID00000013 1525087160064MS: Redis consumes a task: OrderId: OID00000013 1525087160065ms: Redis consumes a task: Consumed order OrderId is OID00000014 Currently not waiting task Currently not waiting task Currently not waiting task Currently not waiting task Currently not waiting task Currently not waiting task Currently not waiting taskCopy the code

Obviously, multiple threads consume the same resource.

The solution

(1) With distributed lock, but with distributed lock, the performance is reduced, the scheme will not be detailed. (2) Judge the return value of ZREM, and consume data only when it is greater than 0, so the value in the consumerDelayMessage() method is

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    jedis.zrem("OrderId", orderId);
    System.out.println(System.currentTimeMillis()+"Ms: Redis consumes a task: consumes an order OrderId as"+orderId);
}
Copy the code

Modified to

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    Long num = jedis.zrem("OrderId", orderId);
    if( num ! = null && num>0){ System.out.println(System.currentTimeMillis()+"Ms: Redis consumes a task: consumes an order OrderId as"+orderId); }}Copy the code

After this modification, re-run the ThreadTest class and find that the output is fine

Idea 2

The solution uses Redis’s Keyspace Notifications mechanism, which provides a callback when a key fails, essentially sending a message to the client. Redis version 2.8 or above is required.

Realize the

In redis.conf, add a configuration

notify-keyspace-events Ex
Copy the code

Run the following code

package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

public class RedisTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedis = new JedisPool(ADDR, PORT);
    private static RedisSub sub = new RedisSub();

    public static void init() {
        new Thread(new Runnable() {
            public void run() {
                jedis.getResource().subscribe(sub, "__keyevent@0__:expired");
            }
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        init();
        for(int i =0; i<10; i++){ String orderId ="OID000000"+i;
            jedis.getResource().setex(orderId, 3, orderId);
            System.out.println(System.currentTimeMillis()+"ms:"+orderId+"Order generation");
        }
    }

    static class RedisSub extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(System.currentTimeMillis()+"ms:"+message+"Order Cancelled"); }}}Copy the code

The output is as follows

1525096202813 ms: OID0000000 orders generated 1525096202818 ms: OID0000001 orders generated 1525096202824 ms: OID0000002 order form 1525096202826 ms: OID0000003 orders generated 1525096202830 ms: OID0000004 orders generated 1525096202834 ms: OID0000005 order form 1525096202839 ms: OID0000006 orders generated 1525096205819 ms: OID0000000 order cancellation 1525096205920 ms: OID0000005 order cancellation 1525096205920 ms: OID0000004 order cancellation 1525096205920 ms: OID0000001 order cancellation 1525096205920 ms: OID0000003 order cancellation Ms: 1525096205920 OID0000006 order cancellation 1525096205920 ms: OID0000002 order cancellationCopy the code

Ps: Redis pub/sub mechanism has a hard crack. The official website is as follows

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

Redis’s publish/subscribe model is currently fire and forget, making it impossible to reliably notify events. That is, if the publish/subscribe client disconnects and then reconnects, all events during the client disconnection are lost. Therefore, option two is not recommended. Of course, if you don’t have high reliability requirements, you can use it.

The advantages and disadvantages

Advantages:

(1) Since Redis is used as the message channel, messages are stored in Redis. If the sender or task handler dies, it is possible to reprocess the data after the restart.

(2) It is quite convenient to do cluster extension

(3) High time accuracy

Cons: Additional Redis maintenance is required

(5) Use message queues

We can use rabbitMQ’s delayed queue. RabbitMQ has the following two features to implement delayed queuing

  • RabbitMQ can set x-message-tt for Queue and Message to control the lifetime of messages. If a timeout occurs, the Message will become a dead letter
  • The Queue of lRabbitMQ can be configured with x-dead-letter-exchange and x-dead-letter-routing-key (optional) to control the occurrence of deadletter in the Queue, and re-route according to these two parameters. By combining these two features, you can simulate the function of delayed messages. More specifically, I’ll write another article, but it’s too long to cover here.

The advantages and disadvantages

Advantages: high efficiency, easily scale-out using rabbitMQ’s distributed nature, and message persistence for increased reliability.

Disadvantages: Its ease of use depends on the operation and maintenance of rabbitMq. The complexity and cost of referencing rabbitMq increases