A project I recently took over makes heavy use of the ScheduledThreadPoolExecutor class to run tasks on a schedule. I had never had a chance to study this class's source code, so I took the opportunity to read it.

The original address: www.jianshu.com/p/18f4c95ac…

This class is built on top of ThreadPoolExecutor, so readers unfamiliar with how Java thread pools execute tasks are advised to read my previous article first: "What should you know when an interviewer asks about thread pools?"

First, the implementation process

  1. Unlike ThreadPoolExecutor, a task submitted to ScheduledThreadPoolExecutor is first wrapped in a ScheduledFutureTask object and added to the delay queue, and then a worker thread is started.

  2. Tasks in the delay queue are ordered by execution time, so the task at the head of the queue is the one that must run earliest. A worker thread takes the head task from the delay queue and runs it if its execution time has been reached. Otherwise the worker blocks for the remaining delay and then tries to take the task again.

  3. After a task has run, if it is periodic, its next execution time is calculated and it is added to the delay queue again.
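The ordering described in step 2 can be observed with a small experiment. The following is a minimal sketch, not taken from the original article: `ScheduleOrderDemo` and `runInDelayOrder` are hypothetical names, and the delays are arbitrary. Three one-shot tasks are submitted out of order to a pool with a single worker, and the worker still runs them in order of their trigger time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of the JDK or the original article).
class ScheduleOrderDemo {

    // Submits three one-shot tasks out of order and returns the order they ran in.
    static List<String> runInDelayOrder() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());

        // Submission order differs from trigger-time order on purpose.
        pool.schedule(() -> { order.add("300ms"); }, 300, TimeUnit.MILLISECONDS);
        pool.schedule(() -> { order.add("100ms"); }, 100, TimeUnit.MILLISECONDS);
        pool.schedule(() -> { order.add("200ms"); }, 200, TimeUnit.MILLISECONDS);

        // By default, delayed tasks that are already queued still run after shutdown().
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInDelayOrder());
    }
}
```

The single worker blocks on the delay queue between tasks, so the result depends on the trigger times rather than on the order of the `schedule` calls.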

Second, in-depth analysis of the source code

First, look at the constructors of the ScheduledThreadPoolExecutor class:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

Note: there are three things to notice about how these constructors call super:

  1. The blocking queue is always a DelayedWorkQueue. Unlike ThreadPoolExecutor, the user cannot supply a custom queue. This queue is the core component of ScheduledThreadPoolExecutor and is covered in detail below.
  2. maximumPoolSize is not exposed to the user either. DelayedWorkQueue starts with a capacity of 16 and grows whenever it fills up, so it is effectively unbounded and never reports itself as full. Because ThreadPoolExecutor only creates threads beyond corePoolSize when the queue refuses a task, maximumPoolSize would have no effect even if it were set.
  3. The worker threads have no keep-alive time, for the same reason as in point 2: the pool never grows beyond corePoolSize, so there are no extra threads to reclaim and no reclaim is ever triggered. The keep-alive time is therefore set to 0.
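Point 2 can be checked empirically. The sketch below is an illustration under stated assumptions, not code from the article: `PoolSizeDemo` and `observe` are hypothetical names, and the task count of 50 is arbitrary. It submits far more blocking tasks than there are core threads and then reads the pool's size counters through the public getters inherited from ThreadPoolExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of the JDK or the original article).
class PoolSizeDemo {

    // Returns {current pool size, largest pool size ever reached, maximumPoolSize}.
    static int[] observe() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        CountDownLatch release = new CountDownLatch(1);

        // 50 tasks that all block until released: far more than the 2 core threads.
        for (int i = 0; i < 50; i++) {
            pool.schedule(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, TimeUnit.MILLISECONDS);
        }

        int[] observed = {
            pool.getPoolSize(), pool.getLargestPoolSize(), pool.getMaximumPoolSize()
        };

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        int[] o = observe();
        System.out.println("poolSize=" + o[0] + " largest=" + o[1] + " max=" + o[2]);
    }
}
```

Even with 48 tasks waiting in the queue, the pool stays at its two core threads, while `getMaximumPoolSize()` reports `Integer.MAX_VALUE`.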

Again, you need to understand ThreadPoolExecutor first.

Once a scheduled thread pool has been created, we can start submitting tasks. Let's look at the source code of three commonly used APIs.

The first is the schedule method, which runs the task once, after the specified delay has elapsed.

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        // Check parameters
        if (command == null || unit == null)
            throw new NullPointerException();
        // Wrap the user's task in a ScheduledFutureTask, then pass it through
        // decorateTask. decorateTask is an extension hook for subclasses; by
        // default it returns the task unchanged.
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        // Submit the wrapped task
        delayedExecute(t);
        return t;
    }
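As an illustration of the decorateTask hook used in schedule, here is a minimal sketch of a subclass that overrides it. `CountingScheduler`, its `decorated` field, and `countDecorated` are hypothetical names introduced for this example. The override only counts the Runnable tasks passing through the hook and returns them unchanged.

```java
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass (not part of the JDK or the original article).
class CountingScheduler extends ScheduledThreadPoolExecutor {

    // Number of Runnable tasks that have passed through decorateTask.
    final AtomicInteger decorated = new AtomicInteger();

    CountingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task; // same behavior as the default: hand the task back unchanged
    }

    // Schedules two no-op tasks and reports how many were decorated.
    static int countDecorated() {
        CountingScheduler pool = new CountingScheduler(1);
        pool.schedule(() -> { }, 10, TimeUnit.MILLISECONDS);
        pool.schedule(() -> { }, 20, TimeUnit.MILLISECONDS);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.decorated.get();
    }

    public static void main(String[] args) {
        System.out.println(countDecorated());
    }
}
```

A real subclass could return a wrapper that implements RunnableScheduledFuture instead, for example to add logging or metrics around each run.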

Now take a closer look at delayedExecute, the method that submits the wrapped task:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        // If the thread pool has been shut down, reject the task
        // using the rejection policy
        if (isShutdown())
            reject(task);
        else {
            // Unlike ThreadPoolExecutor, the task always goes into the queue first
            super.getQueue().add(task);
            // If the pool was shut down in the meantime and the task may not
            // run in the current state, remove it from the queue and cancel it
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // Make sure a worker thread is started to process the queue
                ensurePrestart();
        }
    }

The key line here is super.getQueue().add(task). ScheduledThreadPoolExecutor implements its own delay queue internally, based on a heap data structure. The add method ultimately delegates to the offer method.

    public boolean offer(Runnable x) {
        // Check parameters
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Check the current number of elements; if the backing array
            // is full, grow it
            int i = size;
            if (i >= queue.length)
                grow();
            // Number of elements + 1
            size = i + 1;
            // If the queue is empty, put the element directly at the head
            if (i == 0) {
                queue[0] = e;
                // Record the element's index in the heap
                setIndex(e, 0);
            } else {
                // Otherwise insert at the end and sift up to restore heap order
                siftUp(i, e);
            }
            // The new element is now the queue head in two cases:
            // 1. it is the first task submitted by the user
            // 2. it moved to the head of the queue after the heap adjustment
            if (queue[0] == e) {
                leader = null;
                // Wake up a waiting worker thread
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
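To make the grow and siftUp steps in offer more concrete, here is a simplified sketch of a binary min-heap keyed on trigger times. It is not the JDK implementation: `TriggerTimeHeap` is a hypothetical class that stores plain `long` values instead of RunnableScheduledFuture objects and leaves out locking, index tracking, and the leader logic. It keeps the same initial capacity of 16 and the 50% growth step that DelayedWorkQueue uses.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for the heap inside DelayedWorkQueue.
class TriggerTimeHeap {
    private long[] heap = new long[16]; // initial capacity 16
    private int size;

    // Inserts a trigger time, growing the array by 50% when it is full.
    void offer(long triggerTime) {
        if (size >= heap.length)
            heap = Arrays.copyOf(heap, heap.length + (heap.length >> 1));
        siftUp(size++, triggerTime);
    }

    // Returns the smallest trigger time without removing it.
    long peek() {
        if (size == 0) throw new NoSuchElementException();
        return heap[0];
    }

    // Removes and returns the smallest trigger time.
    long poll() {
        if (size == 0) throw new NoSuchElementException();
        long head = heap[0];
        int s = --size;
        long last = heap[s];
        if (s != 0)
            siftDown(0, last); // re-insert the last element starting from the root
        return head;
    }

    int size() {
        return size;
    }

    // Moves key up from position k until its parent is not larger.
    private void siftUp(int k, long key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (key >= heap[parent]) break;
            heap[k] = heap[parent];
            k = parent;
        }
        heap[k] = key;
    }

    // Moves key down from position k until both children are not smaller.
    private void siftDown(int k, long key) {
        int half = size >>> 1; // positions >= half have no children
        while (k < half) {
            int child = (k << 1) + 1;
            int right = child + 1;
            if (right < size && heap[right] < heap[child]) child = right;
            if (key <= heap[child]) break;
            heap[k] = heap[child];
            k = child;
        }
        heap[k] = key;
    }

    public static void main(String[] args) {
        TriggerTimeHeap h = new TriggerTimeHeap();
        for (long t = 20; t >= 1; t--) h.offer(t);
        while (h.size() > 0) System.out.print(h.poll() + " ");
        System.out.println();
    }
}
```

Both insertion and removal touch only one path between the root and a leaf, so each costs O(log n), and the earliest trigger time is always available at index 0.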

With the logic above, the submitted task has been added to the delay queue. As mentioned earlier, once the task is queued a worker thread is started, which takes tasks from the delay queue and executes them. This part works the same way as in ThreadPoolExecutor. Now let's look at the source code for taking an element from the delay queue:

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                // If the queue is empty, block until an element is added
                if (first == null)
                    available.await();
                else {
                    // Remaining delay: the task's trigger time minus the current time
                    long delay = first.getDelay(NANOSECONDS);
                    // If the trigger time has been reached, dequeue the task
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain the reference while waiting
                    // Another thread is already the leader waiting for the
                    // head task, so this thread just waits to be woken up
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // As leader, wait for the remaining delay of the head task
                            available.awaitNanos(delay);
                        } finally {
                            // The leader may have changed while this thread was
                            // waiting: offer resets it when a new head is added
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // If there is no leader and the queue is not empty,
            // wake up another worker thread
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

Why does the blocking queue keep a leader variable? The reason is to avoid unnecessary timed waits and wake-ups. Suppose the task at the head of the queue is due in one minute, and the user keeps submitting new tasks, which may start additional worker threads. If the new tasks all sort behind the head, then without a leader every new worker would look at the same head task and wait out its remaining delay. When the task reached its trigger time, many worker threads would wake up at once even though only one of them can take it, which is clearly unnecessary. With a leader, only one thread waits with a timeout for the head task, and the others wait until they are signaled.

Once a worker thread has taken the task, it calls the task's run method. Since the task was wrapped as a ScheduledFutureTask, let's look at the run method of that class:

    public void run() {
        boolean periodic = isPeriodic();
        // If the pool's current state does not allow this task to run, cancel it
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        // If the task is not periodic, run it once and finish
        else if (!periodic)
            ScheduledFutureTask.super.run();
        // If the task is periodic and this run completed normally,
        // set the next execution time and put it back into the queue
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

That completes the execution path of the schedule method.
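One consequence of the run method is worth illustrating: a periodic task is only re-queued when runAndReset() returns true, so a run that throws an exception ends the schedule. The sketch below is an illustration under stated assumptions: `PeriodicFailureDemo` and `runsBeforeSuppression` are hypothetical names, and the failure on the third run is simulated.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not part of the JDK or the original article).
class PeriodicFailureDemo {

    // Runs a periodic task that fails on its third run and returns the total run count.
    static int runsBeforeSuppression() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();

        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 3)
                throw new IllegalStateException("simulated failure");
        }, 0, 10, TimeUnit.MILLISECONDS);

        try {
            try {
                // The future of a periodic task completes only on cancellation or failure.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The third run threw, so the task was not put back into the queue.
            }
            // Leave time for further runs, which should not happen.
            Thread.sleep(100);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsBeforeSuppression());
    }
}
```

In practice this means a periodic task that must keep running should catch its own exceptions inside the Runnable.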

For periodic tasks, ScheduledThreadPoolExecutor provides two methods, scheduleAtFixedRate and scheduleWithFixedDelay. Let's look at how they differ.

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        //...
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //...
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        //...
    }

The only difference is the sign of the last argument: scheduleAtFixedRate passes the period to ScheduledFutureTask as a positive number, while scheduleWithFixedDelay passes the delay as a negative number. The sign is used in the setNextRunTime method, which calculates the next execution time:

    private void setNextRunTime() {
        long p = period;
        // p > 0 means scheduleAtFixedRate: the next execution time is
        // derived from the previously scheduled time
        if (p > 0)
            time += p;
        // p < 0 means scheduleWithFixedDelay: the next execution time is
        // recalculated from the current time
        else
            time = triggerTime(-p);
    }

In other words, when a task is submitted with scheduleAtFixedRate, the scheduled times of all its executions are fixed in advance: initialDelay, initialDelay + period, initialDelay + 2 * period, and so on. When a task is submitted with scheduleWithFixedDelay, the first execution is scheduled at initialDelay, and each subsequent execution is scheduled delay after the previous execution finishes.
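The difference can be worked through with simple arithmetic. The following sketch is a simplified model and makes no claim to match the executor's real timing: `NextRunTimeDemo` and `startTimes` are hypothetical names, time is a simulated clock in milliseconds, and a single worker is assumed to be free whenever the task is due. It applies the same sign convention as setNextRunTime (positive for fixed rate, negative for fixed delay).

```java
import java.util.Arrays;

// Hypothetical demo class: simulates setNextRunTime on a fake clock.
class NextRunTimeDemo {

    // period > 0 models scheduleAtFixedRate, period < 0 models scheduleWithFixedDelay.
    // Returns the simulated start time of each run, in milliseconds.
    static long[] startTimes(long initialDelay, long period, long taskDuration, int runs) {
        long[] starts = new long[runs];
        long time = initialDelay; // scheduled trigger time of the next run
        long now = 0;             // simulated current time
        for (int i = 0; i < runs; i++) {
            now = Math.max(now, time); // wait for the trigger time, or start late
            starts[i] = now;
            now += taskDuration;       // the task runs
            if (period > 0)
                time += period;        // fixed rate: based on the previous scheduled time
            else
                time = now + (-period); // fixed delay: based on the completion time
        }
        return starts;
    }

    public static void main(String[] args) {
        // Task takes 300 ms; initial delay 1000 ms; period or delay 1000 ms.
        System.out.println(Arrays.toString(startTimes(1000, 1000, 300, 4)));
        System.out.println(Arrays.toString(startTimes(1000, -1000, 300, 4)));
    }
}
```

With a 300 ms task, the fixed-rate model starts runs at 1000, 2000, 3000, and 4000 ms, while the fixed-delay model starts them at 1000, 2300, 3600, and 4900 ms. In this model, a fixed-rate task that takes longer than its period starts each later run as soon as the previous one finishes.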

Third, summary

ScheduledThreadPoolExecutor can be seen as an extension of ThreadPoolExecutor: it only repackages the task and replaces the blocking queue. Its DelayedWorkQueue is implemented on top of a heap. This article does not explain in detail how the heap is adjusted when elements are inserted and removed. Interested readers are welcome to leave a message or comment to discuss it.