In development, it is common to encounter requirements for delayed tasks. For example:

If an order is still unpaid 30 minutes after it is created, it is cancelled automatically.

60 seconds after an order is created, a message is sent to the user.

Tasks like these have a name of their own: delayed tasks. This raises a question: how does a delayed task differ from a scheduled (timed) task? There are several differences:

A scheduled task has an explicit trigger time; a delayed task does not.

A scheduled task has an execution period; a delayed task has none and is executed within a period of time after some event is triggered.
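To make the distinction concrete, here is a small sketch using the JDK's ScheduledExecutorService (chosen only for illustration; none of the approaches below rely on it). scheduleAtFixedRate models a scheduled task with a period, while schedule models a delayed task that runs once after a triggering event. The class name and the millisecond values are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedVsScheduled {

    /** Runs both kinds of task for observeMs and returns {periodicRuns, delayedRuns}. */
    public static int[] run(long periodMs, long delayMs, long observeMs) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicInteger periodicRuns = new AtomicInteger();
        AtomicInteger delayedRuns = new AtomicInteger();

        // Scheduled task: has a period and fires on its own timetable,
        // whether or not any business event happened.
        pool.scheduleAtFixedRate(() -> { periodicRuns.incrementAndGet(); }, 0, periodMs, TimeUnit.MILLISECONDS);

        // Delayed task: started by an event (say, "order created") and run
        // exactly once, delayMs after that event; it has no period.
        pool.schedule(() -> { delayedRuns.incrementAndGet(); }, delayMs, TimeUnit.MILLISECONDS);

        Thread.sleep(observeMs);
        pool.shutdownNow();
        return new int[] {periodicRuns.get(), delayedRuns.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] runs = run(100, 300, 1000);
        System.out.println("periodic runs: " + runs[0] + ", delayed runs: " + runs[1]);
    }
}
```

In the order example, the delayed task would be created at the moment the order is placed, with a 30-minute delay.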

Next, we use checking whether an order has timed out as the running example to analyze several approaches.

Solution analysis

(1) Database polling

Idea

This approach is common in small projects: a thread periodically scans the database, uses each order's creation time to find the orders that have timed out, and then performs an UPDATE or DELETE on them.
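A single polling pass boils down to selecting the unpaid orders whose creation time is older than the timeout. The sketch below shows that selection in plain Java over an in-memory list; the Order class, its fields and the 30-minute timeout are hypothetical stand-ins for a real orders table and query.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class OrderPollingSketch {

    /** Hypothetical order row; in the real scheme this lives in a database table. */
    public static final class Order {
        final String id;
        final Instant createdAt;
        final boolean paid;

        public Order(String id, Instant createdAt, boolean paid) {
            this.id = id;
            this.createdAt = createdAt;
            this.paid = paid;
        }
    }

    /**
     * One polling pass: return the ids of unpaid orders created at or before
     * now - timeout. In SQL this is roughly
     * SELECT id FROM orders WHERE paid = 0 AND created_at <= ?
     */
    public static List<String> findTimedOut(List<Order> orders, Instant now, Duration timeout) {
        Instant deadline = now.minus(timeout);
        List<String> expired = new ArrayList<>();
        for (Order o : orders) {
            if (!o.paid && !o.createdAt.isAfter(deadline)) {
                expired.add(o.id);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        List<Order> orders = List.of(
                new Order("A", now.minus(Duration.ofMinutes(45)), false), // unpaid, too old
                new Order("B", now.minus(Duration.ofMinutes(10)), false), // unpaid, still in time
                new Order("C", now.minus(Duration.ofMinutes(50)), true)); // paid, ignored
        System.out.println(findTimedOut(orders, now, Duration.ofMinutes(30))); // prints [A]
    }
}
```

A scheduler (such as the Quartz job below) would call this kind of query on every scan and then cancel the returned orders.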

Implementation

The author used Quartz early on (during an internship), so here is a brief introduction.

Add the following dependency to the Maven project:

```xml
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.2</version>
</dependency>
```

The demo class MyJob is shown below:

```java
package com.rjzheng.delay1;

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // In a real system this would query the database for timed-out orders
        System.out.println("It's time to run a database scan...");
    }

    public static void main(String[] args) throws Exception {
        // Create the job
        JobDetail jobDetail = JobBuilder.newJob(MyJob.class)
                .withIdentity("job1", "group1")
                .build();
        // Create a trigger that fires every 3 seconds, forever
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group3")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(3)
                        .repeatForever())
                .build();
        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        // Register the job and its trigger with the scheduler
        scheduler.scheduleJob(jobDetail, trigger);
        // Start scheduling
        scheduler.start();
    }
}
```

Run the code and the following is printed every 3 seconds:

It’s time to run a database scan…

Advantages and disadvantages

Advantages:

Supports cluster operations

Disadvantages:

(1) Memory consumption on the server is high. (2) There is latency: if you scan every 3 minutes, the worst-case delay is 3 minutes. (3) If you have tens of millions of orders and scan them every few minutes, the load on the database is very high.

(2) JDK delay queue

Idea

This approach uses the JDK's DelayQueue, an unbounded blocking queue from which an element can be taken only after its delay has expired. Objects placed in a DelayQueue must implement the Delayed interface.

The DelayQueue workflow is shown in the figure below.

Here:

poll(): retrieves and removes an expired element from the queue, or returns null if no element has expired yet.

take(): retrieves and removes an expired element from the queue; if none has expired, the current thread waits until some element's delay expires, and then the element is returned.
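The difference between poll() and take() shows up in a minimal sketch: a single element with a 200 ms delay (an arbitrary value chosen for illustration) is polled before it expires and then taken. The Item class is hypothetical and only implements Delayed as the queue requires.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PollVsTake {

    /** Minimal Delayed element that becomes available at a fixed nanoTime deadline. */
    public static final class Item implements Delayed {
        public final String name;
        private final long deadlineNanos;

        public Item(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; the queue hands the element out once this is <= 0
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Earliest deadline first
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("order-1", 200));

        // poll(): non-blocking; the element has not expired yet, so this is null
        System.out.println("poll before expiry: " + queue.poll());

        // take(): blocks the calling thread until the element's delay has expired
        long start = System.nanoTime();
        Item item = queue.take();
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("take returned " + item.name + " after about " + waitedMs + " ms");
    }
}
```

A consumer thread built on take() therefore sleeps inside the queue instead of spinning, which is why this approach has low trigger latency.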

Implementation

Define a class OrderDelay that implements Delayed:

```java
package com.rjzheng.delay2;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class OrderDelay implements Delayed {

    private final String orderId;
    private final long timeout; // absolute deadline, in System.nanoTime() units

    OrderDelay(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        // Earlier deadline comes first
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    // Remaining delay; the queue releases the element once this is <= 0
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println("Order " + orderId + " will be deleted...");
    }
}
```

Run the test demo, with the delay set to 3 seconds:

```java
package com.rjzheng.delay2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("00000001");
        list.add("00000002");
        list.add("00000003");
        list.add("00000004");
        list.add("00000005");
        DelayQueue<OrderDelay> queue = new DelayQueue<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 5; i++) {
            // Each order becomes available 3 seconds after it is enqueued
            queue.put(new OrderDelay(list.get(i), TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));
            try {
                // take() blocks until the element's delay has expired
                queue.take().print();
                System.out.println("After " + (System.currentTimeMillis() - start) + " MilliSeconds");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```

The output is as follows

```
Order 00000001 will be deleted...
After 3003 MilliSeconds
Order 00000002 will be deleted...
After 6006 MilliSeconds
Order 00000003 will be deleted...
After 9006 MilliSeconds
Order 00000004 will be deleted...
After 12008 MilliSeconds
Order 00000005 will be deleted...
After 15009 MilliSeconds
```

As you can see, each order is deleted after a 3-second delay.

Advantages and disadvantages

Advantages:

Efficient, with low task trigger latency.

Disadvantages:

(1) All data is lost when the server restarts, so the scheme does not tolerate downtime. (2) Scaling out to a cluster is quite troublesome. (3) Because of memory constraints, too many unpaid orders can easily cause an OOM exception. (4) The code is relatively complex.

(3) Timing wheel algorithm

Idea

Let's start with a diagram of a timing wheel (this one appears everywhere).

The timing wheel algorithm can be compared to a clock. In the figure above, the arrow (pointer) rotates in one direction at a fixed frequency, and each step is called a tick. A timing wheel is therefore defined by three important parameters: ticksPerWheel (the number of ticks in one revolution), tickDuration (the duration of one tick), and timeUnit (the time unit). For example, with ticksPerWheel = 60, tickDuration = 1 and timeUnit = seconds, the wheel behaves exactly like the second hand of a real clock.

Suppose the pointer is currently on slot 1 and a task must run in 4 seconds: its callback or message is placed in slot 5. What if it must run in 20 seconds? The wheel has only 8 slots, so the pointer must first complete two full turns. The task is placed in slot 5, since (20 % 8) + 1 = 5, and fires when the pointer reaches slot 5 after those two turns.
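The slot arithmetic above can be checked with a toy single-level wheel. This is a simplified sketch for illustration, not Netty's HashedWheelTimer: ticks are advanced by calling tick() manually, and each task stores how many full turns remain before it fires. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy single-level timing wheel (not Netty's implementation); ticks are driven manually. */
public class ToyTimingWheel {

    private static final class Task {
        final String name;
        int remainingRounds; // full turns left before the task may fire

        Task(String name, int remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Task>> slots = new ArrayList<>();
    private int cursor; // slot the pointer currently points at

    public ToyTimingWheel(int ticksPerWheel, int startSlot) {
        for (int i = 0; i < ticksPerWheel; i++) {
            slots.add(new ArrayList<>());
        }
        this.cursor = startSlot;
    }

    /** Schedules a task delayTicks ticks from now and returns the slot it lands in. */
    public int schedule(String name, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int n = slots.size();
        int slot = (cursor + delayTicks) % n;
        // The pointer first reaches this slot after ((delayTicks - 1) % n) + 1 ticks;
        // every later visit is one more full turn.
        int rounds = (delayTicks - 1) / n;
        slots.get(slot).add(new Task(name, rounds));
        return slot;
    }

    /** Moves the pointer one slot forward and returns the names of the tasks that fire. */
    public List<String> tick() {
        cursor = (cursor + 1) % slots.size();
        List<String> fired = new ArrayList<>();
        Iterator<Task> it = slots.get(cursor).iterator();
        while (it.hasNext()) {
            Task t = it.next();
            if (t.remainingRounds == 0) {
                fired.add(t.name);
                it.remove();
            } else {
                t.remainingRounds--;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 8 slots, pointer on slot 1, as in the example above
        ToyTimingWheel wheel = new ToyTimingWheel(8, 1);
        System.out.println("4s task -> slot " + wheel.schedule("task-4s", 4));    // slot 5
        System.out.println("20s task -> slot " + wheel.schedule("task-20s", 20)); // slot 5
        for (int t = 1; t <= 20; t++) {
            List<String> fired = wheel.tick();
            if (!fired.isEmpty()) {
                System.out.println("tick " + t + ": " + fired); // tick 4 and tick 20
            }
        }
    }
}
```

Keeping a remaining-rounds counter per task is what lets a wheel with only 8 slots hold delays longer than 8 ticks; the trade-off is that each visit to a slot walks its whole task list.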

Implementation

We use Netty’s HashedWheelTimer to do this

Add the following dependency to the POM:

```xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.24.Final</version>
</dependency>
```

The test code HashedWheelTimerTest is shown below

```java
package com.rjzheng.delay3;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {

    static class MyTimerTask implements TimerTask {
        // volatile: written by the timer's worker thread, read by the main thread
        volatile boolean flag;

        public MyTimerTask(boolean flag) {
            this.flag = flag;
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println("Deleting the order from the database...");
            this.flag = false;
        }
    }

    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        // Run the task once, 5 seconds from now
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        int i = 1;
        while (timerTask.flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(i + " second(s) passed");
            i++;
        }
        timer.stop(); // release the timer's worker thread
    }
}
```

The output is as follows

```
1 second(s) passed
2 second(s) passed
3 second(s) passed
4 second(s) passed
5 second(s) passed
Deleting the order from the database...
6 second(s) passed
```

Advantages and disadvantages

Advantages:

Task trigger latency is lower than with DelayQueue, and the code is less complex than with DelayQueue.

Disadvantages:

(1) All data is lost when the server restarts, so the scheme does not tolerate downtime. (2) Scaling out to a cluster is quite troublesome. (3) Because of memory constraints, too many unpaid orders can easily cause an OOM exception.

(4) Redis cache

Idea 1

This approach uses a Redis zset (sorted set). Each element (member) of a zset is associated with a score, and members can be retrieved sorted by score. The relevant commands are:

Add elements: ZADD key score member [[score member] [score member] …]
Query elements in score order: ZRANGE key start stop [WITHSCORES]
Remove elements: ZREM key member [member …]

A quick test:

```
# Add a single element
redis> ZADD page_rank 10 google.com
(integer) 1

# Add multiple elements
redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

# Remove an element
redis> ZREM page_rank google.com
(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
```

So how is this applied? We store the order's timeout timestamp as the score and the order ID as the member, and the system repeatedly checks the first element (the one with the smallest score) to see whether it has timed out, as shown in the figure below.
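The consumer logic can be modeled without a Redis server. The sketch below swaps the Redis zset for an in-memory java.util.TreeSet ordered by score, so it only illustrates the idea (score = expiry timestamp in seconds, member = order ID; look at the lowest score and consume it once it is due). It is not the Redis or Jedis API, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** In-memory stand-in for the Redis sorted set (not Redis itself). */
public class ZsetDelayModel {

    private static final class Entry {
        final long score;     // expiry time, epoch seconds
        final String member;  // order ID

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Ordered by score, ties broken by member, like a Redis zset
    private final TreeSet<Entry> set = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.member));

    /** Analogous to ZADD key score member. */
    public void add(long score, String member) {
        set.add(new Entry(score, member));
    }

    /**
     * One pass of the consumer loop: look at the lowest-score entry (analogous to
     * ZRANGE key 0 0 WITHSCORES); while its score <= now, remove it (analogous to
     * ZREM) and consume it. Returns the consumed order IDs in score order.
     */
    public List<String> consumeExpired(long nowSeconds) {
        List<String> consumed = new ArrayList<>();
        while (!set.isEmpty() && set.first().score <= nowSeconds) {
            consumed.add(set.pollFirst().member);
        }
        return consumed;
    }

    public static void main(String[] args) {
        ZsetDelayModel zset = new ZsetDelayModel();
        zset.add(1003, "OID00000010");
        zset.add(1005, "OID00000011");
        zset.add(1001, "OID00000012");
        System.out.println(zset.consumeExpired(1003)); // [OID00000012, OID00000010]
        System.out.println(zset.consumeExpired(1010)); // [OID00000011]
    }
}
```

Because the set is sorted by expiry time, the consumer only ever needs to inspect the head; if the head is not due, nothing behind it is due either.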

Implementation 1

```java
package com.rjzheng.delay4;

import java.util.Calendar;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

// Uses the Jedis 2.x/3.x API, where zrangeWithScores returns Set<Tuple>
public class AppTest {

    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static final JedisPool jedisPool = new JedisPool(ADDR, PORT);

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    // Producer: create 5 orders, each timing out 3 seconds from now
    public void productionDelayMessage() {
        for (int i = 0; i < 5; i++) {
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.SECOND, 3);
            int second3later = (int) (cal1.getTimeInMillis() / 1000);
            // score = timeout timestamp in seconds, member = order ID
            try (Jedis jedis = AppTest.getJedis()) {
                jedis.zadd("OrderId", second3later, "OID0000001" + i);
            }
            System.out.println(System.currentTimeMillis() + "ms: redis generated an order task: OrderId = " + "OID0000001" + i);
        }
    }

    // Consumer: check the element with the smallest score and consume it once it has timed out
    public void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();
        while (true) {
            // Fetch only the first element (indexes 0..0)
            Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 0);
            if (items == null || items.isEmpty()) {
                System.out.println("There are currently no pending tasks");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            int score = (int) ((Tuple) items.toArray()[0]).getScore();
            int nowSecond = (int) (Calendar.getInstance().getTimeInMillis() / 1000);
            if (nowSecond >= score) {
                String orderId = ((Tuple) items.toArray()[0]).getElement();
                jedis.zrem("OrderId", orderId);
                System.out.println(System.currentTimeMillis() + "ms: redis consumed a task: OrderId = " + orderId);
            }
        }
    }

    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        appTest.consumerDelayMessage();
    }
}
```

The corresponding output is as follows

As you can see, each order is consumed almost exactly 3 seconds after it is created.

However, this version has a fatal flaw: under high concurrency, multiple consumers can fetch the same order ID. Let's test this with the ThreadTest class:

```java
package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {

    private static final int threadNum = 10;
    private static final CountDownLatch cdl = new CountDownLatch(threadNum);

    static class DelayMessage implements Runnable {
        @Override
        public void run() {
            try {
                // Wait until all consumer threads have been started
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            AppTest appTest = new AppTest();
            appTest.consumerDelayMessage();
        }
    }

    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        for (int i = 0; i < threadNum; i++) {
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}
```

The output is shown below

Clearly, multiple threads consumed the same order.

Solutions

(1) Use a distributed lock. A distributed lock reduces performance, however, so this option is not covered in detail here.

(2) Check the return value of ZREM and consume the order only when it is greater than 0. The original code in consumerDelayMessage() is:

```java
if (nowSecond >= score) {
    String orderId = ((Tuple) items.toArray()[0]).getElement();
    jedis.zrem("OrderId", orderId);
    System.out.println(System.currentTimeMillis() + "ms: redis consumed a task: OrderId = " + orderId);
}
```

Modify it as follows:

```java
if (nowSecond >= score) {
    String orderId = ((Tuple) items.toArray()[0]).getElement();
    // ZREM returns the number of members actually removed, so only one consumer gets 1
    Long num = jedis.zrem("OrderId", orderId);
    if (num != null && num > 0) {
        System.out.println(System.currentTimeMillis() + "ms: redis consumed a task: OrderId = " + orderId);
    }
}
```

After this change, rerun the ThreadTest class and the output is correct: each order is consumed only once.
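Why checking the return value works can be illustrated without Redis. In the sketch below, a java.util.concurrent.ConcurrentSkipListSet stands in for the zset and Set.remove() plays the role of ZREM, since it returns true for exactly one caller per element. Ten threads released together by a CountDownLatch (mirroring ThreadTest) all try to consume every order, yet each order is consumed once. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory illustration of "consume only if the remove succeeded" (not Redis). */
public class RemoveGuardDemo {

    /** Runs `threads` consumers over the order IDs; returns how often each ID was consumed. */
    public static ConcurrentHashMap<String, AtomicInteger> run(List<String> orderIds, int threads)
            throws InterruptedException {
        ConcurrentSkipListSet<String> pending = new ConcurrentSkipListSet<>(orderIds);
        ConcurrentHashMap<String, AtomicInteger> consumed = new ConcurrentHashMap<>();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (String id : orderIds) {
                    // Every worker "sees" every ID (like all consumers reading the same
                    // zset head), but only the one whose remove succeeds consumes it.
                    if (pending.remove(id)) {
                        consumed.computeIfAbsent(id, k -> new AtomicInteger()).incrementAndGet();
                    }
                }
            });
            workers.add(w);
            w.start();
        }
        start.countDown(); // release all workers at once
        for (Thread w : workers) {
            w.join();
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ids = List.of("OID00000010", "OID00000011", "OID00000012", "OID00000013", "OID00000014");
        System.out.println(run(ids, 10)); // every count is 1
    }
}
```

The guard costs nothing extra: the remove that was already needed doubles as the claim on the order, which is why it is cheaper than a distributed lock.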

Idea 2

This approach uses Redis keyspace notifications, which provide a callback when a key expires; in practice, Redis publishes a message to the subscribed clients. Redis 2.8 or later is required.

Implementation 2

In redis.conf, add the following setting:

notify-keyspace-events Ex

The code is as follows:

```java
package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

public class RedisTest {

    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static final JedisPool jedisPool = new JedisPool(ADDR, PORT);
    private static final RedisSub sub = new RedisSub();

    public static void init() {
        // subscribe() blocks, so it runs on its own thread with its own connection
        new Thread(() -> jedisPool.getResource().subscribe(sub, "__keyevent@0__:expired")).start();
    }

    public static void main(String[] args) throws InterruptedException {
        init();
        for (int i = 0; i < 10; i++) {
            String orderId = "OID000000" + i;
            // Return each connection to the pool after use
            try (Jedis jedis = jedisPool.getResource()) {
                // The key expires after 3 seconds, which triggers the notification
                jedis.setex(orderId, 3, orderId);
            }
            System.out.println(System.currentTimeMillis() + "ms: order " + orderId + " generated");
        }
    }

    static class RedisSub extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(System.currentTimeMillis() + "ms: order " + message + " cancelled");
        }
    }
}
```

The output is as follows

You can clearly see that three seconds later, the order is cancelled

PS: Redis pub/sub has a limitation

Original text:

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

Translation:

Redis’s publish/subscribe model is currently fire and forget, making it impossible to reliably notify events. That is, if the publish/subscribe client disconnects and then reconnects, all events during the client disconnection are lost.

Therefore, option two is not recommended. Of course, if you don’t have high reliability requirements, you can use it.

Advantages and disadvantages

Advantages:

(1) Because Redis serves as the message channel, messages are stored in Redis; if the sender or the task handler crashes, the data can be reprocessed after a restart. (2) Scaling out to a cluster is convenient. (3) Timing accuracy is high.

Disadvantages:

Additional Redis maintenance is required

(5) Use message queues

We can use a RabbitMQ delay queue. RabbitMQ has two features that together can implement delayed queuing:

RabbitMQ can set a TTL on a queue (the x-message-ttl argument) or on an individual message (the expiration property) to control how long messages live. A message that exceeds its TTL becomes a dead letter.

A RabbitMQ queue can be configured with x-dead-letter-exchange and, optionally, x-dead-letter-routing-key; when a message in the queue becomes a dead letter, it is rerouted according to these two parameters.

Combining these two features simulates delayed messages. The details are too long to cover here; I will write a separate article about them.
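As a rough sketch of the queue side of this setup, the helper below builds the argument map for a queue whose messages expire after a TTL and are dead-lettered to another exchange. The exchange and routing-key names are hypothetical placeholders. With the RabbitMQ Java client, such a map is passed as the last argument of Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); a consumer on a queue bound to the dead-letter exchange would then receive each message once its TTL has elapsed and could cancel the order. The sketch itself uses only the JDK, so it runs without a broker.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds queue arguments for a TTL + dead-letter "delay queue" (names are placeholders). */
public class DelayQueueArgs {

    public static Map<String, Object> delayQueueArguments(int ttlMillis, String dlxExchange, String dlxRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        // Messages that stay in this queue longer than ttlMillis become dead letters
        args.put("x-message-ttl", ttlMillis);
        // Dead letters are republished to this exchange...
        args.put("x-dead-letter-exchange", dlxExchange);
        // ...with this routing key (optional; if absent, the message's own routing key is used)
        if (dlxRoutingKey != null) {
            args.put("x-dead-letter-routing-key", dlxRoutingKey);
        }
        return args;
    }

    public static void main(String[] args) {
        // 30-minute order timeout; "order.dlx" and "order.cancel" are hypothetical names
        Map<String, Object> queueArgs = delayQueueArguments(30 * 60 * 1000, "order.dlx", "order.cancel");
        System.out.println(queueArgs.get("x-message-ttl")); // 1800000
    }
}
```

Note that the delay queue itself should have no consumers; it only holds messages until they expire, and all real work happens on the queue bound to the dead-letter exchange.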

Advantages and disadvantages

Advantages:

Efficient; RabbitMQ's distributed nature makes horizontal scaling easy, and message persistence improves reliability.

Disadvantages:

Its usability depends on how well RabbitMQ is operated and maintained, and introducing RabbitMQ increases complexity and cost.