This article exists thanks to a reader's request; in other words, a reader is the reason you get to read this excellent article (emphasis on “excellent”, haha).
Here is what happened…
No need to thank me; giving roses leaves a lingering fragrance on the hand. I am confident this article will not disappoint: I intend it to be the most thorough article on “deferred tasks” available, and my goal in writing has always been to make each article a little better than what is already out there.
OK, without further ado, let us get straight to today's topic. The main content of this article is as follows:
What is a deferred task?
As the name suggests, a task whose execution needs to be postponed is called a deferred task.
Deferred tasks can be used in the following scenarios:
- A red envelope that has not been opened within 24 hours must be refunded;
- On the monthly billing day, the statement for the current month must be sent to the user;
- If the user has not paid 30 minutes after placing an order, the system automatically cancels the order.
Events like these all call for deferred tasks.
Analysis of delayed task implementation
The key to implementing a delayed task is executing it at a specific point in time. With that in mind, there are two ways to implement delayed tasks:
- Write our own loop that keeps checking whether the current time has reached a task's execution time;
- Use the JDK or third-party utility classes to implement deferred tasks.
Within the JDK, the relevant classes are DelayQueue and ScheduledExecutorService. Third-party options are plentiful, for example Redis, Netty, and MQ.
Delayed task implementation
Let’s look at the implementation of each deferred task in combination with the code below.
1. An infinite loop to implement delayed tasks
With this approach we start an infinite loop that keeps scanning the tasks, and use a Map to store each task together with its execution time. The implementation code is as follows:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Delayed task execution examples
 */
public class DelayTaskExample {
    // Stores scheduled tasks: task name -> execution time (epoch milliseconds)
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        // Add a scheduled task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // delay 3s
        // Run the infinite loop that executes delayed tasks
        loopTask();
    }

    /**
     * An infinite loop to implement delayed tasks
     */
    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                // The task is due
                if (Instant.now().toEpochMilli() >= entry.getValue()) {
                    // Business logic of the delayed task
                    System.out.println("Executing task: " + entry.getKey() +
                            ", execution time: " + LocalDateTime.now());
                    // Remove the task through the iterator; calling _TaskMap.remove(...)
                    // while iterating can throw ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }
}
The output of the above program is as follows:
Program start time: 2020-04-12T18:51:28.188
Executing task: task-1, execution time: 2020-04-12T18:51:31.189
It can be seen that the task is delayed by 3s, which is in line with our expectations.
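One thing worth noting about a hand-written loop is that it spins at full speed and keeps a CPU core busy while it waits. The following is a minimal sketch of a variant that sleeps between scans, assuming a scan interval of a few milliseconds is precise enough for the business. The class name SleepingLoopSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SleepingLoopSketch {
    // Task name -> due time (epoch milliseconds)
    private final Map<String, Long> tasks = new ConcurrentHashMap<>();

    void add(String name, long delayMillis) {
        tasks.put(name, System.currentTimeMillis() + delayMillis);
    }

    /** Scans until every task has run, sleeping between scans; returns the names in execution order. */
    List<String> runUntilEmpty(long scanIntervalMillis) {
        List<String> executed = new ArrayList<>();
        try {
            while (!tasks.isEmpty()) {
                long now = System.currentTimeMillis();
                Iterator<Map.Entry<String, Long>> it = tasks.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Long> entry = it.next();
                    if (now >= entry.getValue()) {
                        executed.add(entry.getKey()); // business logic would go here
                        it.remove();                  // remove through the iterator
                    }
                }
                Thread.sleep(scanIntervalMillis);     // give up the CPU between scans
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // stop scanning if interrupted
        }
        return executed;
    }

    public static void main(String[] args) {
        SleepingLoopSketch loop = new SleepingLoopSketch();
        loop.add("late", 400);
        loop.add("early", 50);
        System.out.println(loop.runUntilEmpty(10));
    }
}
```

The trade-off is accuracy: a task can run up to one scan interval later than its due time.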
2. Java API implementation of deferred tasks
The Java API provides two ways to implement deferred tasks: DelayQueue and ScheduledExecutorService.
① ScheduledExecutorService implements deferred tasks
ScheduledExecutorService can be used to execute a task repeatedly at a fixed interval, as follows:
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * ScheduledExecutorService executes a task repeatedly with a fixed delay
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // The business code of the task
                        System.out.println("Executing task" +
                                ", execution time: " + LocalDateTime.now());
                    }
                },
                2, // Delay before the first execution
                2, // Delay between the end of one execution and the start of the next
                TimeUnit.SECONDS);
    }
}
The output of the above program is as follows:
Program start time: 2020-04-12T21:28:10.416
Executing task, execution time: 2020-04-12T21:28:12.421
Executing task, execution time: 2020-04-12T21:28:14.422
...
As the output shows, with ScheduledExecutorService#scheduleWithFixedDelay(…) the task is executed over and over, with a fixed delay between the end of one run and the start of the next.
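For a task that should run only once after a delay (such as cancelling an unpaid order), the same interface also offers schedule(…). The following is a minimal sketch, not taken from the original example; the class name OneShotDelaySketch and the method runOnce are hypothetical names for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotDelaySketch {
    /** Runs a task once after delayMillis and returns the observed delay in milliseconds. */
    static long runOnce(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final long start = System.nanoTime();
        // The "business logic" here only measures how long the task actually waited
        Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            // schedule(...) runs the task a single time after the given delay
            ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // block until the task has run
        } catch (Exception e) {
            throw new IllegalStateException("delayed task failed", e);
        } finally {
            executor.shutdown(); // release the worker thread so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after about " + runOnce(500) + " ms");
    }
}
```

Unlike scheduleWithFixedDelay, the executor here has nothing left to do once the task has run, so it is shut down right away.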
② DelayQueue implements deferred tasks
DelayQueue is an unbounded blocking queue from which an element can be taken only after its delay has expired. Its elements must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete DelayQueue code is as follows:
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
        // Add delayed tasks
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time: " + DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            // Execute the delayed task
            System.out.println(delayQueue.take());
        }
        System.out.println("End time: " + DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Delay deadline (unit: ms)
        long delayTime = System.currentTimeMillis();

        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }

        @Override
        // Get the remaining time
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        // The order of the elements in the queue
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }

        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}
The output of the above program is as follows:
Start time: 2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
End time: 2020-4-12 20:40:43
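In a real scenario the delayed element usually carries a payload, for example the id of the order to cancel. The following is a minimal sketch under that assumption; OrderTimeoutSketch, OrderTimeout and the order ids are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OrderTimeoutSketch {
    /** A delayed element that carries an order id as its payload. */
    static final class OrderTimeout implements Delayed {
        final String orderId;
        final long deadlineNanos;

        OrderTimeout(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the deadline; zero or negative means "expired"
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Enqueues three orders out of order and returns their ids in expiry order. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<OrderTimeout> queue = new DelayQueue<>();
        queue.put(new OrderTimeout("order_3", 300));
        queue.put(new OrderTimeout("order_1", 100));
        queue.put(new OrderTimeout("order_2", 200));
        List<String> expired = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                expired.add(queue.take().orderId); // take() blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return expired;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder()); // [order_1, order_2, order_3]
    }
}
```

The elements come out ordered by deadline regardless of insertion order, because DelayQueue keeps them sorted with compareTo.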
3. Redis implements deferred tasks
There are two ways to implement delayed tasks with Redis: polling zset data and keyspace notifications.
① Polling zset data
We store the delayed tasks in a zset (sorted set), using each task's execution time as its score, and then start an infinite loop that queries and consumes all tasks due at the current time. The implementation code is as follows (the Jedis client is required):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;

import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Execute after a 30s delay
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Add more test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start consuming the delay queue
        doDelayQueue(jedis);
    }

    /**
     * Delay queue consumption
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // The current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // One second ago
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks due in the current time window
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume the task
                System.out.println("Consumption: " + item);
            }
            // Delete the tasks that have been executed
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // Poll once per second
        }
    }
}
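The loop above only looks at scores in the last one-second window, so a task could be skipped if a poll runs late (for example after a long pause or a restart). One possible variation is to fetch everything with a score up to the current time. The sketch below illustrates only that polling logic, using an in-memory TreeMap as a stand-in for the Redis zset so that it runs without a server; DueTaskPollSketch, zadd and pollDue are hypothetical names, not Jedis APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class DueTaskPollSketch {
    // Stand-in for the zset: score (epoch second) -> members stored with that score
    private final NavigableMap<Long, List<String>> zset = new TreeMap<>();

    void zadd(long score, String member) {
        zset.computeIfAbsent(score, k -> new ArrayList<>()).add(member);
    }

    /** Removes and returns every member whose score is less than or equal to nowSecond. */
    List<String> pollDue(long nowSecond) {
        // headMap(..., true) is a live view of all entries with score <= nowSecond
        NavigableMap<Long, List<String>> due = zset.headMap(nowSecond, true);
        List<String> members = new ArrayList<>();
        for (List<String> group : due.values()) {
            members.addAll(group);
        }
        due.clear(); // clearing the view also removes the entries from the backing map
        return members;
    }

    public static void main(String[] args) {
        DueTaskPollSketch queue = new DueTaskPollSketch();
        queue.zadd(102, "order_2");
        queue.zadd(102, "order_3");
        queue.zadd(107, "order_4");
        queue.zadd(130, "order_1");
        System.out.println(queue.pollDue(105)); // [order_2, order_3]
        System.out.println(queue.pollDue(129)); // [order_4], picked up even though the poll is late
        System.out.println(queue.pollDue(129)); // []
    }
}
```

Against a real Redis server, the equivalent would be to query and remove the score range from 0 to the current second instead of the last second only.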
② Keyspace notifications
By default, keyspace notifications are disabled on the Redis server. They must be enabled manually with the command config set notify-keyspace-events Ex. Once they are enabled, we receive an event whenever a key expires, and we can use this mechanism to trigger a scheduled task for each expiring key. The implementation code is as follows:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    public static final String _TOPIC = "__keyevent@0__:expired"; // Name of the channel to subscribe to

    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // Perform scheduled tasks
        doTask(jedis);
    }

    /**
     * Subscribe to expiration messages to perform scheduled tasks
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        // Subscribe to expiration messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // The message is received and the scheduled task is executed
                System.out.println("Received a message: " + message);
            }
        }, _TOPIC);
    }
}
4. Netty implements deferred tasks
Netty is an open source Java framework originally provided by JBoss. It is an NIO-based client and server programming framework with which you can quickly and easily develop network applications, such as clients and servers that implement a particular protocol. Netty simplifies and streamlines the development of network applications, such as TCP and UDP socket services.
You can use Netty’s HashedWheelTimer utility class to implement deferred tasks. The implementation code is as follows.
First add the Netty dependency to the project as follows:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>
The complete code for Netty implementation is as follows:
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        NettyTask();
    }

    /**
     * Netty-based delayed task
     */
    private static void NettyTask() {
        // Create the timer instance
        HashedWheelTimer timer = new HashedWheelTimer(3, // Tick duration
                TimeUnit.SECONDS,
                100); // The number of slots in the time wheel
        // Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Executing task" +
                        ", execution time: " + LocalDateTime.now());
            }
        };
        // Add the task to the timer
        timer.newTimeout(task, 0, TimeUnit.SECONDS);
    }
}
The output of the above program is as follows:
Program start time: 2020-04-13T10:16:23.033
Executing task, execution time: 2020-04-13T10:16:26.118
HashedWheelTimer is implemented with a timing wheel. A timing wheel is a ring-shaped data structure that can be thought of as a clock face divided into many slots, each representing a span of time. Each slot holds a linked list of the timeout tasks to execute, and a pointer advances one slot at a time. When the pointer reaches a slot, the delayed tasks in that slot are executed, as shown below:
The picture above can be understood as a time wheel of size 8 that advances one slot per fixed interval (for example, 1s), with each slot pointing to a linked list of tasks to execute.
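To make the slot-and-pointer idea concrete, here is a deliberately simplified sketch of a time wheel. It is not Netty's implementation: it is single-threaded, advanced by calling tick() manually, and stores a "remaining rounds" counter for delays longer than one rotation. The class TimeWheelSketch and all of its members are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class TimeWheelSketch {
    private static final class Entry {
        final Runnable task;
        long remainingRounds; // full rotations left before the task is due

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int cursor = 0; // index of the slot the pointer is currently on

    TimeWheelSketch(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules a task to run after the given number of ticks (at least 1). */
    void schedule(Runnable task, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = slots.size();
        int slot = (cursor + delayTicks) % size;  // slot the pointer is on when the task is due
        long rounds = (delayTicks - 1) / size;    // earlier passes over that slot to skip
        slots.get(slot).add(new Entry(task, rounds));
    }

    /** Moves the pointer one slot forward and runs the tasks that are due in that slot. */
    void tick() {
        cursor = (cursor + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(cursor).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                due.add(entry.task);
                it.remove();
            } else {
                entry.remainingRounds--;
            }
        }
        for (Runnable task : due) {
            task.run(); // run after the scan so tasks may schedule new tasks safely
        }
    }

    /** Drives a wheel of size 8 for 10 ticks and records when each task fires. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] currentTick = {0};
        TimeWheelSketch wheel = new TimeWheelSketch(8);
        wheel.schedule(() -> log.add("tick " + currentTick[0] + ": A"), 3);
        wheel.schedule(() -> log.add("tick " + currentTick[0] + ": B"), 10);
        for (int t = 1; t <= 10; t++) {
            currentTick[0] = t;
            wheel.tick();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [tick 3: A, tick 10: B]
    }
}
```

Task B shares slot 2 with any task due at tick 2, but its round counter makes the pointer skip it on the first pass, which is how a small wheel can hold delays longer than one rotation.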
5. MQ implements deferred tasks
Setting up MQ middleware solely to execute deferred tasks would be overkill, but it is a reasonable choice if you already have an MQ environment.
Almost all MQ middleware can implement deferred tasks, which in this context are more accurately called delay queues. This article uses RabbitMQ as an example to show how it can implement deferred tasks.
RabbitMQ can implement a delay queue in two ways:
- After a message expires, it enters the dead letter exchange, which forwards it to the queue that is actually consumed, achieving the delay;
- Use the rabbitmq-delayed-message-exchange plug-in.
Note: the rabbitmq-delayed-message-exchange plug-in is supported in RabbitMQ 3.5.7 and above and depends on Erlang/OTP 18.0 and above.
Because the dead letter exchange approach is more complicated to use, the second approach, the rabbitmq-delayed-message-exchange plug-in, is recommended.
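For reference, the core of the dead letter approach is a "waiting" queue that has no consumers and is declared with a message TTL and a dead letter target. The sketch below only builds those queue arguments, using the RabbitMQ argument names x-message-ttl, x-dead-letter-exchange and x-dead-letter-routing-key. The class DeadLetterDelaySketch, the exchange name order.dlx and the routing key order.cancel are hypothetical, and the exchange, queues and bindings themselves still have to be declared separately.

```java
import java.util.HashMap;
import java.util.Map;

class DeadLetterDelaySketch {
    /**
     * Builds the arguments of a "waiting" queue: messages stay for ttlMillis,
     * then expire and are republished to the given dead letter exchange.
     */
    static Map<String, Object> waitingQueueArguments(String deadLetterExchange,
                                                     String deadLetterRoutingKey,
                                                     int ttlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                       // how long a message waits
        args.put("x-dead-letter-exchange", deadLetterExchange);     // where expired messages go
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    public static void main(String[] args) {
        // Hypothetical names: expired messages are routed to the queue that is actually consumed
        Map<String, Object> queueArgs = waitingQueueArguments("order.dlx", "order.cancel", 3000);
        System.out.println(queueArgs.get("x-message-ttl")); // 3000
        // With Spring AMQP, this map would be passed as the last argument of
        // new Queue(name, durable, exclusive, autoDelete, arguments).
    }
}
```

The extra queue, exchange and binding that this requires are the complexity the recommendation above refers to.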
First, download and install the rabbitmq-delayed-message-exchange plug-in. Download address: www.rabbitmq.com/community-p…
After installation, enable the plug-in with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange. The rabbitmq-plugins list command lists all installed plug-ins. The following figure shows a successful installation:
Finally, restart the RabbitMQ service for the plug-in to take effect.
Next, we need to configure the message queue. The code is as follows:
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the delayed exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // The second parameter is the exchange type; it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
Then add the code that sends the message:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Delivery time: " + sf.format(new Date()));
        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Delay in milliseconds
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}
Add the code to consume the message:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive time: " + sdf.format(new Date()));
        System.out.println("Message content: " + msg);
    }
}
Finally, let’s test this with code:
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); // Wait for the receiver to run before the test exits
    }
}
The output of the above program is as follows:
Delivery time: 2020-04-13 20:47:51
Receive time: 2020-04-13 20:47:54
Message content: Hi Admin.
The output shows that the message was received 3 seconds after it was sent, which is what we expect from a delayed task.
6. Spring implements scheduled tasks
This article uses a Spring Boot project to demonstrate @Scheduled. To use it, scheduling must first be enabled with the following configuration:
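import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}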
Then add the delay task, the implementation code is as follows:
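import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class ScheduleJobs {
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}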
When we start the project, we can see that the task runs every 2 seconds, with the following output:
Task execution, time: 2020-04-13T14:07:53.349
Task execution, time: 2020-04-13T14:07:55.350
Task execution, time: 2020-04-13T14:07:57.351
...
We can also use cron expressions to define how often a task should run, for example @Scheduled(cron = "0/4 * * * * ?").
7. Quartz implements deferred tasks
Quartz is a powerful task scheduler that supports complex scheduling and distributed task scheduling.
To implement a deferred task with Quartz, we first define the job to execute, as follows:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}
Then define a JobDetail and a Trigger. The implementation code is as follows:
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // Execute after 3s
        SimpleScheduleBuilder scheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder).build();
    }
}
Finally, start the scheduled task once the Spring Boot project has started. The implementation code is as follows:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Executed after the Spring Boot project starts
 */
public class MyStartupRunner implements CommandLineRunner {
    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // Start a scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}
The output of the above program is as follows:
2020-04-13 19:02:12.331  INFO 17768 --- [  restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
Task execution, time: 2020-04-13T19:02:15.019
The output shows that the delayed task was executed about 3s after the project started.
Conclusion
This article described the application scenarios of delayed tasks and 10 ways to implement them:
- A hand-written infinite loop;
- ScheduledExecutorService;
- DelayQueue;
- Redis zset polling;
- Redis keyspace notifications;
- HashedWheelTimer utility class provided by Netty;
- RabbitMQ dead letter queue;
- RabbitMQ delayed message plug-in rabbitmq-delayed-message-exchange;
- Spring Scheduled;
- Quartz.
Final words
As the saying goes: one minute on stage takes ten years of practice off stage. Everything in this article is distilled from the author's years of work experience and many late nights of careful writing. If you found this article helpful, please give it a thumbs up before you go (your support is what keeps me going). Thank you.
For more content, please follow the WeChat public account “Java Chinese Community”.