What is a deferred task?
As the name suggests, a deferred (delayed) task is a task whose execution is postponed until a later point in time.
Deferred tasks can be used in the following scenarios:
- A red envelope has not been opened within 24 hours, so the refund must be triggered after that delay;
- On the bill day of each month, the statement of the current month needs to be sent to the user.
- 30 minutes after the order is placed, the system automatically cancels the order if the user does not pay for it.
Other events like these also call for deferred tasks.
Analysis of delayed task implementation
The key to delayed task implementation is to execute a task at a certain point in time. Based on this information, we can think of the following two ways to achieve delayed tasks:
- Hand-write a loop that keeps checking whether any task is due at the current time;
- Use JDK or third-party utility classes to implement deferred tasks.
In the JDK, the relevant classes are DelayQueue and ScheduledExecutorService. Many third-party options also support delayed execution, such as Redis, Netty, and MQ.
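Whichever approach is used, it comes down to comparing the current time with a task's deadline. As a minimal sketch (the class name DelayMath and the method delayUntil are illustrative and not part of the original article), the remaining delay for the 30-minute order-cancellation scenario could be computed with java.time:

```java
import java.time.Duration;
import java.time.LocalDateTime;

class DelayMath {
    /** Returns the time left until the deadline, or zero if the deadline has already passed. */
    static Duration delayUntil(LocalDateTime now, LocalDateTime deadline) {
        Duration remaining = Duration.between(now, deadline);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        // Hypothetical order placed at 18:00; it should be cancelled 30 minutes later
        LocalDateTime orderPlacedAt = LocalDateTime.of(2020, 4, 12, 18, 0);
        LocalDateTime cancelAt = orderPlacedAt.plusMinutes(30);
        LocalDateTime now = LocalDateTime.of(2020, 4, 12, 18, 10);
        System.out.println(delayUntil(now, cancelAt).toMinutes()); // prints 20
    }
}
```

The resulting Duration can then be handed to any of the mechanisms discussed in this article as the delay value.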
Delayed task implementation
Let’s look at the implementation of each deferred task in combination with the code below.
1. An infinite loop to implement delayed tasks
With this approach, we start an infinite loop that keeps scanning the tasks, and use a Map to store each task together with its delayed execution time. The implementation code is as follows:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class DelayTaskExample {
    // Stores each task name and the time (epoch millis) at which it is due
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        // Add a scheduled task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // delay 3s
        // Call the infinite loop that implements the delayed task
        loopTask();
    }

    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                // A task is due
                if (Instant.now().toEpochMilli() >= entry.getValue()) {
                    System.out.println("Execution task: " + entry.getKey()
                            + ", execution time: " + LocalDateTime.now());
                    // Delete the task through the iterator; calling _TaskMap.remove()
                    // while iterating can throw ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }
}
The program above produces the following output:
Program start time: 2020-04-12T18:51:28.188
Execution task: task-1, execution time: 2020-04-12T18:51:31.189
It can be seen that the task is delayed by 3s, which is in line with our expectations.
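One thing to keep in mind is that the loop above spins continuously, which keeps a CPU core busy while it waits. A possible variant, sketched below under the assumption that a small polling interval is acceptable, sleeps between scans and stops once the map is empty. The class name SleepingLoopExample, the method runAll, and the task names are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SleepingLoopExample {
    /** Scans the map until it is empty and returns the task names in the order they ran. */
    static List<String> runAll(Map<String, Long> tasks, long pollMillis) {
        List<String> executed = new ArrayList<>();
        while (!tasks.isEmpty()) {
            long now = System.currentTimeMillis();
            Iterator<Map.Entry<String, Long>> it = tasks.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (now >= entry.getValue()) {
                    executed.add(entry.getKey()); // business logic would run here
                    it.remove();                  // safe removal during iteration
                }
            }
            try {
                Thread.sleep(pollMillis);         // yield the CPU between scans
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, Long> tasks = new LinkedHashMap<>();
        tasks.put("task-early", now + 100);
        tasks.put("task-late", now + 300);
        System.out.println(runAll(tasks, 20)); // prints [task-early, task-late]
    }
}
```

The trade-off is precision: a task may run up to one polling interval later than its deadline.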
2. Java API implementation of deferred tasks
The Java API provides two methods for implementing deferred tasks: DelayQueue and ScheduledExecutorService.
① ScheduledExecutorService implements deferred tasks
ScheduledExecutorService can be used to execute a task repeatedly at a fixed interval, as follows:
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * ScheduledExecutorService keeps executing the task at a fixed interval
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // Business code of the task
                        System.out.println("Execution task, execution time: " + LocalDateTime.now());
                    }
                },
                2, // delay before the first execution
                2, // delay between executions
                TimeUnit.SECONDS);
    }
}
The program above produces the following output:
Program start time: 2020-04-12T21:28:10.416
Execution task, execution time: 2020-04-12T21:28:12.421
Execution task, execution time: 2020-04-12T21:28:14.422
...
As the output shows, with ScheduledExecutorService#scheduleWithFixedDelay(…) the task runs in a continuous loop, with a fixed delay between the end of one run and the start of the next.
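The example above repeats forever. For a task that should run only once after a delay, ScheduledExecutorService also offers schedule(...). The following is a minimal sketch; the class name OneShotDelayExample and the method runOnceAfter are illustrative, and the task simply reports how long it waited.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotDelayExample {
    /** Runs a task once after the given delay and returns the measured wait in milliseconds. */
    static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final long start = System.nanoTime();
        // The "business logic" here only measures the elapsed time
        Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // otherwise the worker thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        long waited = runOnceAfter(100);
        System.out.println("waited at least 100 ms: " + (waited >= 100));
    }
}
```

Shutting the executor down matters in short-lived programs, because its worker thread is not a daemon thread.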
② DelayQueue implements deferred tasks
DelayQueue is an unbounded blocking queue from which an element can be taken only after its delay has expired. Its elements must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete DelayQueue code is as follows:
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
        // Add delayed tasks
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time: " + DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            // Execute the delayed task; take() blocks until the head element is due
            System.out.println(delayQueue.take());
        }
        System.out.println("End time: " + DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Deadline of the delay (unit: milliseconds)
        long delayTime = System.currentTimeMillis();

        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }

        // Remaining time until the deadline
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Ordering of the elements in the queue
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}
The program above produces the following output:
Start time: 2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
End time: 2020-4-12 20:40:43
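In practice a queue element usually needs to carry something that identifies the work to do. The sketch below is one way this could look; the class NamedDelayExample, the DelayedTask element, and the task names are hypothetical. It also shows that elements come out in deadline order regardless of insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class NamedDelayExample {
    /** A queue element that carries a task name and an absolute deadline. */
    static final class DelayedTask implements Delayed {
        final String name;
        final long dueAtMillis;

        DelayedTask(String name, long dueAtMillis) {
            this.name = name;
            this.dueAtMillis = dueAtMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedTask) {
                // Comparing fixed deadlines avoids reading the clock twice
                return Long.compare(dueAtMillis, ((DelayedTask) other).dueAtMillis);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Takes every element as it becomes due and returns the names in that order. */
    static List<String> drainInOrder(DelayQueue<DelayedTask> queue) {
        List<String> names = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                names.add(queue.take().name); // blocks until the head is due
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("cancel-order", now + 300));
        queue.put(new DelayedTask("send-statement", now + 100));
        queue.put(new DelayedTask("refund", now + 200));
        System.out.println(drainInOrder(queue)); // prints [send-statement, refund, cancel-order]
    }
}
```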
3. Redis implements deferred tasks
There are two ways to implement delayed tasks using Redis: zset data judgment and key space notification.
① Judging by zset data
We store the delayed tasks in a zset, using each task's execution time as its score, and then start an infinite loop that queries for the tasks due at the current time and consumes them. The implementation code is as follows (the Jedis client is required):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Execute after a 30s delay (the score is the time 30s from now)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Add more test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start the delay queue
        doDelayQueue(jedis);
    }

    /**
     * Consume the delay queue
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // Current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // one second ago
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks due at the current time
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume the task
                System.out.println("Consume: " + item);
            }
            // Delete the executed tasks
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // poll once per second
        }
    }
}
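The polling code above only looks at scores between one second ago and now, so if an iteration were ever delayed by more than a second, tasks with older scores could be skipped. One possible alternative is to query every score from 0 up to the current time. The sketch below illustrates only that selection logic, using an in-memory TreeMap as a stand-in for the Redis zset. It does not talk to Redis, and the names ScoreWindowSketch, zadd, and pollDue are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ScoreWindowSketch {
    // score (due time in epoch seconds) -> members; an in-memory stand-in for the zset
    private final NavigableMap<Long, List<String>> zset = new TreeMap<>();

    /** Adds a member with the given score, similar in spirit to ZADD. */
    void zadd(long score, String member) {
        zset.computeIfAbsent(score, s -> new ArrayList<>()).add(member);
    }

    /** Removes and returns every member whose score is less than or equal to nowSecond. */
    List<String> pollDue(long nowSecond) {
        NavigableMap<Long, List<String>> due = zset.headMap(nowSecond, true);
        List<String> members = new ArrayList<>();
        for (List<String> bucket : due.values()) {
            members.addAll(bucket);
        }
        due.clear(); // the view is backed by zset, so this deletes the consumed entries
        return members;
    }

    public static void main(String[] args) {
        ScoreWindowSketch queue = new ScoreWindowSketch();
        queue.zadd(130, "order_1");
        queue.zadd(102, "order_2");
        queue.zadd(107, "order_4");
        System.out.println(queue.pollDue(105)); // prints [order_2]
        // Even if the poller skips many seconds, older tasks are still picked up
        System.out.println(queue.pollDue(131)); // prints [order_4, order_1]
    }
}
```

With Redis itself, the same idea would mean using 0 as the lower bound of both the range query and the range deletion.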
② Keyspace notifications
By default, keyspace notifications are not enabled on the Redis server. To enable them manually, run the config set notify-keyspace-events Ex command. Once they are enabled, we receive an event each time a key expires, and we can use this mechanism to start a scheduled task for each expired key. The code is as follows:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    // Name of the channel to subscribe to
    public static final String _TOPIC = "__keyevent@0__:expired";

    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // Execute the scheduled task
        doTask(jedis);
    }

    /**
     * Subscribe to expiration messages and execute the scheduled task
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        // Subscribe to expiration messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // A message was received; execute the scheduled task
                System.out.println("Received message: " + message);
            }
        }, _TOPIC);
    }
}
4. Netty implements deferred tasks
Netty is a Java open source framework provided by JBOSS. It is a client and server programming framework based on NIO. Using Netty, you can quickly and easily develop a network application, such as a client and server application that implements a certain protocol. Netty simplifies and streamlines the programming and development of network applications, such as TCP and UDP-based socket service development.
You can use Netty’s HashedWheelTimer utility class to implement deferred tasks. The implementation code is as follows.
First add the Netty dependency to the project as follows:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>
The complete code for Netty implementation is as follows:
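import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        nettyTask();
    }

    /**
     * Delayed task based on Netty
     */
    private static void nettyTask() {
        // Create the timer instance
        HashedWheelTimer timer = new HashedWheelTimer(3, // time interval of one tick
                TimeUnit.SECONDS,
                100); // number of slots in the timing wheel
        // Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Task execution, execution time: " + LocalDateTime.now());
            }
        };
        // Add the task to the delay queue
        timer.newTimeout(task, 0, TimeUnit.SECONDS);
    }
}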
The program above produces the following output:
Program start time: 2020-04-13T10:16:23.033
Task execution, execution time: 2020-04-13T10:16:26.118
HashedWheelTimer is implemented with a timing wheel. A timing wheel is a ring-shaped data structure that can be pictured as a clock face divided into many slots, each representing a span of time. Each slot holds a linked list of the timeout tasks to be executed, and a pointer advances one slot at a time. When the pointer reaches a slot, the delayed tasks stored in that slot are executed.
For example, picture a timing wheel of size 8 that advances one slot per interval (say, every 1s), with each slot pointing to a linked list of tasks to be performed.
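To make the structure described above more tangible, here is a deliberately simplified timing wheel. It is a teaching sketch only and not how Netty's HashedWheelTimer is actually written: the class SimpleTimingWheel and its methods are invented for this example, time advances only when tick() is called, and tasks are represented by their names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class SimpleTimingWheel {
    /** A pending task together with the number of full wheel turns it still has to wait. */
    private static final class Entry {
        final String name;
        int remainingRounds;

        Entry(String name, int remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int pointer = 0; // index of the slot reached most recently

    SimpleTimingWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new LinkedList<>()); // each slot holds a linked list of tasks
        }
    }

    /** Registers a task that should fire after delayTicks ticks (delayTicks must be at least 1). */
    void add(String name, int delayTicks) {
        int size = slots.size();
        int slot = (pointer + delayTicks) % size;
        int rounds = (delayTicks - 1) / size; // full turns to skip before firing
        slots.get(slot).add(new Entry(name, rounds));
    }

    /** Moves the pointer one slot forward and returns the names of the tasks that fire. */
    List<String> tick() {
        pointer = (pointer + 1) % slots.size();
        List<String> fired = new ArrayList<>();
        Iterator<Entry> it = slots.get(pointer).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                fired.add(entry.name);
                it.remove();
            } else {
                entry.remainingRounds--; // not yet: wait for another turn of the wheel
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(8);
        wheel.add("task-a", 3);
        wheel.add("task-b", 10); // longer than one turn of an 8-slot wheel
        for (int t = 1; t <= 10; t++) {
            List<String> fired = wheel.tick();
            if (!fired.isEmpty()) {
                System.out.println("tick " + t + ": " + fired);
            }
        }
        // prints "tick 3: [task-a]" and then "tick 10: [task-b]"
    }
}
```

The round counter is what lets a small wheel handle delays longer than one full turn: each tick only touches the tasks in a single slot, whatever the total number of pending tasks.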
5. MQ implements deferred tasks
Introducing an MQ middleware solely to execute deferred tasks would be overkill, but it is a reasonable choice if you already have an MQ environment.
Almost all MQ middleware can implement deferred tasks, which would be more accurately called deferred queues. This article uses RabbitMQ as an example to see how it can implement deferred tasks.
RabbitMQ implements delay queuing in two ways:
- After the message expires, it enters the dead letter exchange and is forwarded to the delay consumption queue by the exchange to realize the delay function.
- Use the rabbitmq-delayed-message-exchange plug-in to implement the delay.
Note: The rabbitmq-delayed-message-exchange plug-in is supported in RabbitMQ 3.5.7 and above and depends on Erlang/OTP 18.0 and above.
The second approach, the rabbitmq-delayed-message-exchange plug-in, is recommended because using a dead letter exchange is more complex.
First, we need to download and install the rabbitmq-delayed-message-exchange plug-in. Download address: www.rabbitmq.com/community-p…
After installation, enable the plug-in with the rabbitmq-plugins enable rabbitmq_delayed_message_exchange command. The rabbitmq-plugins list command shows all installed plug-ins, and a successfully installed plug-in appears in that list.
Finally, restart the RabbitMQ service for the plug-in to take effect.
Next, we need to configure the message queue. The code is as follows:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // The second parameter is the type: it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
Then add the code that sends the message:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Delivery time: " + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Delay the message by 3000 ms
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}
Add the code to consume the message:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receiving time: " + sdf.format(new Date()));
        System.out.println("Message content: " + msg);
    }
}
Finally, let’s test this with code:
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); // Wait for the receiver to execute before exiting the test
    }
}
The program above produces the following output:
Delivery time: 2020-04-13 20:47:51
Receiving time: 2020-04-13 20:47:54
Message content: Hi Admin.
The message was received 3 seconds after it was sent, which is what we expect from the delayed task.
6. Use Spring to schedule tasks
This section uses a Spring Boot project to demonstrate the @Scheduled implementation. First, scheduling must be switched on with the @EnableScheduling annotation:
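import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}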
Then add the delay task, the implementation code is as follows:
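import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;

@Component
public class ScheduleJobs {
    // Runs again 2 seconds after the previous run has finished
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}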
When we start the project, we can see that the task runs every 2 seconds, with the following output:
Task execution, time: 2020-04-13T14:07:53.349
Task execution, time: 2020-04-13T14:07:55.350
Task execution, time: 2020-04-13T14:07:57.351
...
We can also use cron expressions to define how often a task should be executed, for example @Scheduled(cron = "0/4 * * * * ?").
7. Quartz implements deferred tasks
Quartz is a powerful task scheduler for complex scheduling and distributed task scheduling.
To implement a deferred task with Quartz, we first define the job to be executed:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println("Task execution, time: " + LocalDateTime.now());
    }
}
Next, define a JobDetail and a Trigger. The implementation code is as follows:
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // Interval of 3 seconds, repeated once
        SimpleScheduleBuilder scheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder).build();
    }
}
Finally, start the scheduled task once the Spring Boot project has started. The implementation code is as follows:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Executed after the Spring Boot project has started
 */
public class MyStartupRunner implements CommandLineRunner {
    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // Start the scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}
The program above produces the following output:
2020-04-13 19:02:12.331  INFO 17768 --- [  restartedMain] com.example.demo.DemoApplication         : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
Task execution, time: 2020-04-13T19:02:15.019
It can be seen from the results that the delayed task was executed 3s after the project started.
Conclusion
This article describes the application scenarios of delayed tasks and 10 ways to implement them:
- Hand-written infinite loop;
- ScheduledExecutorService;
- DelayQueue;
- Redis zset data judgment method;
- Redis key space notification method;
- HashedWheelTimer utility class provided by Netty;
- RabbitMQ dead letter queue;
- RabbitMQ delayed message plug-in rabbitmq-delayed-message-exchange;
- Spring Scheduled;
- Quartz.