ThreadPoolExecutor source code analysis

Overall Architecture

Core parameters

  • corePoolSize: the size of the core pool. This parameter is central to how the thread pool works. By default, a newly created pool contains no threads; a thread is created only when a task arrives, unless prestartAllCoreThreads() or prestartCoreThread() is called. As their names suggest, these methods create corePoolSize threads or a single thread, respectively, before any task arrives. While the number of threads is below corePoolSize, each incoming task gets a new thread; once the count reaches corePoolSize, incoming tasks are placed in the work queue.
  • maximumPoolSize: the maximum number of threads the pool may create. Threads beyond corePoolSize are created only when the work queue is full.
  • keepAliveTime: the maximum time a thread may stay idle before it terminates. By default, keepAliveTime applies only while the number of threads exceeds corePoolSize: a thread that has been idle for keepAliveTime terminates, until the thread count falls to corePoolSize. If allowCoreThreadTimeOut(true) has been called, keepAliveTime also applies to core threads, so idle threads keep terminating until the pool has zero threads.
  • unit: the time unit of keepAliveTime
  • workQueue: the blocking queue that holds tasks waiting to be executed. The choice of queue has a significant impact on how the pool behaves. Common choices are:
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • SynchronousQueue
  • handler: the policy applied when a task is rejected. The built-in options are:
    • ThreadPoolExecutor.AbortPolicy: discards the task and throws RejectedExecutionException
    • ThreadPoolExecutor.DiscardPolicy: also discards the task, but does not throw an exception
    • ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then tries to submit the new task again (repeating if that fails)
    • ThreadPoolExecutor.CallerRunsPolicy: runs the task in the thread that called execute
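To make the parameter list concrete, here is a minimal sketch that passes every parameter explicitly and then checks the lazy thread creation described under corePoolSize. The class name PoolParametersDemo and the chosen values (2, 4, 30 seconds, queue capacity 10) are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolParametersDemo {
    // Builds a pool with every constructor parameter spelled out.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and unit
                new ArrayBlockingQueue<Runnable>(10),   // workQueue (bounded)
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    // Returns {threads after construction, threads started by prestart, threads after prestart}.
    static int[] sizes() {
        ThreadPoolExecutor pool = newPool();
        int before = pool.getPoolSize();
        int started = pool.prestartAllCoreThreads();
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("after construction: " + s[0]);
        System.out.println("started by prestartAllCoreThreads: " + s[1]);
        System.out.println("after prestart: " + s[2]);
    }
}
```

With these values the pool should report 0 threads right after construction and 2 after prestartAllCoreThreads(), which matches the default behaviour described above.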

Worker

// The smallest unit of task execution in the thread pool
// Worker extends AQS, so it can act as a lock
// Worker implements Runnable, so it is itself an executable task
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // The thread in which the task runs
    final Thread thread;
    // Tasks to be performed
    Runnable firstTask;
    volatile long completedTasks;

    // Clever design: Worker is itself a Runnable and passes itself to the thread as its task,
    // while holding the task it should run first (firstTask) as an internal Runnable field
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Create the thread with this Worker as its Runnable
        this.thread = getThreadFactory().newThread(this);
    }
    // Worker is a Runnable: run is the entry point of a Worker, and it delegates to runWorker in the enclosing class
    public void run() {
        runWorker(this);
    }

    // Lock methods
    // 0 means not locked, 1 means locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
  • Worker acts as a proxy for tasks. The smallest unit of execution in the thread pool is the Worker, which is why Worker implements the Runnable interface and its run method

  • When a Worker is initialized, the key line is this.thread = getThreadFactory().newThread(this), which passes the current Worker to the thread factory as the thread's Runnable. Later code contains Thread t = w.thread; t.start(), where w is a reference to the Worker, so t.start() ends up executing the Worker's run method

  • Worker also extends AQS, so it is a lock as well: it locks itself while executing a task and unlocks itself once the task is done
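The three points above can be sketched with a much smaller class. MiniWorker is a hypothetical stand-in, not the JDK's Worker: it drops the -1 initial state, the thread factory, and the owner-thread bookkeeping, and keeps only the "Runnable that hands itself to its own thread" and "non-reentrant AQS lock" ideas.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified stand-in for ThreadPoolExecutor.Worker.
class MiniWorker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    private final Runnable firstTask;
    volatile boolean finished;

    MiniWorker(Runnable firstTask) {
        this.firstTask = firstTask;
        // Pass the worker itself as the thread's Runnable,
        // so thread.start() ends up calling MiniWorker.run().
        this.thread = new Thread(this);
    }

    @Override
    public void run() {
        lock();                 // hold the lock while the task runs
        try {
            firstTask.run();
        } finally {
            unlock();
            finished = true;
        }
    }

    // State 0 means unlocked, 1 means locked. There is no owner check,
    // so a second acquire from the same thread fails: the lock is not reentrant.
    @Override
    protected boolean tryAcquire(int unused) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int unused) {
        setState(0);
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return getState() != 0; }

    // Runs one task through a worker and reports whether it completed and released its lock.
    static boolean runOnce(Runnable task) {
        MiniWorker w = new MiniWorker(task);
        w.thread.start();
        try {
            w.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return w.finished && !w.isLocked();
    }
}
```

Because the lock is not reentrant, a tryLock() that succeeds means the worker is idle rather than in the middle of a task; this is presumably why a plain AQS subclass is used here instead of ReentrantLock.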

Task submission for the thread pool

Task submission starts with the submit method, which is defined in the AbstractExecutorService class and does two things:

  • Wraps the Runnable or Callable in a FutureTask
  • Runs that FutureTask through the execute method
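The two steps above can be sketched as follows. submitLike is a hypothetical helper that imitates what submit does for a Callable; the real AbstractExecutorService creates the wrapper through its newTaskFor method, which this sketch skips.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class SubmitSketch {
    // Roughly what submit(Callable) does: wrap, hand to execute, return the future.
    static <T> FutureTask<T> submitLike(ExecutorService pool, Callable<T> task) {
        FutureTask<T> future = new FutureTask<>(task);
        pool.execute(future);   // FutureTask implements Runnable
        return future;
    }

    // Runs a trivial computation through submitLike and waits for the result.
    static int compute() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> task = () -> 6 * 7;
            return submitLike(pool, task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compute());
    }
}
```

From the pool's point of view the FutureTask is just another Runnable, so everything after this point is ordinary execute logic.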
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // The number of working threads is smaller than the number of core threads
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // The thread pool state may change
        c = ctl.get();
    }
    // The worker count is at or above corePoolSize, or adding a core worker failed
    // If the pool is still running, try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer running, try to remove the task from the queue; reject it if it was removed
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads left, start one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Queueing failed: try to add a non-core worker, and reject the task if that fails too
    else if (!addWorker(command, false))
        reject(command);
}
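The order of decisions in execute (core thread, then queue, then non-core thread, then rejection) can be observed from outside the pool. The sketch below assumes a deliberately tiny pool (corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy) and tasks that block on a latch so nothing finishes during the experiment; ExecuteFlowDemo and its trace format are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns {poolSize, queueSize, rejected} recorded after each of four submissions.
    static int[][] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch opens, so no worker becomes free.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[][] trace = new int[4][];
        for (int i = 0; i < 4; i++) {
            int rejected = 0;
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected = 1;   // AbortPolicy is the default handler
            }
            trace[i] = new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        }
        release.countDown();
        pool.shutdown();
        return trace;
    }

    public static void main(String[] args) {
        for (int[] row : run()) {
            System.out.println("poolSize=" + row[0] + " queued=" + row[1] + " rejected=" + row[2]);
        }
    }
}
```

Under these assumptions the trace is {1,0,0}, {1,1,0}, {2,1,0}, {2,1,1}: the first task starts a core thread, the second is queued, the third starts a non-core thread because the queue is full, and the fourth is rejected.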

The addWorker method creates a new Worker

// Checks whether a new worker can be added given the current pool state and bounds
// firstTask: if non-null, the new worker runs it first; if null, the worker only takes tasks from the queue
// core: if true, the thread count is bounded by corePoolSize; if false, by maximumPoolSize
// Returns true on success, false on failure
// break retry: leaves the outer loop labeled retry
// continue retry: jumps back to the label and starts the outer loop again
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // The worker count has reached CAPACITY, or has reached corePoolSize / maximumPoolSize (depending on core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // The count was incremented: leave both loops
                break retry;
            c = ctl.get();
            // The pool's run state has changed: restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Clever design: Worker is itself a Runnable.
        // During construction, the Worker hands itself to the new thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread, which actually executes Worker.run()
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The addWorker method first performs a series of checks, then creates a new Worker with new Worker(firstTask), and finally starts it with t.start(). Because the Worker constructor contains this.thread = getThreadFactory().newThread(this), the Worker (this) is the Runnable of the new thread, so t.start() executes the Worker's run method, which in turn calls runWorker:
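The execute and addWorker listings read the pool state through ctl, runStateOf, workerCountOf and CAPACITY without showing how they are defined. The sketch below re-creates that packing as it appears, to the best of my knowledge, in the JDK 8 source: the run state lives in the top 3 bits of an int and the worker count in the remaining 29. CtlSketch is an illustrative class and works on a plain int rather than the AtomicInteger the real pool uses.

```java
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 low bits hold the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // largest representable worker count

    // Run states occupy the 3 high bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Unpack and pack the combined control word.
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // RUNNING is the only negative state, so "less than SHUTDOWN" means running.
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("workers=" + workerCountOf(c) + " running=" + isRunning(c));
    }
}
```

Keeping both values in one word is what lets compareAndIncrementWorkerCount(c) fail when either the count or the run state has changed since c was read, which is why addWorker re-reads ctl and compares runStateOf(c) with rs after a failed CAS.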

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task is null in two cases:
        // 1: the task was queued and, in the edge case where no worker was running, a new worker was created without a first task
        // 2: the worker finished its previous task and came back around the while loop
        // When task is null, getTask blocks on the queue until a task becomes available
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Run the before hook
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Run the after hook; if it throws, its exception replaces the one rethrown from the catch block,
                    // so it is best not to throw from afterExecute
                    afterExecute(task, thrown);
                }
            } finally {
                // The task is completed
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

What a thread does after it completes a task

The while loop in runWorker calls getTask, whose main job is to fetch a task from the queue, blocking if necessary. If the queue holds a task, it is taken out and executed; otherwise the thread blocks until a task arrives or, for a thread that is allowed to time out, until keepAliveTime elapses. The getTask source is as follows:

// Get a task from the blocking queue
private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // The pool is STOP or later, or it is SHUTDOWN and the queue is empty: nothing left to run, return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed is true if core threads may time out or the worker count exceeds corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // timedOut is true when the previous poll below waited keepAliveTime
        // and still found nothing in the queue,
        // i.e. this thread has been idle for keepAliveTime
        // Combined with wc > 1 || workQueue.isEmpty(),
        // compareAndDecrementWorkerCount lowers the worker count by 1
        // and the method returns null; the idle thread then exits and is reclaimed
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • Tasks are fetched with the queue's poll or take method. If the queue holds a task, it is returned immediately; if not, take blocks until one arrives and poll blocks for at most keepAliveTime
  • The second if statement in the method means that, under the conditions described in the comments, the worker count is decremented by one and getTask returns null; runWorker then leaves its loop, the thread exits, and the JVM reclaims it
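The difference between the timed and untimed fetch can be isolated from the rest of getTask. FetchSketch.fetch below is a hypothetical helper that keeps only the poll/take choice; it omits the state checks, the timedOut flag and the worker-count CAS of the real method.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FetchSketch {
    // timed == true:  wait at most keepAliveNanos and return null on timeout.
    // timed == false: block until a task arrives.
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveNanos) {
        try {
            return timed
                    ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS)
                    : queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        // Empty queue with a 1 ms timeout: the timed fetch gives up and returns null.
        System.out.println("timed fetch on empty queue: " + fetch(queue, true, 1_000_000L));
        queue.add(() -> System.out.println("task ran"));
        // A queued task is returned immediately by either variant.
        fetch(queue, false, 0L).run();
    }
}
```

A null from the timed branch is what the real getTask records as timedOut = true, which on the next loop iteration lets an idle thread above corePoolSize (or any thread, with allowCoreThreadTimeOut) exit.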

Interview questions

If I want to do some resource cleanup before and after a thread pool task executes, is that possible, and how?

Yes. ThreadPoolExecutor provides hook methods for this: extend ThreadPoolExecutor and override them. Override beforeExecute for work that must happen before a task runs and afterExecute for work that must happen after it finishes
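A minimal sketch of that answer follows. HookedPool and its events list are illustrative; a real subclass would acquire and release whatever resource the tasks need instead of recording strings.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    // Records the order in which the hooks and the task run.
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        events.add("before");       // e.g. set up per-task resources
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        events.add("after");        // e.g. release resources; avoid throwing here
    }

    // Runs a single task and returns the recorded event order.
    static List<String> runOne() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(runOne());
    }
}
```

Both hooks run on the worker thread that executes the task, so thread-local setup and cleanup work as expected. The Throwable passed to afterExecute is non-null only when the task was run through execute and threw; tasks submitted through submit capture their exception inside the FutureTask.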

Tell me what you understand about thread pools

  1. The ThreadPoolExecutor class structure;
  2. corePoolSize, maximumPoolSize and the other important ThreadPoolExecutor parameters;
  3. The important role of Worker;
  4. The entire task submission flow