What is a deferred task?

As the name suggests, a deferred task is a task whose execution needs to be postponed until a later time.

Deferred tasks can be used in the following scenarios:

  1. A red envelope that has not been opened within 24 hours needs to be returned to the sender;
  2. On the monthly billing day, the statement for the current month needs to be sent to the user;
  3. 30 minutes after an order is placed, the system automatically cancels it if the user has not paid.

Scenarios like these all call for deferred tasks.

Analysis of delayed task implementation

The key to implementing a delayed task is executing a task at a specific point in time. Based on this, there are two broad ways to implement delayed tasks:

  1. Write our own loop that keeps checking whether the current time has reached the point at which a task should run;
  2. Use the JDK or third-party utility classes to implement deferred tasks.

Within the JDK, the relevant classes are DelayQueue and ScheduledExecutorService. Third-party options for running delayed tasks are numerous and include Redis, Netty, and MQ.

Delayed task implementation

Let’s look at each implementation in turn, together with its code.

1. An infinite loop to implement delayed tasks

With this approach, we start an infinite loop that keeps scanning the tasks, and use a Map to store each task together with the time at which it should run. The implementation is as follows:

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class DelayTaskExample {
    // Scheduled tasks: task name -> execution time in epoch milliseconds
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        // Add a scheduled task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // delay 3s
        // Run the delayed tasks with an infinite loop
        loopTask();
    }

    /**
     * Infinite loop that runs the delayed tasks
     */
    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                Long itemLong = entry.getValue();
                // The task is due
                if (Instant.now().toEpochMilli() >= itemLong) {
                    System.out.println("Execute task: " + entry.getKey() +
                            ", execution time: " + LocalDateTime.now());
                    // Delete the task through the iterator; calling _TaskMap.remove()
                    // while iterating can throw ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }
}

The results of the above procedures are as follows:

Program start time: 2020-04-12T18:51:28.188

Execute task: task-1, execution time: 2020-04-12T18:51:31.189

It can be seen that the task is delayed by 3s, which is in line with our expectations.

2. Java API implementation of deferred tasks

The Java API provides two ways to implement deferred tasks: DelayQueue and ScheduledExecutorService.

① ScheduledExecutorService implements delayed tasks

We can use ScheduledExecutorService to run a task repeatedly at a fixed frequency, as follows:

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * Uses ScheduledExecutorService to run a task repeatedly at a fixed frequency
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // Business code of the task
                        System.out.println("Execute task, execution time: " + LocalDateTime.now());
                    }
                },
                2, // delay before the first execution
                2, // delay between the end of one execution and the start of the next
                TimeUnit.SECONDS);
    }
}

The results of the above procedures are as follows:

Program start time: 2020-04-12T21:28:10.416

Execute task, execution time: 2020-04-12T21:28:12.421

Execute task, execution time: 2020-04-12T21:28:14.422

...

As the results show, with ScheduledExecutorService#scheduleWithFixedDelay(…) the delayed task keeps running in a loop at a fixed frequency.
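
The example above repeats the task indefinitely. For a task that should run only once after a delay, ScheduledExecutorService also provides schedule(…). The following is a minimal sketch of that variant; the class name OneShotDelayExample and the helper runOnceAfter are illustrative names that do not come from the original example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotDelayExample {

    /** Runs a task once after delayMillis and returns the measured elapsed time in milliseconds. */
    static long runOnceAfter(long delayMillis) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // The "task": report how long we actually waited (hypothetical business logic)
            Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // schedule(...) runs the task exactly once after the given delay
            ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the task has run
        } finally {
            executor.shutdown(); // without this, the pool thread keeps the JVM alive
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Task ran after about " + runOnceAfter(2000) + " ms");
    }
}
```

Shutting the executor down once the task has finished is a choice made for this sketch; a long-running service would more likely keep one shared executor and schedule many tasks on it.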

② DelayQueue implements deferred tasks

DelayQueue is an unbounded blocking queue whose elements can only be taken once their delay has expired. Elements placed in it must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete DelayQueue code is as follows:

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
        // Add delayed tasks
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time: " + DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            // take() blocks until the delay of the head element has expired
            System.out.println(delayQueue.take());
        }
        System.out.println("End time: " + DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Expiration time in milliseconds
        long delayTime = System.currentTimeMillis();

        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }

        // Remaining time until expiration
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Orders the elements in the queue by remaining delay
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }

        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

The results of the above procedures are as follows:

Start time: 2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

End time: 2020-4-12 20:40:43

3. Redis implements deferred tasks

There are two ways to implement delayed tasks with Redis: checking zset data and keyspace notifications.

① Checking zset data

We store the delayed tasks in a zset, using each task's execution time as its score, and then start an infinite loop that queries and consumes all tasks due at the current time. The implementation is as follows (it requires the Jedis client):

import redis.clients.jedis.Jedis;
import utils.JedisUtils;

import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Execution time: 30s from now
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Add more test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start consuming the delay queue
        doDelayQueue(jedis);
    }

    /**
     * Consumes the delay queue
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // Current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // previous second
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks due in the current window
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume the task
                System.out.println("Consume: " + item);
            }
            // Delete the executed tasks
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // poll once per second
        }
    }
}
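
One thing to note about the polling code above: it only looks at the window between the previous second and the current second, so a task could be skipped if a polling round is delayed, for example by a slow consumer or a process restart. A common alternative is to treat every member whose score is less than or equal to the current time as due. The sketch below illustrates that rule with an in-memory TreeMap standing in for the zset, an assumption made so the example runs without a Redis server. The class InMemoryDelayQueue and its methods are hypothetical names and are not part of Jedis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class InMemoryDelayQueue {
    // Stand-in for the zset: score (due time in epoch seconds) -> members
    private final TreeMap<Long, List<String>> tasks = new TreeMap<>();

    /** Registers a member with its due time as the score (roughly what ZADD does). */
    void add(long dueEpochSecond, String member) {
        tasks.computeIfAbsent(dueEpochSecond, k -> new ArrayList<>()).add(member);
    }

    /** Removes and returns every member whose score is <= nowEpochSecond, in due-time order. */
    List<String> pollDue(long nowEpochSecond) {
        List<String> due = new ArrayList<>();
        // headMap(now, true) is a live view of all entries with key <= now
        Map<Long, List<String>> head = tasks.headMap(nowEpochSecond, true);
        for (List<String> members : head.values()) {
            due.addAll(members);
        }
        head.clear(); // clearing the view also removes the entries from the backing map
        return due;
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        queue.add(100, "order_2");
        queue.add(105, "order_4");
        System.out.println(queue.pollDue(99));  // nothing is due yet
        System.out.println(queue.pollDue(103)); // order_2 is due
        System.out.println(queue.pollDue(200)); // order_4 is still picked up after a long gap
    }
}
```

With Redis, the same rule would correspond to querying the score range from 0 up to the current time with ZRANGEBYSCORE and then removing the returned members.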

② Keyspace notifications

By default, keyspace notifications are not enabled on the Redis server. To enable them manually, run the command config set notify-keyspace-events Ex. Once they are enabled, we receive an event each time a key expires. We can use this mechanism to start a timed task for each key: the task fires when its key expires. The code is as follows:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    // Channel on which key expiration events of database 0 are published
    public static final String _TOPIC = "__keyevent@0__:expired";

    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // Run the scheduled task
        doTask(jedis);
    }

    /**
     * Subscribes to expiration messages and runs the scheduled task
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        // Subscribe to expiration messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // A message was received: run the scheduled task
                System.out.println("Received message: " + message);
            }
        }, _TOPIC);
    }
}

4. Netty implements deferred tasks

Netty is an open source Java framework provided by JBoss. It is a client/server programming framework based on NIO. With Netty you can quickly and easily develop network applications, such as client and server applications that implement a particular protocol. Netty simplifies and streamlines network programming, such as the development of TCP and UDP socket services.

You can use Netty’s HashedWheelTimer utility class to implement deferred tasks. The implementation code is as follows.

First add the Netty dependency to the project as follows:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>

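The complete code for the Netty implementation is as follows:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        NettyTask();
    }

    /**
     * Delayed task based on Netty
     */
    private static void NettyTask() {
        // Create the timer
        HashedWheelTimer timer = new HashedWheelTimer(3, // tick duration
                TimeUnit.SECONDS,
                100); // number of slots in the timing wheel
        // Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Execute task, execution time: " + LocalDateTime.now());
            }
        };
        // Add the task to the delay queue
        timer.newTimeout(task, 0, TimeUnit.SECONDS);
    }
}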

The results of the above procedures are as follows:

Program start time: 2020-04-13T10:16:23.033

Execute task, execution time: 2020-04-13T10:16:26.118

HashedWheelTimer is implemented with a timing wheel. A timing wheel is a ring-shaped data structure that can be pictured as a clock face divided into many slots, each representing a span of time. Each slot holds a linked list of the timeout tasks to execute, and a pointer advances one slot at a time. When the pointer reaches a slot, the delayed tasks stored in that slot are executed, as shown below:

The picture above can be understood as a timing wheel of size 8 that advances one slot per fixed interval (for example, 1s), with each slot pointing to a linked list of tasks to execute.
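
The slot-and-pointer mechanism described above can be sketched in a few lines of plain Java. This is only an illustration of the idea and not Netty's actual implementation: the class SimpleTimingWheel, its methods, and the manual tick() call (used here instead of a background thread driven by a clock) are all assumptions made for this example. Tasks whose delay is longer than one rotation carry a count of the rotations they still have to wait.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SimpleTimingWheel {

    /** A task plus the number of full wheel rotations left before it is due. */
    private static final class Entry {
        final Runnable task;
        int remainingRounds;

        Entry(Runnable task, int remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int current = 0; // index of the slot the pointer is on

    SimpleTimingWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules a task to run after delayTicks ticks (must be at least 1). */
    void schedule(Runnable task, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = slots.size();
        int slot = (current + delayTicks) % size; // where the pointer will be when the task is due
        int rounds = (delayTicks - 1) / size;     // earlier pointer visits that must be skipped
        slots.get(slot).add(new Entry(task, rounds));
    }

    /** Moves the pointer one slot forward and runs every task that is due in that slot. */
    void tick() {
        current = (current + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(current).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after iterating so a task may safely schedule new tasks
        for (Runnable task : due) {
            task.run();
        }
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(8);
        wheel.schedule(() -> System.out.println("task A (delay 3 ticks)"), 3);
        wheel.schedule(() -> System.out.println("task B (delay 10 ticks)"), 10);
        for (int tick = 1; tick <= 10; tick++) {
            System.out.println("tick " + tick);
            wheel.tick();
        }
    }
}
```

In a real timer a worker thread would call tick() once per tick duration. Driving the wheel manually keeps the sketch deterministic and makes the slot arithmetic easy to follow.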

5. MQ implements deferred tasks

Setting up MQ middleware solely to run deferred tasks would be overkill, but it is a reasonable choice if you already have an MQ environment.

Almost all MQ middleware can implement deferred tasks, which would be more accurately called deferred queues. This article uses RabbitMQ as an example to see how it can implement deferred tasks.

RabbitMQ implements delay queuing in two ways:

  • After a message expires, it enters the dead letter exchange, which forwards it to the queue that the delayed consumer reads from, achieving the delay;
  • Use the rabbitmq-delayed-message-exchange plugin to implement the delay.

Note: The rabbitmq-delayed-message-exchange plugin is supported in RabbitMQ 3.5.7 and above and depends on Erlang/OTP 18.0 and above.

Because the dead letter exchange approach is more complex, the second approach, the rabbitmq-delayed-message-exchange plugin, is recommended.

First, we need to download and install the rabbitmq-delayed-message-exchange plugin. Download address: www.rabbitmq.com/community-p…

After installing it, enable the plugin with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange, then use the rabbitmq-plugins list command to list all installed plugins. The following figure shows a successful installation:

Finally, restart the RabbitMQ service for the plug-in to take effect.

Next, we need to configure the message queue. The code is as follows:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // The second parameter is the type and must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

Then add the code that sends the message:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Send time: " + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // Delay delivery by 3000 ms
                        message.getMessageProperties().setHeader("x-delay", 3000);
                        return message;
                    }
                });
    }
}

Then add the code that consumes the message:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order") // name of the delayed queue
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receiving time: " + sdf.format(new Date()));
        System.out.println("Message content: " + msg);
    }
}

Finally, let’s test this with code:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); // wait for the receiver to run before the test exits
    }
}

The execution results of the above procedures are as follows:

Send time: 2020-04-13 20:47:51

Receiving time: 2020-04-13 20:47:54

Message content: Hi Admin.

The results show that the message was received 3 seconds after it was sent, which matches what we expect from a delayed task.

6. Use Spring to schedule tasks

This article uses a Spring Boot project to demonstrate an implementation based on @Scheduled. To use it, scheduled tasks must first be enabled with the following configuration:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

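Then add the delayed task. The implementation is as follows:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class ScheduleJobs {
    // Runs again 2 seconds after the previous run has finished
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}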

When we start the project, we can see that the task runs in a loop every 2 seconds, with the following result:

Task execution, time: 2020-04-13T14:07:53.349

Task execution, time: 2020-04-13T14:07:55.350

Task execution, time: 2020-04-13T14:07:57.351

...

We can also use cron expressions to define how often a task should run, for example @Scheduled(cron = "0/4 * * * * ?").

7. Quartz implements deferred tasks

Quartz is a powerful task scheduler that supports complex scheduling and distributed task scheduling.

We use Quartz to implement a deferred task. First, define the job to execute:

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}

Next, define a JobDetail and a Trigger. The implementation is as follows:

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // Interval of 3 seconds, with a repeat count of 1
        SimpleScheduleBuilder scheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder).build();
    }
}

Finally, start the delayed task once the Spring Boot project has started. The implementation is as follows:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Executed after the Spring Boot project has started
 */
public class MyStartupRunner implements CommandLineRunner {

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // Start the scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}

The execution results of the above procedures are as follows:

2020-04-13 19:02:12.331  INFO 17768 --- [  restartedMain] com.example.demo.DemoApplication         : Started DemoApplication in 1.815 seconds (JVM running for 3.088)

Task execution, time: 2020-04-13T19:02:15.019

It can be seen from the results that the delayed task was executed 3s after the project started.

Conclusion

This article describes the application scenarios of delayed tasks, and 10 ways to implement delayed tasks:

  1. A hand-written infinite loop;
  2. ScheduledExecutorService;
  3. DelayQueue;
  4. Checking Redis zset data;
  5. Redis keyspace notifications;
  6. HashedWheelTimer utility class provided by Netty;
  7. RabbitMQ dead letter queue;
  8. RabbitMQ delayed message plugin rabbitmq-delayed-message-exchange;
  9. Spring Scheduled;
  10. Quartz.

Final words

You are welcome to follow my official account [calm as code], where Java articles and organized learning materials are posted and kept up to date.