In Java, asynchronous periodic task scheduling is provided by Timer and ScheduledThreadPoolExecutor. Stand-alone (single-process) timed scheduling is usually built on ScheduledThreadPoolExecutor, so how does it implement periodic tasks? The tasks themselves are executed by a ThreadPoolExecutor thread pool, which is evident from the fact that ScheduledThreadPoolExecutor extends ThreadPoolExecutor. The key question is how the periodic scheduling of tasks is implemented.

The ScheduledThreadPoolExecutor class and its core functions

First, ScheduledThreadPoolExecutor implements the ScheduledExecutorService interface, which defines four methods:

  • schedule(Runnable, ...): run a Runnable once after the given delay
  • schedule(Callable, ...): run a Callable once after the given delay and expose its result
  • scheduleAtFixedRate: run a Runnable periodically at a fixed rate (start time of the previous execution + period = next execution time, regardless of when the previous execution ended; if an execution takes longer than the period, the next one starts late rather than concurrently)
  • scheduleWithFixedDelay: run a Runnable periodically with a fixed delay (end time of the previous execution + delay = next execution time)

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
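As a rough illustration of the four methods, the following sketch calls each of them once. The class name ScheduleMethodsDemo, the delays, and the report strings are made up for this example; only the ScheduledExecutorService calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduleMethodsDemo {

    /** Calls each of the four scheduling methods once and returns a short report. */
    static List<String> demo() throws Exception {
        ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2);
        List<String> report = new ArrayList<>();
        try {
            // One-shot Runnable: runs once after 10 ms.
            ScheduledFuture<?> once = pool.schedule(() -> { }, 10, TimeUnit.MILLISECONDS);
            once.get(5, TimeUnit.SECONDS);
            report.add("runnable done=" + once.isDone());

            // One-shot Callable: runs once after 10 ms and returns a value.
            ScheduledFuture<Integer> answer =
                pool.schedule(() -> 6 * 7, 10, TimeUnit.MILLISECONDS);
            report.add("callable result=" + answer.get(5, TimeUnit.SECONDS));

            // Fixed rate: next start = previous start + 20 ms. Cancel after three runs.
            CountDownLatch rateRuns = new CountDownLatch(3);
            ScheduledFuture<?> rate =
                pool.scheduleAtFixedRate(rateRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);
            boolean rateOk = rateRuns.await(5, TimeUnit.SECONDS);
            rate.cancel(false);
            report.add("fixedRate ran3=" + rateOk);

            // Fixed delay: next start = previous end + 20 ms. Cancel after three runs.
            CountDownLatch delayRuns = new CountDownLatch(3);
            ScheduledFuture<?> delayed =
                pool.scheduleWithFixedDelay(delayRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);
            boolean delayOk = delayRuns.await(5, TimeUnit.SECONDS);
            delayed.cancel(false);
            report.add("fixedDelay ran3=" + delayOk);

            return report;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        demo().forEach(System.out::println);
    }
}
```

A periodic task has no natural end, so the sketch stops each one by cancelling the returned ScheduledFuture.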

Second, ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so it relies on the thread pool to execute tasks and implements the periodic scheduling itself. Its constructor calls the constructor of the parent thread pool. The core pool size is passed in by the caller, the maximum pool size is Integer.MAX_VALUE (2147483647), and the work queue is a DelayedWorkQueue.

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
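Although the maximum pool size is Integer.MAX_VALUE, new workers are only added while the worker count is below the core size, so the pool should not grow past it. The sketch below checks this by queuing more blocking tasks than there are core threads; the class name PoolSizeDemo and the task count are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {

    /** Returns {corePoolSize, maximumPoolSize, observed pool size}. */
    static int[] demo() throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Queue six tasks that all block until the latch is released.
            for (int i = 0; i < 6; i++) {
                pool.schedule(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            // Only two workers exist even though six tasks were submitted.
            return new int[] {
                pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getPoolSize()
            };
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(demo())); // prints [2, 2147483647, 2]
    }
}
```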

scheduleAtFixedRate implements periodic scheduling. It takes the task as a Runnable object, the initial delay before the first run, and the period between runs.

  1. Compute the initial trigger time and the period, and wrap the passed Runnable in a ScheduledFutureTask.
  2. Call decorateTask to decorate the task (the default implementation returns the task unchanged).
  3. Set the outerTask field of the ScheduledFutureTask to t, the decorated task (by default the task itself).
  4. Call delayedExecute to schedule the task for delayed execution.

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

delayedExecute works as follows:

  1. Check the thread pool state and reject the task if the pool has been shut down.
  2. Otherwise, add the task to the parent's work queue (the DelayedWorkQueue created in the constructor).
  3. Check the pool state again. If the pool has been shut down in the meantime, the task is not allowed to run in the current state (for periodic tasks this depends on the continueExistingPeriodicTasksAfterShutdown flag, which defaults to false), and the task is successfully removed from the queue, cancel the task. Otherwise call ensurePrestart so that a thread is available to execute it.

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

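The effect of the shutdown checks can be observed from the outside. The sketch below assumes the default policies and the default rejection handler (AbortPolicy); the class name ShutdownDemo and the one-hour delay are made up so that the periodic task is still queued when shutdown() is called.

```java
import java.util.Arrays;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    /** Returns {queued periodic task was cancelled, new task was rejected}. */
    static boolean[] demo() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);

        // Long initial delay: the task is still waiting in the queue at shutdown.
        ScheduledFuture<?> periodic =
            pool.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);

        // With the default policy, queued periodic tasks are cancelled on shutdown.
        pool.shutdown();

        boolean rejected = false;
        try {
            // isShutdown() is now true, so delayedExecute rejects the task.
            pool.schedule(() -> { }, 1, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        return new boolean[] {periodic.isCancelled(), rejected};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [true, true]
    }
}
```

Calling setContinueExistingPeriodicTasksAfterShutdownPolicy(true) before shutdown() would keep the periodic task alive instead.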
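ensurePrestart works as follows:

  1. Get the current number of worker threads in the pool.
  2. If the worker count is smaller than the core pool size, start a core thread. Otherwise, if the worker count is 0 (which happens when the core pool size is 0), start a non-core thread so that at least one thread can take tasks from the queue.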
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
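The second branch of ensurePrestart matters when the core pool size is 0. As a small sketch (the class name CoreZeroDemo and the 10 ms delay are assumptions for the example), a task scheduled on such a pool still runs, and it sees exactly one worker thread while it executes:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreZeroDemo {

    /** Returns the pool size observed from inside a task on a pool with corePoolSize 0. */
    static int demo() throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(0);
        try {
            // Typed as Callable explicitly so the schedule(Callable, ...) overload is chosen.
            Callable<Integer> readPoolSize = pool::getPoolSize;
            ScheduledFuture<Integer> observed =
                pool.schedule(readPoolSize, 10, TimeUnit.MILLISECONDS);
            return observed.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 1
    }
}
```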

ScheduledFutureTask’s run function

  1. Determine whether the task is periodic.
  2. Check whether the task is allowed to run in the current thread pool state. If it is not, cancel the task.
  3. If the task is not periodic, call FutureTask's run function to execute it once.
  4. If the task is periodic, call FutureTask's runAndReset function. If it returns true (the run completed without an exception and the task was not cancelled), call setNextRunTime to set the next trigger time, then call reExecutePeriodic to put the task back in the queue.

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

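One consequence of the runAndReset check is that a periodic task that throws is never rescheduled. The following sketch illustrates this; the class name PeriodicFailureDemo, the period, and the choice to fail on the third run are made up for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicFailureDemo {

    /** Returns how many times the periodic task ran before it stopped being rescheduled. */
    static int demo() throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    // runAndReset() returns false after this, so no reschedule happens.
                    throw new IllegalStateException("failing on run 3");
                }
            }, 0, 10, TimeUnit.MILLISECONDS);

            try {
                // A periodic future only completes by failing or being cancelled.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The exception thrown on run 3 surfaces here.
            }

            Thread.sleep(100); // plenty of time for further runs, if there were any
            return runs.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 3
    }
}
```

In practice this means the body of a periodic task usually needs its own try/catch if it is supposed to keep running after a failure.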
reExecutePeriodic works as follows:

  1. Check whether the thread pool is in a state in which periodic tasks may run. If so, add the task, which now carries its next trigger time, to the parent's work queue.
  2. Check the state again. If periodic tasks may no longer run and the task is successfully removed from the queue, cancel the task. Otherwise call ensurePrestart to make sure a thread is available to execute it.

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

DelayedWorkQueue core functions

DelayedWorkQueue inherits AbstractQueue and implements the BlockingQueue interface

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

Core fields

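// Initial capacity of the backing array
private static final int INITIAL_CAPACITY = 16;
// Array-backed binary min-heap of tasks, ordered by trigger time
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// Lock guarding all queue operations
private final ReentrantLock lock = new ReentrantLock();
// Number of tasks currently in the queue
private int size = 0;
// The leader thread: the thread designated to wait for the task at the head of the queue
private Thread leader = null;
// Condition signalled when a new task becomes the head of the queue
// or when a new thread may need to become leader
private final Condition available = lock.newCondition();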

The offer function:

  1. Cast the added element to a RunnableScheduledFuture.
  2. Acquire the queue's lock.
  3. Get the current number of elements. If it has reached the length of the backing array, grow the array by 50%.
  4. Increment size by 1.
  5. If the queue was empty, place the element at index 0 and record that index in the ScheduledFutureTask's heapIndex (used to remove the task quickly later).
  6. Otherwise call siftUp to move the element up the heap, which is a min-heap (smallest trigger time at the root).
  7. If the new element ended up at the head of the queue, set leader to null and signal one thread waiting on the condition.
  8. Release the lock.
  9. Return true.

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

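The siftUp function starts at position k and repeatedly compares the key with its parent. If key.compareTo(e) >= 0, the key is not earlier than its parent and the loop stops. Otherwise the parent is moved down one level and the comparison continues from the parent's position.

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}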

The poll function

  1. Acquire the lock.
  2. Look at the element at index 0. If it is null, or its trigger time has not been reached yet (its remaining delay is greater than 0), return null. Otherwise call finishPoll to remove and return the first element.
  3. Release the lock.

public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

finishPoll works as follows:

  1. Decrease the queue size by 1.
  2. Take the last element of the queue and set its slot to null.
  3. If the queue is not empty afterwards (s != 0), call siftDown to insert that last element at index 0 and move it down, restoring the min-heap.
  4. Set the heapIndex of the removed ScheduledFutureTask to -1.

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}
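To see the heap operations in isolation, here is a simplified sketch of an array-backed min-heap that uses plain long keys in place of tasks. It is not the JDK code: the class name MinHeapSketch, the initial capacity of 4, and the drain helper are made up for illustration, and the heapIndex bookkeeping and locking are left out.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

public class MinHeapSketch {
    private long[] queue = new long[4]; // small capacity so growth is exercised
    private int size = 0;

    /** Adds a key, growing the array by 50% when it is full. */
    void offer(long key) {
        if (size >= queue.length) {
            queue = Arrays.copyOf(queue, queue.length + (queue.length >> 1));
        }
        siftUp(size++, key);
    }

    /** Removes and returns the smallest key. */
    long poll() {
        if (size == 0) {
            throw new NoSuchElementException();
        }
        long first = queue[0];
        int s = --size;
        long last = queue[s];
        if (s != 0) {
            siftDown(0, last); // re-insert the last key starting from the root
        }
        return first;
    }

    /** Moves parents down until key is no smaller than its parent. */
    private void siftUp(int k, long key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            long e = queue[parent];
            if (key >= e) {
                break;
            }
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

    /** Moves the smaller child up until key is no larger than both children. */
    private void siftDown(int k, long key) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            int right = child + 1;
            if (right < size && queue[child] > queue[right]) {
                child = right;
            }
            if (key <= queue[child]) {
                break;
            }
            queue[k] = queue[child];
            k = child;
        }
        queue[k] = key;
    }

    /** Offers all keys, then polls them back in ascending order. */
    static long[] drain(long... keys) {
        MinHeapSketch heap = new MinHeapSketch();
        for (long key : keys) {
            heap.offer(key);
        }
        long[] out = new long[keys.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = heap.poll();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(drain(5, 1, 4, 2, 3, 9, 0)));
        // prints [0, 1, 2, 3, 4, 5, 9]
    }
}
```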

The ScheduledFutureTask compareTo function

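ScheduledFutureTask's compareTo method uses the following logic:

  1. If the other object is this same object, return 0.
  2. If the other object is a ScheduledFutureTask, compare the time fields, which hold the timestamp of the next execution. If the times are equal, the task with the smaller sequenceNumber (the one submitted earlier) comes first.
  3. If the other object is not a ScheduledFutureTask, compare the remaining delays returned by getDelay.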

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
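The tie-breaking rule can be tried out with a small stand-in class. In the sketch below, Entry is a hypothetical substitute for ScheduledFutureTask that keeps only a trigger time and a sequence number, and a java.util.PriorityQueue plays the role of the heap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TieBreakDemo {

    /** Hypothetical stand-in for ScheduledFutureTask: trigger time plus sequence number. */
    static final class Entry implements Comparable<Entry> {
        final String name;
        final long time;
        final long sequenceNumber;

        Entry(String name, long time, long sequenceNumber) {
            this.name = name;
            this.time = time;
            this.sequenceNumber = sequenceNumber;
        }

        @Override
        public int compareTo(Entry x) {
            if (x == this) {
                return 0;
            }
            long diff = time - x.time; // earlier trigger time first
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            }
            // Same trigger time: earlier submission first.
            return sequenceNumber < x.sequenceNumber ? -1 : 1;
        }
    }

    /** Returns the names in the order the heap hands them out. */
    static List<String> order() {
        PriorityQueue<Entry> heap = new PriorityQueue<>();
        long seq = 0;
        heap.add(new Entry("a", 100, seq++));
        heap.add(new Entry("b", 50, seq++));
        heap.add(new Entry("c", 100, seq++));
        heap.add(new Entry("d", 50, seq++));

        List<String> names = new ArrayList<>();
        while (!heap.isEmpty()) {
            names.add(heap.poll().name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(order()); // prints [b, d, a, c]
    }
}
```

Tasks with the same trigger time therefore leave the queue in submission order.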

The take function of DelayedWorkQueue is what a ThreadPoolExecutor worker thread calls to get a task from the queue, blocking until a task is ready. (It is used when the number of threads does not exceed the core pool size and core thread timeout is not enabled; otherwise the timed poll is used.)

  1. Acquire the lock interruptibly.
  2. Get the first element of the queue, which is the task with the smallest trigger time.
  3. If the first task is null, wait on the lock's condition.
  4. If the first task is not null, get its remaining delay. If the delay is less than or equal to 0, the task is due: remove it with finishPoll and return it.
  5. If the task is not due and the leader thread is not null, another thread is already waiting for the head task, so wait on the condition without a timeout.
  6. If leader is null, make the current thread the leader.
  7. As leader, wait on the condition for at most delay nanoseconds. Afterwards, if the current thread is still the leader, reset leader to null. The loop then starts again from step 2.
  8. Finally, on exit, if leader is null and the queue is not empty, signal one thread waiting on the condition so that it can take over.
  9. Release the lock.

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
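DelayedWorkQueue is a private class and cannot be used directly, but the public java.util.concurrent.DelayQueue has a take method built on the same leader/follower waiting scheme, so it can serve as a stand-in for observing the blocking behavior. In the sketch below, the Job class, its names, and the delays are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    /** Hypothetical delayed element with a fixed trigger time. */
    static final class Job implements Delayed {
        final String name;
        final long triggerNanos;

        Job(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds two jobs out of order and returns the order in which take() yields them. */
    static List<String> demo() throws InterruptedException {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("late", 150));
        queue.add(new Job("early", 50));

        List<String> order = new ArrayList<>();
        order.add(queue.take().name); // blocks until the earliest job is due
        order.add(queue.take().name); // then blocks until the next one is due
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [early, late]
    }
}
```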

To sum up, the task that a thread pool worker fetches from the DelayedWorkQueue is always the one with the smallest remaining delay. If that task has reached its trigger time, it is returned and executed. Otherwise the worker waits on the Condition (await or awaitNanos) until the task is due or an earlier task is added.