This article owes its existence to a reader, who gave this excellent article the chance to reach you (emphasis on "excellent", haha).

Here’s what happened…

No need to thank me; giving roses leaves a fragrance on the giver's hand. I'm confident this will not disappoint you: I intend it to be the most thorough article on deferred tasks available, because my goal in writing has always been to make each article a little better than what is already out there.

Ok, without further ado, let’s go straight to today’s topic. The main content of this article is as follows:

What is a deferred task?

As the name suggests, a deferred task is a task whose execution needs to be postponed until a later time.

Deferred tasks can be used in the following scenarios:

  1. A red envelope that has not been opened within 24 hours must be returned to the sender;
  2. On the billing day of each month, the statement for the current month must be sent to the user;
  3. If the user has not paid 30 minutes after placing an order, the system automatically cancels the order.

These and similar scenarios all call for deferred tasks.

Analysis of delayed task implementation

The key to delayed task implementation is to execute a task at a certain point in time. Based on this information, we can think of the following two ways to achieve delayed tasks:

  1. Write our own loop that keeps checking whether any task is due at the current time;
  2. Use the JDK or third-party utility classes to implement deferred tasks.

In the JDK, the relevant classes are DelayQueue and ScheduledExecutorService. There are also many third-party options for executing deferred tasks, such as Redis, Netty, and MQ.

Delayed task implementation

Let’s look at the implementation of each deferred task in combination with the code below.

1. An infinite loop to implement delayed tasks

With this approach, we run an infinite loop that keeps scanning the tasks, and use a Map to store each task together with its execution time. The implementation code is as follows:

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Summary of delayed task implementations. */
public class DelayTaskExample {
    // Stores pending tasks: task name -> due time (epoch milliseconds)
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        // Add a task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // delay 3s

        // Run the infinite loop that executes delayed tasks
        loopTask();
    }

    /** Implements delayed tasks with an infinite loop. */
    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                long dueTime = entry.getValue();
                // The task is due
                if (Instant.now().toEpochMilli() >= dueTime) {
                    // Business logic of the delayed task
                    System.out.println("Executing task: " + entry.getKey() +
                            ", execution time: " + LocalDateTime.now());
                    // Remove via the iterator; calling _TaskMap.remove() while
                    // iterating can throw ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }
}

The results of the above program are as follows:

Program start time: 2020-04-12T18:51:28.188

Executing task: task-1, execution time: 2020-04-12T18:51:31.189

It can be seen that the task is executed after a 3s delay, which is in line with our expectations.
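The loop above never pauses between scans, so it keeps a CPU core busy while it waits. As a rough sketch of one common variation (not part of the original code), the loop can sleep briefly between scans, at the cost of tasks running up to one polling interval late. The class name PollingLoopExample, the method runUntilEmpty, and the pollMillis parameter are hypothetical names chosen for this illustration, and the loop here stops once the map is empty so that the example terminates.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PollingLoopExample {
    /**
     * Scans the task map every pollMillis milliseconds until it is empty.
     * Returns the task names in the order in which they were executed.
     */
    static List<String> runUntilEmpty(Map<String, Long> tasks, long pollMillis) {
        List<String> executed = new ArrayList<>();
        while (!tasks.isEmpty()) {
            long now = System.currentTimeMillis();
            Iterator<Map.Entry<String, Long>> it = tasks.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (now >= entry.getValue()) {
                    executed.add(entry.getKey()); // business logic would go here
                    it.remove();
                }
            }
            try {
                Thread.sleep(pollMillis); // pause instead of spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        Map<String, Long> tasks = new ConcurrentHashMap<>();
        long now = System.currentTimeMillis();
        tasks.put("late", now + 200);
        tasks.put("early", now + 50);
        System.out.println(runUntilEmpty(tasks, 10)); // prints [early, late]
    }
}
```

A shorter polling interval makes execution times more precise but wakes the thread more often; the right value depends on how much lateness the business can tolerate.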

2. Java API implementation of deferred tasks

The Java API provides two ways to implement deferred tasks: DelayQueue and ScheduledExecutorService.

① ScheduledExecutorService implements deferred tasks

ScheduledExecutorService can execute a task repeatedly with a fixed delay between runs, as follows:

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /** Uses ScheduledExecutorService to run a task repeatedly with a fixed delay. */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // Business code of the task
                        System.out.println("Executing task" +
                                ", execution time: " + LocalDateTime.now());
                    }
                },
                2, // Delay before the first execution
                2, // Delay between the end of one run and the start of the next
                TimeUnit.SECONDS);
    }
}

The results of the above program are as follows:

Program start time: 2020-04-12T21:28:10.416

Executing task, execution time: 2020-04-12T21:28:12.421

Executing task, execution time: 2020-04-12T21:28:14.422

...

As the output shows, with ScheduledExecutorService#scheduleWithFixedDelay(…) the task is executed over and over, with a fixed delay between runs.
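For scenarios such as cancelling an unpaid order, the task usually has to run only once after a delay rather than repeatedly. A minimal sketch of that case, using the JDK's ScheduledExecutorService#schedule(...) method, might look like the following. The class name OneShotDelayExample and the method runOnceAfter are hypothetical names for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotDelayExample {
    /** Runs a task once after delayMillis and returns how long the caller waited, in ms. */
    static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // The "task" here only records the moment it ran; real business code would go here
            Callable<Long> task = () -> System.nanoTime();
            // schedule(...) executes the task a single time after the given delay
            ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            long ranAt = future.get(); // blocks until the task has run
            return TimeUnit.NANOSECONDS.toMillis(ranAt - start);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown(); // allow the JVM to exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after about " + runOnceAfter(500) + " ms");
    }
}
```

Unlike scheduleWithFixedDelay, nothing is rescheduled after the run, so the executor can be shut down as soon as the result is available.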

② DelayQueue implements deferred tasks

DelayQueue is an unbounded blocking queue whose elements can only be taken once their delay has expired. Elements placed in it must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete DelayQueue code is as follows:

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
        // Add delayed tasks
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time:" + DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            // Execute the delayed task; take() blocks until the head element has expired
            System.out.println(delayQueue.take());
        }
        System.out.println("End time:" + DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Expiration time (unit: ms)
        long delayTime = System.currentTimeMillis();

        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }

        // Get the remaining time
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Determines the order of the elements in the queue
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

The results of the above procedures are as follows:

Start time: 2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

End time: 2020-4-12 20:40:43
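In practice a delayed element usually needs to carry some information about the work to be done, not just its expiration time. The following is a small sketch under that assumption: each element holds a task name, and the queue hands the names back in expiry order regardless of insertion order. The names NamedDelayTaskExample, NamedTask, and drainInExpiryOrder are hypothetical and chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class NamedDelayTaskExample {
    /** A delayed element that also remembers which task it stands for. */
    static class NamedTask implements Delayed {
        final String name;
        final long dueAtMillis;

        NamedTask(String name, long delayMillis) {
            this.name = name;
            this.dueAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Enqueues three tasks out of order and returns their names in the order they expire. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<NamedTask> queue = new DelayQueue<>();
        queue.put(new NamedTask("cancel-order", 300));
        queue.put(new NamedTask("refund-red-envelope", 100));
        queue.put(new NamedTask("send-statement", 200));
        List<String> executed = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                executed.add(queue.take().name); // blocks until the earliest task is due
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        // prints [refund-red-envelope, send-statement, cancel-order]
        System.out.println(drainInExpiryOrder());
    }
}
```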

3. Redis implements deferred tasks

There are two ways to implement delayed tasks with Redis: polling a zset and keyspace notifications.

① Polling a zset

We store the delayed tasks in a zset, using each task's execution time as its score, and then run an infinite loop that queries and consumes the tasks due at the current time. The implementation code is as follows (the Jedis client is required):

import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";
    
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Delay 30s execution (after 30s)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Continue adding test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Enable delay queue
        doDelayQueue(jedis);
    }

    /**
     * Consumes the delay queue.
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // The current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // One second ago
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks due within the last second
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume the task
                System.out.println("Consumption:" + item);
            }
            // Delete the tasks that have been executed
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // Poll once per second
        }
    }
}

② Keyspace notifications

By default, keyspace notifications are disabled on the Redis server. To enable them manually, run the command config set notify-keyspace-events Ex. Once they are enabled, we receive an event each time a key expires. We use this mechanism to implement scheduled tasks: when a key expires, the subscriber receives the event and runs the corresponding task. The code is as follows:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    public static final String _TOPIC = "__keyevent@0__:expired"; // Subscribe to channel name
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // Perform scheduled tasks
        doTask(jedis);
    }

    /**
     * Subscribes to expiration messages to perform scheduled tasks.
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        // Subscribe to expiration messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // A message was received; execute the scheduled task
                System.out.println("Received a message:" + message);
            }
        }, _TOPIC);
    }
}

4. Netty implements deferred tasks

Netty is a Java open source framework provided by JBOSS. It is a client and server programming framework based on NIO. Using Netty, you can quickly and easily develop a network application, such as a client and server application that implements a certain protocol. Netty simplifies and streamlines the programming and development of network applications, such as TCP and UDP-based socket service development.

You can use Netty’s HashedWheelTimer utility class to implement deferred tasks. The implementation code is as follows.

First add the Netty dependency to the project as follows:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>

The complete code for Netty implementation is as follows:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        NettyTask();
    }

    /** Netty-based delayed task. */
    private static void NettyTask() {
        // Create the timer
        HashedWheelTimer timer = new HashedWheelTimer(3, // Tick duration
                TimeUnit.SECONDS,
                100); // The number of slots in the time wheel
        // Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Executing task" +
                        ", execution time: " + LocalDateTime.now());
            }
        };
        // Add the task to the timer
        timer.newTimeout(task, 0, TimeUnit.SECONDS);
    }
}

The results of the above program are as follows:

Program start time: 2020-04-13T10:16:23.033

Executing task, execution time: 2020-04-13T10:16:26.118

HashedWheelTimer is implemented with a time wheel. A time wheel is a ring-shaped data structure that can be pictured as a clock face divided into many slots, each representing a span of time. Each slot holds a linked list of the timeout tasks to be executed, and a pointer advances one slot at a time; when it reaches a slot, the delayed tasks in that slot are executed, as shown below:

The picture above can be understood as a time wheel of size 8 that advances one slot per fixed interval (for example, 1s), with each slot pointing to a linked list of tasks to be executed.
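To make the structure more concrete, here is a deliberately simplified time wheel sketch. It is not Netty's implementation: the class SimpleTimeWheel and its add and tick methods are hypothetical names for this illustration, ticks are advanced by calling tick() manually instead of by a clock thread, and a per-task "rounds" counter is assumed as the way to handle delays longer than one full turn of the wheel.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SimpleTimeWheel {
    /** A task waiting in a slot; rounds counts the full wheel turns it must still wait. */
    private static final class Entry {
        final String name;
        int rounds;

        Entry(String name, int rounds) {
            this.name = name;
            this.rounds = rounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int current = 0; // index of the slot the pointer is on

    SimpleTimeWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Registers a task that should fire after delayTicks ticks (must be at least 1). */
    void add(String name, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = slots.size();
        int slot = (current + delayTicks) % size;
        int rounds = (delayTicks - 1) / size; // extra full turns before the task is due
        slots.get(slot).add(new Entry(name, rounds));
    }

    /** Moves the pointer one slot forward and returns the names of the tasks due there. */
    List<String> tick() {
        current = (current + 1) % slots.size();
        List<String> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(current).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.rounds == 0) {
                due.add(entry.name);
                it.remove();
            } else {
                entry.rounds--; // not yet: wait for the next turn of the wheel
            }
        }
        return due;
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel(8);
        wheel.add("a", 3);
        wheel.add("c", 10); // longer than one turn of an 8-slot wheel
        for (int t = 1; t <= 10; t++) {
            for (String name : wheel.tick()) {
                System.out.println("tick " + t + ": " + name); // tick 3: a, then tick 10: c
            }
        }
    }
}
```

The appeal of the structure is that adding a task and advancing the pointer only touch a single slot, no matter how many tasks are pending in total.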

5. MQ implements deferred tasks

Deploying MQ middleware solely to execute deferred tasks would be overkill, but it is a reasonable choice if you already have an MQ environment.

Almost all MQ middleware can implement deferred tasks, which would be more accurately called deferred queues. This article uses RabbitMQ as an example to see how it can implement deferred tasks.

RabbitMQ implements delay queues in two ways:

  • After a message expires, it is routed to a dead letter exchange, which forwards it to the queue where it is consumed, producing the delay.
  • Use the rabbitmq-delayed-message-exchange plugin.

Note: The rabbitmq-delayed-message-exchange plugin is supported in RabbitMQ 3.5.7 and above and depends on Erlang/OTP 18.0 and above.

Because the dead letter exchange approach is more complex to set up, the second approach, the rabbitmq-delayed-message-exchange plugin, is recommended.

First, we need to download the rabbitmq-delayed-message-exchange plugin. Download address: www.rabbitmq.com/community-p…

After downloading, copy the plugin into the plugins directory of the RabbitMQ installation, then run rabbitmq-plugins enable rabbitmq_delayed_message_exchange to enable it. The rabbitmq-plugins list command lists all installed plugins. The following figure shows a successful installation:

Finally, restart the RabbitMQ service for the plug-in to take effect.

Next, we need to configure the message queue. The code is as follows:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // The second argument is the exchange type; it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

Then add the code that sends the message:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Delivery time:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 3000); // delay in milliseconds
                return message;
            }
        });
    }
}

Add the code to consume the message:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Received time :" + sdf.format(new Date()));
        System.out.println("Message content:" + msg);
    }
}

Finally, let’s test this with code:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); // Wait for the receiver to run before the test exits
    }
}

The execution results of the above procedures are as follows:

Delivery time: 2020-04-13 20:47:51

Received time :2020-04-13 20:47:54

Message content: Hi Admin.

The message was received 3 seconds after it was sent, so the program behaves as expected for a delayed task.

6. Spring scheduled tasks

This article uses a Spring Boot project to demonstrate Spring's @Scheduled support. To use it, first enable scheduling with the following configuration:

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Then add the delayed task. The implementation code is as follows:

@Component
public class ScheduleJobs {
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("Task execution, time:" + LocalDateTime.now());
    }
}

When we start the project, we can see that the task runs repeatedly at 2-second intervals, with the following result:

Task execution, time: 2020-04-13T14:07:53.349

Task execution, time: 2020-04-13T14:07:55.350

Task execution, time: 2020-04-13T14:07:57.351

...

We can also use cron expressions to define how often a task runs, for example @Scheduled(cron = "0/4 * * * * ?"), which runs the task every 4 seconds.

7. Quartz implements deferred tasks

Quartz is a powerful task scheduler that supports complex schedules and distributed task scheduling.

To implement a deferred task with Quartz, we first define the job to be executed:

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("Task execution, time:" + LocalDateTime.now());
    }
}

Then define a JobDetail and a Trigger. The implementation code is as follows:

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // Execute after 3s
        SimpleScheduleBuilder scheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder).build();
    }
}

Finally, schedule the job once the Spring Boot project has started. The implementation code is as follows:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/** Executes after the Spring Boot project starts. */
public class MyStartupRunner implements CommandLineRunner {

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // Start the scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}

The execution results of the above procedures are as follows:

2020-04-13 19:02:12.331 INFO 17768 --- [restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)

Task execution, time: 2020-04-13T19:02:15.019

It can be seen from the results that the delayed task was executed 3s after the project started.

Conclusion

This article describes the application scenarios of delayed tasks and 10 ways to implement them:

  1. Hand-written infinite loop;
  2. ScheduledExecutorService;
  3. DelayQueue;
  4. Redis zset polling;
  5. Redis keyspace notifications;
  6. The HashedWheelTimer utility class provided by Netty;
  7. RabbitMQ dead letter queue;
  8. The RabbitMQ delayed message plugin rabbitmq-delayed-message-exchange;
  9. Spring Scheduled;
  10. Quartz.

The last word

As the saying goes: one minute on stage takes ten years of practice off stage. Everything in this article is distilled from the author's many years of work and was put together over many late nights of painstaking effort. If you found this article helpful, please give it a thumbs up before you go (your support is my motivation to keep improving). Thank you.

For more content, please follow the WeChat official account “Java Chinese Community”.