Thread pools are commonly used at work in high-concurrency scenarios. Today we'll use a demo to look at how a thread pool works, reading the source code line by line.

import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava (assumed source of ThreadFactoryBuilder)
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 1; i <= 50; i++) {
            AtomicInteger count = new AtomicInteger(i);
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running task " + count.getAndIncrement());
                try {
                    Thread.sleep(3000L);  // Simulate business code execution time
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

A ThreadFactory can set attributes such as the thread name and priority. Naming threads helps you quickly locate a misbehaving thread pool in the logs, so it is advisable to give each thread pool its own thread-name prefix.

The demo thread pool is configured with 5 core threads, a maximum of 10 threads, a keep-alive time of 0, and a blocking queue with a capacity of 5.

Execution Result:

Let’s look at the source code with some questions:

Why are loops 11-15 executed first, and loops 6-10 only afterwards?

Why is an exception thrown after loops 11-15?

Why is nothing more executed after the tenth loop?

Constructor parameters

int corePoolSize: the number of core threads. By default, core threads are kept alive even when idle. Calling allowCoreThreadTimeOut(true) lets idle core threads be reclaimed as well.

int maximumPoolSize: the maximum number of threads allowed in the thread pool. Must be greater than or equal to the number of core threads.

long keepAliveTime: the idle wait time. When the pool has more threads than the core size, idle threads are released after waiting this long.

TimeUnit unit: the unit of keepAliveTime.

BlockingQueue<Runnable> workQueue: the wait queue, which holds tasks that have not yet been executed. Queues such as the LinkedBlockingQueue in the demo follow the first-in, first-out principle.

ThreadFactory threadFactory: a factory for creating new threads. It can be used to set thread names, which help locate problems quickly in logs.

RejectedExecutionHandler handler: the rejection policy. Defines how a task is handled when the pool cannot accept it, for example when the thread count is at its maximum and the queue is full.
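To see where each parameter goes, here is a small sketch of the seven-argument constructor. The sizes, the "sketch-pool-" thread name, and the class name PoolParametersSketch are made up for illustration and are not from the demo above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolParametersSketch {
    // Hypothetical helper: builds a pool with every constructor argument spelled out.
    static ThreadPoolExecutor build() {
        AtomicInteger seq = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize, must be >= corePoolSize
                30L, TimeUnit.SECONDS,                // keepAliveTime and its unit
                new LinkedBlockingQueue<>(8),         // bounded work queue
                r -> new Thread(r, "sketch-pool-" + seq.getAndIncrement()), // ThreadFactory
                new ThreadPoolExecutor.AbortPolicy());                      // rejection policy
        // Let idle core threads be reclaimed too; this call requires keepAliveTime > 0.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        System.out.println(pool.getCorePoolSize() + " core / " + pool.getMaximumPoolSize() + " max");
        pool.shutdown();
    }
}
```

Note that allowCoreThreadTimeOut(true) would throw an IllegalArgumentException with the demo's keep-alive time of 0, which is why this sketch uses a non-zero value.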

The execute method

Tasks are added to the thread pool through the execute method.

The ctl field used in the method packs the state of the thread pool and the number of worker threads into a single AtomicInteger. The high 3 bits store the run state, and the remaining 29 bits store the number of threads.
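To make the bit layout concrete, here is a standalone sketch that copies the packing arithmetic into a class of its own. The class name CtlPackingSketch is hypothetical; the constants mirror the JDK's COUNT_BITS = Integer.SIZE - 3 layout.

```java
public class CtlPackingSketch {
    // 29 low bits hold the worker count, the 3 high bits hold the run state.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }     // keep only the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }   // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine state and count

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 7);
        System.out.println(Integer.toBinaryString(ctl));   // 111 followed by the 29-bit count
        System.out.println(runStateOf(ctl) == RUNNING);    // true
        System.out.println(workerCountOf(ctl));            // 7
    }
}
```

Because RUNNING is the only negative state, a check like "is the pool running" reduces to a single comparison against SHUTDOWN.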

// Packing and unpacking ctl
// Thread pool status
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Number of threads
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

public void execute(Runnable command) {
        // command is the lambda expression passed to execute in the demo code
        if (command == null)
            throw new NullPointerException();
        // Read the pool state and the worker count
        int c = ctl.get();
        // If the current number of threads is less than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
            // Create a new core thread with command as its first task, then return
            if (addWorker(command, true))
                return;
            // Adding the worker failed, so re-read ctl
            c = ctl.get();
        }
        // The pool is running and the task can be queued
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Double check: if the pool is no longer running, remove the task from the queue and run the rejection policy
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If there are no worker threads left, add a worker without a first task
            else if (workerCountOf(recheck) == 0)
                // With a null first task, the new worker fetches its tasks from the queue
                addWorker(null, false);
        }
        // If the task cannot be queued, try to start a new non-core thread; if that fails, reject the task
        else if (!addWorker(command, false))
            reject(command);
    }

From the source code, the logic a thread pool follows when handling multiple tasks is:

  • Use core threads first
  • Once the number of threads reaches the core size, put new tasks in the queue
  • When the queue is full, start non-core threads
  • When the maximum number of threads is reached and the queue is full, run the rejection policy
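The four steps above can be watched on a deliberately tiny pool. This is a sketch with assumed sizes (1 core thread, 2 maximum, queue capacity 1) and a hypothetical class name; each task blocks on a latch so the pool state stays put while it is being inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionOrderSketch {
    /** Submits four blocking tasks and records what the pool did with each one. */
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                     // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                sb.append("task").append(i)
                  .append(": threads=").append(pool.getPoolSize())
                  .append(" queued=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                sb.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown();                         // let the workers finish
        pool.shutdown();
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0
        // task2: threads=1 queued=1
        // task3: threads=2 queued=1
        // task4: rejected
    }
}
```

Task 3 starts running before task 2 even though it was submitted later, which is the same effect the demo shows with loops 11-15 and 6-10.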

The addWorker method

The execute method above shows how the thread pool assigns tasks; the addWorker method contains the logic for adding threads.

@param firstTask: the first task the new thread should run

@param core: if true, the thread count is bounded by corePoolSize; if false, by maximumPoolSize.

// runState is stored in the high-order bits
// Thread pool status
// In the normal execution state, it can accept new tasks and process tasks in the blocking queue
private static final int RUNNING    = -1 << COUNT_BITS;
// Does not accept newly submitted tasks, but can process tasks in the blocking queue
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// Does not accept new tasks, nor does it process tasks in the queue, interrupts the thread that is processing tasks
private static final int STOP       =  1 << COUNT_BITS;
// All tasks have terminated and the worker count is zero
private static final int TIDYING    =  2 << COUNT_BITS;
// The terminated() hook has completed; the thread pool is fully stopped
private static final int TERMINATED =  3 << COUNT_BITS;

private boolean addWorker(Runnable firstTask, boolean core) {
        retry: // Label for the outer loop, targeted by "break retry" and "continue retry"
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Adding a worker fails when the pool is no longer RUNNING, with one exception:
            // in SHUTDOWN, a worker without a first task may still be added while the queue
            // is not empty, so that the remaining queued tasks get drained.
            if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
                return false;

            for (;;) {
                // Number of current threads
                int wc = workerCountOf(c);
                // Determine whether to use core threads or non-core threads according to core, and determine whether the current number of threads is within the range
                // If out of range, return failure
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // This is a CAS operation. If it succeeds, the worker count is incremented and the outer loop exits
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // If the pool state has changed since the last read, restart the outer loop
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // Reaching here means the CAS succeeded and a worker slot has been reserved
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create a Worker for the task; its thread is created by the ThreadFactory
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // Lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // Check the state again: a worker can be added if the pool is RUNNING,
                    // or if it is SHUTDOWN and there is no first task
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // The thread must not have been started already
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // Track the largest pool size ever reached
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // Flag that the worker has been added
                        workerAdded = true;
                    }
                } finally {
                    // Unlock
                    mainLock.unlock();
                }
                // Worker added, start its thread
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // If the thread failed to start, remove the worker, decrement the worker count, and try to terminate the pool
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // Returns a flag bit indicating whether the thread started successfully
        return workerStarted;
    }
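The SHUTDOWN branches in addWorker can be observed from the outside. Below is a minimal sketch, with a hypothetical class name and a 1-thread pool chosen for illustration: after shutdown(), a new submission is refused, while a task that was already queued still gets executed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownStateSketch {
    /** Returns {new task rejected after shutdown, already queued task still ran}. */
    static boolean[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean(false);
        boolean rejected = false;
        try {
            pool.execute(() -> {                      // occupies the single worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> queuedRan.set(true));  // waits in the queue
            pool.shutdown();                          // RUNNING -> SHUTDOWN
            try {
                pool.execute(() -> { });              // refused: the pool is no longer RUNNING
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { rejected, queuedRan.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("rejected=" + r[0] + ", queued task ran=" + r[1]);
    }
}
```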

The runWorker method

Each worker thread runs this method, and the business logic is executed through task.run(). After finishing the current task, the while loop fetches the next one and keeps going until getTask() returns null. The thread stays the same while the tasks change, so the loop in this method is what makes thread reuse possible.
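Thread reuse is easy to confirm from the outside. The sketch below (hypothetical class name, pool sizes assumed for illustration) runs several tasks on a one-thread pool and counts how many distinct threads executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadReuseSketch {
    /** Runs the given number of tasks and returns how many distinct threads executed them. */
    static int distinctThreads(int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            // Each task records the name of the thread it ran on.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreads(10)); // 1
    }
}
```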

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // When task is null, block on the queue to fetch the next task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // If the pool state is >= STOP, interrupt the thread
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // beforeExecute and afterExecute are empty by default;
                    // both are extension points that can be customized
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // Reset task so the next iteration fetches from the queue
                    task = null;
                    // Number of completed items +1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Remove this worker from the workers set and try to terminate the pool.
            // If the worker exited abruptly, or the pool would fall below the threads it needs,
            // a replacement worker with no first task is added
            processWorkerExit(w, completedAbruptly);
        }
    }

The getTask method

private Runnable getTask() {
    // Whether the last poll() timed out
    boolean timedOut = false; // Did the last poll() time out?
    
    for (;;) {
        // Current thread pool status
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // If the state is >= STOP, or the state is SHUTDOWN and the queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // The pool no longer needs this worker, so decrement the worker count (via CAS) and return null
            decrementWorkerCount();
            return null;
        }

        // Number of current threads
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed is true when this worker may be reclaimed after an idle timeout
        // allowCoreThreadTimeOut is false by default; when false, core threads are not reclaimed even if idle
        // When it is true, a core thread that stays idle for keepAliveTime is reclaimed as well
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If there are too many threads, or this worker's timed poll has expired, try to decrement the worker count via CAS
        // If the CAS fails, go around the loop and try again
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // Reaching here means this worker stays alive and waits for a task
        try {
            // poll is used when the idle worker may be reclaimed; take is used otherwise
            // Both return the head of the queue and remove it
            // poll waits up to keepAliveTime for a task and returns null on timeout
            // take blocks until a task is available, so the worker is never reclaimed while waiting
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // The timed poll returned null, so the next iteration may decrement the worker count
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
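The effect of the timed poll can be seen by watching the pool size shrink. This is a rough sketch with assumed values (1 core thread, 2 maximum, 100 ms keep-alive) and a hypothetical class name: the pool grows to 2 threads under load and falls back to 1 once the surplus worker has been idle longer than keepAliveTime.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveSketch {
    /** Returns {pool size under load, pool size after the idle timeout}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int peak = 0;
        int after = 0;
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker);                // core thread, queued, non-core thread
            }
            peak = pool.getPoolSize();
            release.countDown();
            // Wait up to about 5 seconds for the surplus worker's timed poll to expire.
            for (int i = 0; i < 100 && pool.getPoolSize() > 1; i++) {
                Thread.sleep(50);
            }
            after = pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] { peak, after };
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("under load: " + sizes[0] + ", after idle timeout: " + sizes[1]);
    }
}
```

The remaining thread switches to take() once the worker count is back at the core size, so it waits indefinitely instead of timing out.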

Conclusion

At this point, the questions posed at the beginning are pretty much answered.

Why are loops 11-15 executed first, and loops 6-10 only afterwards?

Tasks 1-5 occupy the core threads, so tasks 6-10 are placed in the queue. Once the queue is full, tasks 11-15 each start a non-core thread and run immediately, while tasks 6-10 wait until a thread becomes free.

Why is an exception thrown after loops 11-15?

The pool has no capacity left, so the rejection policy runs. The thread count has reached the maximum and the queue is full, so the last branch of the execute() method calls reject, and AbortPolicy throws a RejectedExecutionException.

Why is nothing more executed after the tenth loop?

The exception is not caught, so the main thread terminates and the remaining loop iterations never submit their tasks. Tasks 6-10 are still executed because they are already in the blocking queue and the worker threads pick them up.
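If the loop should keep going after a rejection, one option is to catch the exception around execute. The sketch below reuses the demo's 5/10/queue-5 configuration, but the class name and the latch-based tasks are assumptions made so that the count is reproducible: 15 tasks are accepted (5 core, 5 queued, 5 non-core) and the other 35 are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionHandlingSketch {
    /** Submits 50 blocking tasks and returns how many of them were rejected. */
    static int countRejections() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 1; i <= 50; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();              // hold the thread until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;                           // the loop continues instead of ending the main thread
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejections()); // 35
    }
}
```

Whether to catch, retry, or switch to another handler such as CallerRunsPolicy depends on what the business can tolerate; this only shows that the rejection does not have to end the submitting thread.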