From: CSDN, author: HJM4702192

Link: blog.csdn.net/hjm4702192/…

In development, it is common to encounter requirements for delayed tasks. For example,

  • If the order is not paid within 30 minutes, it will be cancelled automatically
  • 60 seconds after the order is generated, text message is sent to the user

We have a technical name for these tasks: delayed tasks. So this raises the question, what is the difference between a delayed task and a timed task? There are several differences

Scheduled tasks have a specific trigger time, while delayed tasks do not

A scheduled task has an execution period, while a delayed task is executed within a period of time after an event is triggered

Scheduled tasks generally perform batch operations of multiple tasks, while delayed tasks generally perform single tasks

Next, we take judging whether the order times out as an example to conduct scheme analysis

Project analysis

(1) Database polling

Train of thought

This scheme is usually used in small projects, where a thread periodically scans the database, determines whether there are timed orders by order time, and then performs operations such as UPDATE or DELETE

implementation

The blogger used Quartz in his early years (during his internship), so a quick introduction

The Maven project introduces a dependency as shown below

< the dependency > < groupId > org. Quartz - the scheduler < / groupId > < artifactId > quartz < / artifactId > < version > 2.2.2 < / version > </dependency>Copy the code

Call the Demo class MyJob as shown below

package com.rjzheng.delay1; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class MyJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(" I want to scan database... ") ); } public static void main(String[] args) throws Exception {// Create a task JobDetail JobDetail = jobBuilder.newJob (myjob.class)  .withIdentity("job1", "group1").build(); // Create Trigger every 3 seconds Trigger Trigger = triggerBuilder.newtrigger ().withidentity ("trigger1", "group3") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); // Place tasks and their triggers into the scheduler scheduler.scheduleJob(jobDetail, trigger); // The scheduler starts scheduling tasks. Start (); }}Copy the code

Run the code and you can see that every 3 seconds, the output is as follows

It’s time to run a database scan…

The advantages and disadvantages

Advantages: Simple and supports cluster operations

Disadvantages :(1) large server memory consumption

(2) There is a delay. For example, if you scan every 3 minutes, the worst delay is 3 minutes

(3) Suppose you have tens of millions of orders, every few minutes such a scan, the loss of the database is huge

(2)JDK delay queue

Train of thought

This scheme is implemented using the JDK’s DelayQueue, which is an unbounded blocking queue. The queue can only get elements from it when the delay expires. The object in the DelayQueue must implement the Delayed interface.

The DelayedQueue implementation workflow is shown below

The picture

Poll(): retrieves and removes the timeout element from the queue, or returns null if there is none

Take (): retrieves and removes the timeout element from the queue. If none exists, wait for the current thread until any element meets the timeout condition.

implementation

Define a class OrderDelay to implement Delayed as follows

package com.rjzheng.delay2; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class OrderDelay implements Delayed { private String orderId; private long timeout; OrderDelay(String orderId, long timeout) { this.orderId = orderId; this.timeout = timeout + System.nanoTime(); } public int compareTo(Delayed other) { if (other == this) return 0; OrderDelay t = (OrderDelay) other; long d = (getDelay(TimeUnit.NANOSECONDS) - t .getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? 1:1); Public long getDelay(TimeUnit Unit) {return unit.convert(timeout -) {return unit.convert(timeout -) System.nanoTime(),TimeUnit.NANOSECONDS); } void print() {system.out.println (orderId+" orderId "); ); }}Copy the code

Running the test Demo, we set the delay to 3 seconds

package com.rjzheng.delay2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class DelayQueueDemo { public static void main(String[] args) { // TODO Auto-generated method stub List<String> list = new ArrayList<String>(); list.add("00000001"); list.add("00000002"); list.add("00000003"); list.add("00000004"); list.add("00000005"); DelayQueue<OrderDelay> queue = newDelayQueue<OrderDelay>(); long start = System.currentTimeMillis(); for(int i = 0; i<5; I++) {/ / delay three SECONDS out queue. The put (new OrderDelay (list. Get (I), TimeUnit NANOSECONDS. Convert (3, TimeUnit. SECONDS))); try { queue.take().print(); System.out.println("After " + (System.currentTimeMillis()-start) + " MilliSeconds"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}Copy the code

The output is as follows

Order 00000001 will be deleted... After 3003 MilliSeconds 00000002 After 6006 MilliSeconds 00000003 After 9006 MilliSeconds 00000004 After 12008 MilliSeconds 00000005 After 15009 MilliSecondsCopy the code

You can see that the delay is 3 seconds and the order is deleted

The advantages and disadvantages

Advantages: High efficiency, low task trigger time delay.

Disadvantages:

(1) After the server is restarted, all the data disappear, afraid of downtime (2) cluster expansion is quite troublesome (3) due to memory constraints, such as too many unpaid orders, it is easy to appear OOM exception (4) high code complexity

(3) Time round algorithm

Train of thought

Let’s start with a picture of a time wheel (it’s everywhere)

The picture

The time wheel algorithm can be analogous to a clock. For example, in the figure above, the arrow (pointer) rotates at a fixed frequency in a certain direction, and each beat is called a tick. It can be seen that the timing wheel consists of three important attribute parameters, ticksPerWheel (the number of ticks in a cycle), tickDuration (the duration of a tick), and timeUnit (timeUnit). For example, when ticksPerWheel=60, TickDuration =1, timeUnit= second, which is exactly like the constant second hand movement in real life.

If the current pointer is above 1 and I have a task that needs to be executed in 4 seconds, the thread callback or message for that execution will be placed on 5. What if you need to execute it after 20 seconds? Because the number of slots in the loop is only up to 8, if it takes 20 seconds, the pointer needs to make two more turns. Position is above 5 after 2 turns (20% 8 + 1)

implementation

We use Netty’s HashedWheelTimer to do this

Add the following dependencies to the Pom

Ty < dependency > < groupId > io.net < / groupId > < artifactId > netty -all < / artifactId > < version > 4.1.24. The Final < / version > </dependency>Copy the code

The test code HashedWheelTimerTest is shown below

package com.rjzheng.delay3; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; public class HashedWheelTimerTest { static class MyTimerTask implements TimerTask{ boolean flag; public MyTimerTask(boolean flag){ this.flag = flag; } public void run(Timeout Timeout) throws Exception {// TODO auto-generated method stub system.out. println(" The order is deleted from the database. ..." ); this.flag =false; } } public static void main(String[] argv) { MyTimerTask timerTask = new MyTimerTask(true); Timer timer = new HashedWheelTimer(); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); int i = 1; while(timerTask.flag){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } system.out. println(I +" seconds passed "); i++; }}}Copy the code

The output is as follows

1 second has passed 2 seconds have passed 3 seconds have passed 4 seconds have passed 5 seconds have gone to the database to delete the order... Six seconds passedCopy the code

The advantages and disadvantages

Advantages: High efficiency, lower delay time of task triggering and lower code complexity than delayQueue.

Disadvantages:

(1) After the server restarts, all the data disappears, so as to avoid downtime

(2) Cluster expansion is quite troublesome

(3) Due to limited memory conditions, such as too many unpaid orders, it is easy to appear OOM exception

(4) the redis cache

  • Thinking a

Zset of Redis is used,zset is an ordered set, each element (member) is associated with a score, and the value in the set can be obtained by score sorting

ZADD key score member [[score member] [score member]…

ZRANGE key start stop [WITHSCORES]

Query element score:ZSCORE key member

ZREM key member [member…

Test the following

Add one element redis> ZADD page_rank 10 google.com (integer) 1 Add multiple elements redis> ZADD page_rank 9 baidu.com 8 bing.com (integer) 2 Redis > ZRANGE page_rank 0-1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9" 5) "google.com" 6) "10 Redis > ZSCORE page_rank bing.com "8" Remove a single element redis> ZREM page_rank google.com (integer) 1 redis> ZRANGE page_rank 0-1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9"Copy the code

So how do you do that? We set the order timeout timestamp and order number to score and member respectively, and the system scans the first element to determine whether it has timeout, as shown in the figure below

The picture

Implement a

package com.rjzheng.delay4; import java.util.Calendar; import java.util.Set; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; Public class AppTest {private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); public static Jedis getJedis() { return jedisPool.getResource(); Public void productionDelayMessage(){for(int I =0; i<5; Calendar cal1 = calendar.getInstance (); Calendar cal1 = calendar.getInstance (); cal1.add(Calendar.SECOND, 3); int second3later = (int) (cal1.getTimeInMillis() / 1000); AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i); System.out.println(system.currentTimemillis ()+"ms:redis generates an order task: order ID is "+"OID0000001"+ I); Public void consumerDelayMessage(){Jedis Jedis = apptest.getJedis (); while(true){ Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1); If (the items = = null | | items. The isEmpty ()) {System. Out. Println (" currently there is no waiting for the task of "); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } continue; } int score = (int) ((Tuple)items.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(system.currentTimemillis () +"ms:redis consumes a task: consume the order OrderId as "+ OrderId); } } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); appTest.consumerDelayMessage(); }}Copy the code

The corresponding output is as follows

The picture

As you can see, it’s almost always 3 seconds after the consumption order.

However, there is a fatal flaw in this version. Under high concurrency conditions, multiple consumers will fetch the same order number, and we will test the code ThreadTest

package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {

    private static final int threadNum = 10;

    private static CountDownLatch cdl = newCountDownLatch(threadNum);

    static class DelayMessage implements Runnable{

        public void run() {

            try {

                cdl.await();

            } catch (InterruptedException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            }

            AppTest appTest =new AppTest();

            appTest.consumerDelayMessage();

        }

    }

    public static void main(String[] args) {

        AppTest appTest =new AppTest();

        appTest.productionDelayMessage();

        for(int i=0;i<threadNum;i++){

            new Thread(new DelayMessage()).start();

            cdl.countDown();

        }

    }

}
Copy the code

The output is shown below

The picture

Obviously, multiple threads consume the same resource.

The solution

(1) With distributed lock, but with distributed lock, the performance is reduced, the scheme will not be detailed.

(2) Judge the return value of ZREM, and consume data only when it is greater than 0, so the value in the consumerDelayMessage() method is

if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(system.currentTimemillis ()+"ms:redis consumes a task: consume the order OrderId as "+ OrderId); }Copy the code

Modified to

if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); Long num = jedis.zrem("OrderId", orderId); if( num ! = null && num>0){system.out.println (system.currentTimemillis ()+"ms: OrderId = "+ OrderId); }}Copy the code

After this modification, re-run the ThreadTest class and find that the output is fine

  • Idea 2

The solution uses Redis’s Keyspace Notifications mechanism, which provides a callback when a key fails, essentially sending a message to the client. Redis version 2.8 or above is required.

Realize the

In redis.conf, add a configuration

notify-keyspace-events Ex

Run the following code

package com.rjzheng.delay5; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; Public class RedisTest {private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedis = new JedisPool(ADDR, PORT); private static RedisSub sub = new RedisSub(); public static void init() { new Thread(new Runnable() { public void run() { jedis.getResource().subscribe(sub, "__keyevent@0__:expired"); } }).start(); } public static void main(String[] args) throws InterruptedException { init(); for(int i =0; i<10; i++){ String orderId = "OID000000"+i; jedis.getResource().setex(orderId, 3, orderId); System.out.println(system.currentTimemillis ()+"ms:"+orderId+" order generation "); } } static class RedisSub extends JedisPubSub { <ahref='http://www.jobbole.com/members/wx610506454'>@Override</a> public Void onMessage(String channel, String message) {system.out.println (system.currentTimemillis ()+"ms:"+message+" order cancelled "); }}}Copy the code

The output is as follows

The picture

You can clearly see that three seconds later, the order is cancelled

Ps: Redis pub/sub has a bug

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

Redis’s publish/subscribe model is currently fire and forget, making it impossible to reliably notify events. That is, if the publish/subscribe client disconnects and then reconnects, all events during the client disconnection are lost. Therefore, option two is not recommended. Of course, if you don’t have high reliability requirements, you can use it.

The advantages and disadvantages

Advantages :(1) since Redis is used as the message channel, messages are stored in Redis. If the sender or task handler dies, it is possible to reprocess the data after the restart. (2) cluster expansion is quite convenient (3) high time accuracy

Disadvantages :(1) additional redis maintenance is required

(5) Use message queues

We can use rabbitMQ’s delayed queue. RabbitMQ has the following two features to implement delayed queuing

RabbitMQ can set x-message-tt for Queue and Message to control the lifetime of messages. If a timeout occurs, the Message will become a dead letter

The Queue of lRabbitMQ can be configured with x-dead-letter-exchange and x-dead-letter-routing-key (optional) to control the occurrence of deadletter in the Queue, and re-route according to these two parameters. By combining these two features, you can simulate the function of delayed messages. More specifically, I’ll write another article, but it’s too long to cover here.

The advantages and disadvantages

Advantages: high efficiency, easily scale-out using rabbitMQ’s distributed nature, and message persistence for increased reliability.

Disadvantages: Its ease of use depends on the operation and maintenance of rabbitMq. The complexity and cost of referencing rabbitMq increases.


Recommend 3 original Springboot +Vue projects, with complete video explanation and documentation and source code:

Build a complete project from Springboot+ ElasticSearch + Canal

  • Video tutorial: www.bilibili.com/video/BV1Jq…
  • A complete development documents: www.zhuawaba.com/post/124
  • Online demos: www.zhuawaba.com/dailyhub

【VueAdmin】 hand to hand teach you to develop SpringBoot+Jwt+Vue back-end separation management system

  • Full 800 – minute video tutorial: www.bilibili.com/video/BV1af…
  • Complete development document front end: www.zhuawaba.com/post/18
  • Full development documentation backend: www.zhuawaba.com/post/19

【VueBlog】 Based on SpringBoot+Vue development of the front and back end separation blog project complete teaching

  • Full 200 – minute video tutorial: www.bilibili.com/video/BV1af…
  • Full development documentation: www.zhuawaba.com/post/17

If you have any questions, please come to my official account [Java Q&A Society] and ask me