1. Introduction
In previous articles, we introduced the timed task class Timer, which appeared in JDK 1.3 under the java.util package. Today's subject, ScheduledThreadPoolExecutor, lives in the JUC package (java.util.concurrent) and was added in JDK 1.5.
Let’s talk about this class today.
2. API overview
The internal structure of this class is somewhat similar to that of Timer. It is built from three classes:
- ScheduledThreadPoolExecutor: the interface used by programmers.
- DelayedWorkQueue: the queue that stores tasks.
- ScheduledFutureTask: the task object that the pool's worker threads execute.
Constructors:
// Creates a new ScheduledThreadPoolExecutor with the given core pool size.
ScheduledThreadPoolExecutor(int corePoolSize)
// Creates a new ScheduledThreadPoolExecutor with the given core pool size and rejection handler.
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// Creates a new ScheduledThreadPoolExecutor with the given core pool size and thread factory.
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// Creates a new ScheduledThreadPoolExecutor with the given core pool size, thread factory, and rejection handler.
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ScheduledThreadPoolExecutor accepts at most three parameters: the core pool size, a thread factory, and a rejection policy.
Why is there no maximum number of threads? Because ScheduledThreadPoolExecutor uses an unbounded queue internally, so the pool never grows beyond its core size and maximumPoolSize would have no effect.
Next come its API methods. Please forgive me for copying the JDK documentation here as a memo:
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // Modifies or replaces the task used to execute a callable.
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // Modifies or replaces the task used to execute a runnable.
void execute(Runnable command) // Executes the command with zero required delay.
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // Gets the policy on whether to continue executing existing periodic tasks after this executor has been shut down.
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // Gets the policy on whether to execute existing delayed tasks after this executor has been shut down.
BlockingQueue<Runnable> getQueue() // Returns the task queue used by this executor.
boolean remove(Runnable task) // Removes this task from the executor's internal queue if it is present, so that it does not run if it has not already started.
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // Creates and executes a ScheduledFuture that becomes enabled after the given delay.
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // Creates and executes a one-shot action that becomes enabled after the given delay.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions are due at initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the end of one execution and the start of the next.
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // Sets the policy on whether to continue executing existing periodic tasks after this executor has been shut down.
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // Sets the policy on whether to execute existing delayed tasks after this executor has been shut down.
void shutdown() // Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted.
List<Runnable> shutdownNow() // Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
<T> Future<T> submit(Callable<T> task) // Submits a value-returning task for execution and returns a Future representing the pending result of the task.
Future<?> submit(Runnable task) // Submits a Runnable task for execution and returns a Future representing that task.
<T> Future<T> submit(Runnable task, T result) // Submits a Runnable task for execution and returns a Future that yields the given result on successful completion.
The most commonly used methods are as follows:
// Creates a new ScheduledThreadPoolExecutor with the given core pool size.
ScheduledThreadPoolExecutor(int corePoolSize)
// Creates and executes a one-shot action that becomes enabled after the given delay.
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions are due at initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the end of one execution and the start of the next.
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
Besides the constructor, there are three schedule methods. We will analyze how each is implemented internally.
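As a quick orientation before the internals, here is a minimal sketch that calls each of the three schedule methods once. The class name ScheduleMethodsDemo, the helper runAll, and the chosen delays are assumptions made for illustration only; the JDK calls themselves are the ones listed above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ScheduleMethodsDemo {

    // Returns true if the one-shot task ran once and both periodic tasks ran at least three times.
    static boolean runAll() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        CountDownLatch oneShot = new CountDownLatch(1);
        CountDownLatch fixedRate = new CountDownLatch(3);
        CountDownLatch fixedDelay = new CountDownLatch(3);
        try {
            // Runs once, 50 ms from now.
            pool.schedule(() -> { oneShot.countDown(); }, 50, TimeUnit.MILLISECONDS);
            // Due at 0 ms, 20 ms, 40 ms, ... measured from the first scheduled start.
            pool.scheduleAtFixedRate(() -> { fixedRate.countDown(); }, 0, 20, TimeUnit.MILLISECONDS);
            // Each run starts 20 ms after the previous run has finished.
            pool.scheduleWithFixedDelay(() -> { fixedDelay.countDown(); }, 0, 20, TimeUnit.MILLISECONDS);
            return oneShot.await(5, TimeUnit.SECONDS)
                    && fixedRate.await(5, TimeUnit.SECONDS)
                    && fixedDelay.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Periodic tasks never finish on their own, so stop the pool explicitly.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks ran: " + runAll());
    }
}
```

The periodic tasks keep running until the pool is shut down or their futures are cancelled, which is why the sketch ends with shutdownNow.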
3. Internal implementation of the constructor
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
The DelayedWorkQueue is what we are interested in. It is a blocking queue whose underlying data structure is a heap, and it orders its elements by comparison. What does it compare? Tasks must implement the compareTo method, which compares the tasks' execution times and, if those are equal, the order in which the tasks were added.
For this purpose, ScheduledFutureTask has two fields:
- time: the time at which the task is due to execute.
- sequenceNumber: a sequence number that records the order in which the task was added.
These two fields determine the order in which tasks are executed. The whole scheduling order follows from this logic.
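The ordering rule can be sketched with a plain PriorityQueue. This is not the JDK's DelayedWorkQueue; the Entry class and its fields are a simplified stand-in invented for illustration, keeping only the two values used for comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of "compare by time, then by sequence number".
class TaskOrderingSketch {

    // Simplified stand-in for ScheduledFutureTask: only the ordering fields are kept.
    static final class Entry implements Comparable<Entry> {
        final String name;
        final long time;            // when the task is due
        final long sequenceNumber;  // order in which the task was added

        Entry(String name, long time, long sequenceNumber) {
            this.name = name;
            this.time = time;
            this.sequenceNumber = sequenceNumber;
        }

        @Override
        public int compareTo(Entry other) {
            int byTime = Long.compare(time, other.time);
            // Equal due times: the task that was added first wins.
            return byTime != 0 ? byTime : Long.compare(sequenceNumber, other.sequenceNumber);
        }
    }

    // Adds three entries out of order and returns the names in the order the heap releases them.
    static List<String> drainOrder() {
        PriorityQueue<Entry> heap = new PriorityQueue<>();
        heap.add(new Entry("C", 300, 0));
        heap.add(new Entry("B", 100, 2));
        heap.add(new Entry("A", 100, 1));
        List<String> order = new ArrayList<>();
        while (!heap.isEmpty()) {
            order.add(heap.poll().name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [A, B, C]
    }
}
```

A and B share the same due time, so the lower sequence number decides; C was added first but is due last, so it comes out last.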
4. Differences between the schedule methods
As mentioned above, there are three schedule methods:
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) creates and executes a one-shot action that becomes enabled after the given delay.
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) creates and executes a periodic action that becomes enabled first after the given initialDelay, and subsequently with the given period; that is, executions are due at initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) creates and executes a periodic action that becomes enabled first after the given initialDelay, and subsequently with the given delay between the end of one execution and the start of the next.
The first method is executed once after a given time.
What is interesting is the difference between the second method and the third.
Both methods run the task repeatedly. However, they differ in how the repetition is scheduled, and this is one of the places where the class works better than Timer.
What they have in common is that the next execution never starts until the previous one has finished.
The difference lies in how the next execution time is calculated.
With scheduleAtFixedRate the period is fixed: the scheduled start time of the previous execution is the starting point, and the period is added to it to get the next execution time.
With scheduleWithFixedDelay the end time of the previous execution is the starting point, and the delay is added to it to get the next execution time.
What’s the difference?
If the task is very short, there is almost no difference. But if the task runs for longer than the period, the difference becomes obvious.
Let’s assume.
We set the period to 2 seconds and the task takes 5 seconds.
This is where the difference between the two approaches comes in.
With scheduleAtFixedRate, the next execution starts immediately after the previous one completes, so consecutive starts are 5 seconds apart (because the previous execution must finish first).
With scheduleWithFixedDelay, the next execution also waits for the previous one to complete. Note that it then waits another 2 seconds before starting, so consecutive starts are 7 seconds apart.
So when using ScheduledThreadPoolExecutor, try to make sure that the task's execution time does not exceed the interval. If it does, keep in mind that scheduleAtFixedRate runs the overdue executions back to back (they start late but never overlap), while scheduleWithFixedDelay always leaves the configured gap after each execution.
Of course, the right choice depends on the specific business requirements, and one cannot generalize. What matters is being aware of the difference between these two methods.
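The arithmetic in the example above (period 2 seconds, task duration 5 seconds) can be checked with a small simulation. This is only a model of the timing rules described in this section, under the assumptions of a single task, an initial delay of 0, and no scheduling overhead; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical model of the two periodic modes; it computes start times instead of sleeping.
class PeriodSimulation {

    // Fixed rate: run k is due at k * period, but cannot start before the previous run has ended.
    static long[] fixedRateStarts(long period, long duration, int runs) {
        long[] starts = new long[runs];
        long previousEnd = 0;
        for (int k = 0; k < runs; k++) {
            long due = k * period;
            starts[k] = Math.max(due, previousEnd);
            previousEnd = starts[k] + duration;
        }
        return starts;
    }

    // Fixed delay: each run starts `delay` after the previous run has ended.
    static long[] fixedDelayStarts(long delay, long duration, int runs) {
        long[] starts = new long[runs];
        long start = 0;
        for (int k = 0; k < runs; k++) {
            starts[k] = start;
            start = start + duration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Period/delay of 2 seconds, task duration of 5 seconds, four runs.
        System.out.println(Arrays.toString(fixedRateStarts(2, 5, 4)));  // prints [0, 5, 10, 15]
        System.out.println(Arrays.toString(fixedDelayStarts(2, 5, 4))); // prints [0, 7, 14, 21]
    }
}
```

The fixed-rate starts are 5 seconds apart and the fixed-delay starts are 7 seconds apart, matching the description above.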
5. Implementation of the schedule methods
Let’s look at the internal implementation of the scheduleAtFixedRate method.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
The method creates a ScheduledFutureTask and then passes it through decorateTask. In this class, decorateTask simply returns the task unchanged. Subclasses can override it to wrap the task in a layer of their own.
The delayedExecute method is then executed and the Future is returned.
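To illustrate the decorateTask hook, here is a minimal sketch of a subclass that overrides it. The class CountingScheduler and its counter are hypothetical; to stay short, the override only counts the tasks and returns them unchanged, whereas a real decorator would typically return a wrapper that implements RunnableScheduledFuture.

```java
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to show where decorateTask is called.
class CountingScheduler extends ScheduledThreadPoolExecutor {

    final AtomicInteger decorated = new AtomicInteger();

    CountingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    // Called by the schedule methods for Runnable tasks before the task is queued.
    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
                                                          RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task; // same behavior as the default: hand the task back unchanged
    }

    // Schedules two Runnable tasks far in the future and reports how many were decorated.
    static int demo() {
        CountingScheduler pool = new CountingScheduler(1);
        try {
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            pool.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
            return pool.decorated.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```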
This ScheduledFutureTask implements a number of interfaces, such as Future, Runnable, Comparable, Delayed, etc.
The ScheduledFutureTask is constructed as follows:
// ScheduledFutureTask constructor
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

// FutureTask constructor, invoked through super(r, result)
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

// Executors.callable
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

// Executors.RunnableAdapter
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
Step by step: the class first obtains the task's sequence number from a static atomic counter (sequencer). It then creates a Callable through an adapter that wraps the Runnable as a Callable, whose call method simply invokes the run method of the given task. The result here is not useful, since scheduleAtFixedRate passes null.
If a Callable is passed in instead, FutureTask's run method calls it and sets the actual return value.
The adapter pattern is used here, which is interesting.
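The adapter can be tried directly through the public Executors.callable method shown in the source above. The class AdapterDemo and the result string "done" are assumptions chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of wrapping a Runnable as a Callable.
class AdapterDemo {

    // Wraps a Runnable, calls it through the Callable interface, and returns what call() yields.
    static String adaptAndCall() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Runnable task = () -> ran.set(true);
        // The adapter runs the task and then returns the fixed result supplied here.
        Callable<String> adapted = Executors.callable(task, "done");
        try {
            String result = adapted.call();
            return ran.get() ? result : "not run";
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(adaptAndCall()); // prints done
    }
}
```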
In general, ScheduledFutureTask is built on FutureTask, whose source code we walked through in an earlier article.
On top of that, it provides its own versions of several methods: compareTo, getDelay, run, and isPeriodic.
We still have to look at the implementation of delayedExecute.
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // Add to the queue.
        super.getQueue().add(task);
        // If the pool has been shut down, the task cannot run in the current state, and the task
        // is successfully removed from the queue, then mark the task as cancelled.
        // The second check is controlled by two fields (default values shown):
        //   continueExistingPeriodicTasksAfterShutdown = false: periodic tasks are cancelled on shutdown.
        //   executeExistingDelayedTasksAfterShutdown = true: delayed (non-periodic) tasks are not cancelled on shutdown.
        // In the RUNNING state, canRunInCurrentRunState always returns true.
        // In a non-RUNNING state, its result depends on the two fields above.
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // Make sure a worker thread is started to run the task.
            ensurePrestart();
    }
}
Let's walk through this method.
- Check whether the thread pool has been shut down. If so, reject the task.
- If it has not been shut down, add the task to the queue. The queue orders tasks as described earlier, using the compareTo method of ScheduledFutureTask: first by execution time, then by the order in which they were added.
- If the thread pool is shut down during this process, decide whether to cancel the task based on the two fields noted in the comments. The default policy is to cancel the task if it is periodic and to keep it otherwise.
- If the thread pool has not been shut down, make sure a worker thread in the pool is started to execute the task.
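The default shutdown policy from the steps above can be observed from the outside. The sketch below is a minimal experiment, assuming the default policy values; the class ShutdownPolicyDemo and its helper are hypothetical. Both tasks are scheduled an hour ahead so that they are still waiting in the queue when shutdown is called.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the default after-shutdown policies.
class ShutdownPolicyDemo {

    // Returns { periodic task cancelled?, one-shot delayed task cancelled? } right after shutdown().
    static boolean[] cancelledAfterShutdown() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<?> delayed = pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            ScheduledFuture<?> periodic = pool.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
            // Orderly shutdown: with the defaults, queued periodic tasks are cancelled,
            // while queued one-shot delayed tasks are kept.
            pool.shutdown();
            return new boolean[] { periodic.isCancelled(), delayed.isCancelled() };
        } finally {
            // Do not wait an hour for the delayed task; discard whatever is left.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = cancelledAfterShutdown();
        System.out.println("periodic cancelled: " + result[0]); // prints periodic cancelled: true
        System.out.println("delayed cancelled: " + result[1]);  // prints delayed cancelled: false
    }
}
```

Both defaults can be changed with setContinueExistingPeriodicTasksAfterShutdownPolicy and setExecuteExistingDelayedTasksAfterShutdownPolicy from the API list earlier.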
The overall process is shown as follows:
Note in the figure above that a periodic task is put back into the queue after each execution completes.
Where can we see that?
In the run method of ScheduledFutureTask:
public void run() {
    // Is this a periodic task?
    boolean periodic = isPeriodic();
    // If the task cannot run in the current pool state, cancel it (sets the task state to CANCELLED).
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // Not periodic: call FutureTask#run.
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // Periodic: execute the task without setting a result, so that it can be executed again.
    // runAndReset returns true on success.
    else if (ScheduledFutureTask.super.runAndReset()) {
        // Set the next execution time.
        setNextRunTime();
        // Add the task to the queue again.
        reExecutePeriodic(outerTask);
    }
}
The logic is as follows:
- If the task can no longer run in the current pool state, cancel it.
- If it is not a periodic task, call FutureTask's run method.
- If it is a periodic task, call the runAndReset method instead.
- When that execution completes successfully, set the next execution time of the task and add it back to the queue.
The execution of the whole process is driven by the runWorker method of ThreadPoolExecutor, the parent class of ScheduledThreadPoolExecutor. That method pulls tasks from the queue, which ends up calling the queue's take method.
In take, if the head task is not yet due and another thread is already the leader, the current thread waits indefinitely. If there is no leader, the current thread becomes the leader and waits only until the head task is due.
Doug Lea describes this as a variant of the Leader-Follower pattern.
Otherwise, the overall logic of the take method is the usual one: fetch the element at the head of the queue, using a Condition to coordinate between threads.
6. Summary
That covers most of our analysis of the ScheduledThreadPoolExecutor scheduling class. To summarize:
ScheduledThreadPoolExecutor is a thread pool for timed tasks. It is similar to Timer, but more powerful and more robust.
For example, with Timer an exception thrown by one task brings down the whole scheduling system. That does not happen here.
It offers two periodic modes: Rate and Delay.
The difference between the two modes is the starting point used to calculate the next execution. Rate counts from the scheduled start time of the previous execution. Delay counts from the end time of the previous execution.
Therefore, if the task itself takes longer than the interval, the two modes produce different intervals between executions.
Tasks are ordered by the compareTo method of ScheduledFutureTask. The rule is to compare execution times first and, if they are equal, to compare the order in which the tasks were added.
Note also that if an exception occurs during an execution, the task will not be repeated. runAndReset records the exception and returns false, so the run method of ScheduledFutureTask never puts the task back into the queue. The programmer therefore needs to handle exceptions inside the task, which is still much better than Timer, where an exception takes down the whole scheduling system.
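The effect of an uncaught exception on a periodic task can be seen in a small experiment. This is a sketch under the assumption that 100 ms is long enough to notice further runs of a task with a 10 ms period; the class name and helper are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a periodic task that throws is not scheduled again.
class ExceptionStopsRepeatsDemo {

    // Returns how many times the failing periodic task actually ran.
    static int runsBeforeSuppression() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("boom");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The exception thrown by the task surfaces here, wrapped in ExecutionException.
            }
            // Give the pool time to run the task again if it were still scheduled.
            Thread.sleep(100);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsBeforeSuppression()); // prints 1
    }
}
```

If the task should keep repeating despite failures, one common approach is to wrap the task body in a try/catch so that no exception escapes from run.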
ScheduledThreadPoolExecutor is implemented on top of ThreadPoolExecutor and reuses most of the parent class's functionality. The main addition is that after a periodic task executes, its next execution time is set and the task is put back into the queue, which is what makes it a timed task.