Here are some of the must-ask questions in a job interview: Thread pool = * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

attribute

  • ctl

    CTL is a value of type AtomicInteger that represents the control state of the thread pool. The value of CTL stores two different state values: the number of worker threads and the state of the thread. The first three bits of the CTL are used to indicate the thread status, and the last 29 bits are used to indicate the number of worker threads

    ! [image-20200409151836030](/Users/zhangminglei/Library/Application Support/typora-user-images/image-20200409151836030.png)

  • Thread running state

    RUNNING = -1 << COUNT_BITS (11100000… . 00000000).

    SHUTDOWN = 0 << COUNT_BITS (00000000… . 00000000).

    STOP = 1 << COUNT_BITS (00100000… . 00000000).

    TIDYING = 2 << COUNT_BITS (01000000… . 00000000).

    TERMINATED = 3 << COUNT_BITS (01100000… . 00000000).

  • Other attribute values

    COUNT_BITS = integer.size – 3 (29)

    CAPACITY = (1 << COUNT_BITS) -1 (2^29-1) in binary format: 0001111 11111111 11111111 11111111

The number of worker threads, thread status and CTL value are calculated by the following three methods

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

The constructor

ThreadPoolExecutor’s parameters are often asked in interviews, so let’s look at the constructor with the most parameters.

CorePoolSize: Number of core pools in the thread pool that are not recycled even if the thread is free (allowCoreThreadTimeOut is false by default, if set to true, it will be recycled if the core pool is free)

MaximumPoolSize: the maximum number of threads allowed in the thread pool

KeepAliveTime: The maximum time that a thread larger than the core pool can remain in the thread pool when it is idle

Unit: keepAliveTime Time unit

WorkQueue: When tasks larger than the core pool are submitted, they are added to the queue

ThreadFactory: Factory for creating threads

Handler: The thread pool’s rejection policy when the pool is full

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

Rejection policies

The JDK comes with four rejection policies.

  • AbortPolicy

    An exception is thrown directly RejectedExecutionException

  • CallerRunsPolicy

    The caller performs the task

  • DiscardPolicy

    Just drop the task and do nothing

  • DiscardOldestPolicy

    Discard the oldest task and execute the current task

The execute method

Let’s take a look at the three steps executed by the execute method, and then take a look at its source code to analyze the thread pool implementation in detail.

  1. If the number of threads running in the thread pool is less than corePoolSize, try to start a new thread to execute the task
  2. If the task is successfully enqueued, a second check is still needed to see if a thread needs to be created (because an existing thread may have stopped since the last check) or if the thread pool has been shutdown. So check the status again, and if the thread pool stops you need to de-queue the task that was just enqueued, or start a new thread
  3. If the task fails to join the queue, try adding a new thread, and if that fails, execute the reject policy.
public void execute(Runnable command) {
    // Task command cannot be empty
    if (command == null)
        throw new NullPointerException();
    // Get the CTL value
    int c = ctl.get();
    // Step 1: Create threads to execute tasks that are smaller than the core pool number
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: Add tasks to the queue if the number of core pools is greater
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null.false);
    }
    The number of threads must be smaller than the maximum number of threads. Otherwise, the policy will be rejected
    else if(! addWorker(command,false))
        reject(command);
}
Copy the code

From the execute method source code can also see the execution of the three steps, let’s look at the addWorker method, this method is mainly to create a thread, execute the task.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get(), rs = runStateOf(c);
        // If the thread pool state is greater than SHUTDOWN, the task will not be executed.
        If SHUTDOWN is set, no new tasks are received. If there are still tasks in the queue, execution continues
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null&&! workQueue.isEmpty()))return false;
        for (;;) {
            int wc = workerCountOf(c);
            // Determine whether the required number of thread pools is exceeded
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Number of thread pool jobs +1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the number of threads in the pool fails to update, the spin continues to update
            if(runStateOf(c) ! = rs)continueretry; }}boolean workerStarted = false, workerAdded = false;
    Worker w = null;
    try {
        // Create a new worker. The worker class implements the Runnable interface, so there are tasks in the worker that need to be executed
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if(t ! =null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // check the RUNNING status of the thread pool again and make sure that the thread pool is RUNNING or SHUTDOWN and the new task is null
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // t is the thread of the newly created worker. If the thread is alive and not started yet, an exception will be thrown
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; }}finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the newly created thread and call the worker's run method.
                // The worker's run method is dissected below
                t.start();
                workerStarted = true; }}}finally {
        // If the rollback fails, perform rollback
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Copy the code

So addWorker adds a new thread and starts the thread, and we all know that when a thread starts, it goes back and calls Runnable’s run method, and the same thing happens here, it calls Worker’s run method. So we can look directly at the logic of the runWorker method. Omit some code, just look at the main logic)

final void runWorker(Worker w) {... w.unlock();try {
        If w.task is not null, execute the task directly
        // If w.task is null, the getTask method is used to fetch the task from the queue
        while(task ! =null|| (task = getTask()) ! =null) { w.lock(); .try {
                beforeExecute(wt, task);
                try {
                    // Execute the task's run method
                    You can customize execution logic before and after a task, beforeExecute and afterExecute
                    task.run();
                } catch (...) {
                    ...
                } finally{ afterExecute(task, thrown); }}finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // There is a flow chart of this method in the final summaryprocessWorkerExit(w, completedAbruptly); }}Copy the code

Again, take a look at the getTask logic for getting the task from the queue (some code omitted, just the main logic)

private Runnable getTask(a) {
    boolean timedOut = false; 
    for(;;) {...int wc = workerCountOf(c);
        True if core threads are allowed to timeout or the number of workers is greater than the number of core threads
        booleantimed = allowCoreThreadTimeOut || wc > corePoolSize; .try {
            // If a thread in the thread pool needs to poll a task from the queue, null is returned after a timeout
            // Otherwise, take the task from the queue
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if(r ! =null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code

conclusion

ThreadPoolExecutor: execute () processWorkerExit ()