ThreadPoolExecutor is one of the most commonly used classes in JUC (java.util.concurrent). As the name suggests, it is an executor backed by a thread pool, and it makes creating a thread pool in Java easy. But why do we use thread pools? What advantages do they bring? How do they work? With those questions in mind, let's take a look at thread pooling in Java.

Why use thread pools?

This may sound like a trivial question, but I think it is better to first ask why thread pools exist at all. So why do they?


Here is an example:

The express delivery industry has been booming for the past two years, and I hear the pay is very good too, which makes it hard for me to concentrate on writing code day after day...

In the early days, small express companies had no permanent couriers. Every time a parcel had to be delivered, the head of the site found someone to deliver it, and once the delivery was done the arrangement ended (the person was still paid, of course).

Later, as the volume of goods grew, paying someone new for every delivery became too expensive, and during the busy farming season it took a long time to find anyone. So the site hired five people and signed contracts with them to deliver its goods on a long-term basis.

In the past, a helper was found whenever one was needed; now that is no longer the case. The site has become a logistics company with a distribution department, and the department stipulates that it has at most five permanent couriers.

What were the disadvantages of the earlier way of delivering?

  • Every shipment meant finding a temporary worker, signing a temporary contract, and cancelling the contract after the delivery, which is very troublesome. This is also the disadvantage of not using a thread pool: as tasks arrive, we have to keep creating new threads and releasing them after use, which is costly for the system.
  • There are only a few delivery trucks, so if too many people sign temporary contracts there are not enough vehicles, and the rest can only wait until a truck comes back from a delivery.

Problems solved by establishing the distribution department:

  • With labor contracts in place, the same couriers can be asked to deliver different goods again and again. This is the reuse of thread resources.
  • Limiting the number of people who can be hired avoids hiring more people than can actually be put to work.
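The reuse described above can be observed with a small sketch. It assumes a fixed pool of five threads created through Executors.newFixedThreadPool; the class and method names (ReuseDemo, distinctThreads) are made up for illustration. One hundred tasks are submitted, and the sketch counts how many distinct threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Runs `tasks` short tasks on a pool of `workers` threads and returns
    // how many distinct threads actually executed them.
    static int distinctThreads(int workers, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks, finish the queued ones
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("threads used for 100 tasks: " + distinctThreads(5, 100));
    }
}
```

However many tasks are submitted, the number of threads never exceeds the pool size, just as the distribution department never has more than its contracted couriers.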

OK, with the example above in mind, let's look at the basic principles of thread pools.

The Java class declaration of ThreadPoolExecutor:

public class ThreadPoolExecutor extends AbstractExecutorService 

The Executor inheritance hierarchy is covered in the article on the JUC Executor framework. ThreadPoolExecutor is the class in that hierarchy that provides the thread pool functionality.

Constructors

// Constructor 1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
                          
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
// Constructor 2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
// Constructor 3
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
// Constructor 4
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

As you can see from the code above, constructors 1, 2, and 3 all initialize the fields by delegating to constructor 4, so let's go straight to constructor 4. It takes 7 parameters in total. Let's look at what each one means:

  • corePoolSize

    The number of core threads. What does that mean? Compare it to the number of people in the distribution department who have signed labor contracts in the example above.

  • maximumPoolSize

    The maximum number of threads. Suppose there are so many parcels during Double Eleven that the five people in the distribution department are completely overwhelmed and the warehouse is full. What should we do? If maximumPoolSize is 10, then at most 5 temporary couriers can be hired, because the total number of permanent and temporary couriers cannot exceed the maximum set by the distribution department (10). So maximumPoolSize is the maximum number of threads allowed to exist in the thread pool.

  • keepAliveTime

    The keep-alive time. Why do we need it? Double Eleven has passed and almost all the goods have been delivered. The temporary contract states that the distribution department may terminate it if a temporary courier has made no delivery for 2 days. Once those 2 days are up, the temporary couriers are dismissed. For thread pools, keepAliveTime means that threads beyond the core threads are reclaimed once they have been idle for longer than keepAliveTime.

  • unit

    The time unit of keepAliveTime.

  • workQueue

    The work queue. This is equivalent to the warehouse. All 5 people in the distribution department are out delivering, but new parcels keep arriving, so a warehouse is needed to store them. For thread pools, when every core thread is busy with a task and more tasks come in, the new tasks are added to the work queue.

  • threadFactory

    The thread factory, used to create threads. This is analogous to a recruitment team, which can give each thread a name or a number.

  • handler

    The RejectedExecutionHandler, which implements the rejection policy. Suppose the warehouse is full and the distribution department has already reached 10 people. What do we do then? We have to reject the task according to some policy.
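To tie the seven parameters back to the courier analogy, here is a minimal sketch that passes all of them to constructor 4. The class name DeliveryPool and the queue capacity of 100 are assumptions chosen for illustration; the other values come from the example above (5 contracted couriers, at most 10 in total, temporary staff released after 2 idle days).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DeliveryPool {
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                5,                                    // corePoolSize: the five contracted couriers
                10,                                   // maximumPoolSize: contracted plus temporary couriers
                2,                                    // keepAliveTime: idle temporary couriers leave after 2 ...
                TimeUnit.DAYS,                        // unit: ... days
                new ArrayBlockingQueue<Runnable>(100), // workQueue: the warehouse (capacity 100 is an assumption)
                Executors.defaultThreadFactory(),     // threadFactory: the recruitment team
                new ThreadPoolExecutor.AbortPolicy()  // handler: reject by throwing an exception
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        try {
            System.out.println("core=" + pool.getCorePoolSize()
                    + ", max=" + pool.getMaximumPoolSize());
        } finally {
            pool.shutdown();
        }
    }
}
```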

Status of the thread pool

// runState is stored in the high-order bits
// RUNNING: accepts new tasks and processes tasks in the blocking queue
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: does not accept new tasks, but still processes tasks in the blocking queue
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: does not accept new tasks, does not process queued tasks, and interrupts running tasks
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING: all tasks have terminated and the workerCount recorded in ctl is 0
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: the thread pool is completely terminated
private static final int TERMINATED =  3 << COUNT_BITS;
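The comment "runState is stored in the high-order bits" can be made concrete with a standalone sketch of the packing. It assumes the JDK 8 layout, where COUNT_BITS is Integer.SIZE - 3 and a single int named ctl holds both the run state and the worker count. The class name CtlSketch is made up, and ctlOf is included as I recall it from the JDK source rather than from the code shown in this article.

```java
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;    // low 29 bits hold the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // mask selecting those 29 bits

    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    // Packs a run state and a worker count into one int.
    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }

    // Extracts the high 3 bits (the run state).
    static int runStateOf(int c) { return c & ~CAPACITY; }

    // Extracts the low 29 bits (the worker count).
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 7);
        System.out.println("running=" + (runStateOf(c) == RUNNING)
                + ", workers=" + workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states are zero or positive, the states are ordered numerically, which is why the source can use comparisons such as rs >= SHUTDOWN.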

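The state transitions, as shown in an expert's diagram found online, are: RUNNING → SHUTDOWN on shutdown(); RUNNING or SHUTDOWN → STOP on shutdownNow(); SHUTDOWN → TIDYING when both the queue and the pool are empty; STOP → TIDYING when the pool is empty; TIDYING → TERMINATED once the terminated() hook has completed.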

How it works

There are a few things to note.

1. How to submit a task to a thread pool?

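public void execute(Runnable command) {
    // If the task is null, throw a NullPointerException
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // Step 1: fewer than corePoolSize threads are running, so try to start
    // a new core thread with this task as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: the pool is running, so try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check: if the pool is no longer running, remove the task and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker thread is left, start one to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the task could not be queued, so try to start a non-core thread;
    // if that fails too, reject the task
    else if (!addWorker(command, false))
        reject(command);
}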
The method proceeds in three steps:

  • If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
  • If the task can be queued successfully, we still need to check again whether a thread should have been added (because existing threads may have died since the last check) or whether the pool has been shut down since this method was entered. So we recheck the state: if the pool has stopped, the task is removed from the queue and rejected; if no threads are left, a new one is started.
  • If the task cannot be queued (for example because the queue is full), we try to add a new thread to execute it. If that fails, the pool is shut down or saturated, and the rejection policy is applied.
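The three steps can be watched from the outside with a deliberately tiny pool. This is a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, the default AbortPolicy); the class name ExecuteOrderDemo is made up. Every task blocks on a latch, so no thread becomes free while the four tasks are submitted.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    // Returns {poolSize, queuedTasks, rejected} after submitting four blocking tasks.
    static int[] submitFour() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker); // step 1: below corePoolSize, a core thread is started
            pool.execute(blocker); // step 2: the core thread is busy, the task is queued
            pool.execute(blocker); // step 3: the queue is full, a non-core thread is started
            try {
                pool.execute(blocker); // queue full and pool at maximumPoolSize
            } catch (RejectedExecutionException e) {
                rejected = 1; // AbortPolicy rejected the fourth task
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(submitFour()));
    }
}
```

With these settings the result should be a pool size of 2, one queued task, and one rejection. Note the order: the queue is filled before any non-core thread is created.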

2. How to create a thread to process a task?

Create a new thread by implementing this interface

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
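As an illustration of implementing this interface, here is a minimal factory that gives every thread a readable name, in the spirit of the "recruitment team" from the parameter list. The class name NamedThreadFactory and the naming scheme are assumptions for the example, not part of the JDK.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger next = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads prefix-1, prefix-2, ... so they are easy to spot in logs and thread dumps.
        return new Thread(r, prefix + "-" + next.getAndIncrement());
    }
}
```

Such a factory can be passed as the threadFactory argument of constructors 2 and 4.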

3. How are worker threads added?

The addWorker method adds a worker thread. execute is only the entry point for submitting tasks; the actual work of creating and starting a thread is done in addWorker. addWorker takes two arguments:

  • firstTask: the task the new thread should run first (it may be null).
  • core: indicates whether the thread to be created counts as a core thread. If core is true, the new thread is bounded by corePoolSize, which means the number of core threads has not yet reached its limit; otherwise it is bounded by maximumPoolSize.

Let's look at the first half of this method:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Spin
    for (;;) {
        // Get the state of the current thread pool
        int c = ctl.get();
        int rs = runStateOf(c);

        // If the state is STOP, TIDYING, or TERMINATED, return false.
        // If the state is SHUTDOWN and firstTask is not null or the workQueue is empty, return false.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        // Spin until workerCount is incremented by CAS, within the bound selected by core
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

Return false if the CAPACITY limit has been reached:

wc >= CAPACITY

Depending on whether core is true or false, check whether the current workerCount has reached corePoolSize or maximumPoolSize:

wc >= (core ? corePoolSize : maximumPoolSize)

If either of the two conditions holds, return false.

Otherwise, try to increment workerCount by 1 with a CAS operation, and leave the outer loop if it succeeds:

if (compareAndIncrementWorkerCount(c))
    break retry;

If the CAS fails, the thread pool state is read again. If it differs from the state observed when the spin started, continue retry restarts the outer loop and re-checks the state; otherwise only the inner loop is retried.

c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
    continue retry;
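The check-then-CAS pattern of the inner loop can be shown in isolation. The following is only a sketch of the idea using an AtomicInteger; it is not the JDK code, and the class BoundedCounter and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Tries to reserve one slot; returns false once the limit has been reached.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false; // plays the role of wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(current, current + 1))
                return true;  // CAS succeeded, the slot is ours
            // CAS lost a race with another thread: re-read the value and retry
        }
    }

    int get() {
        return count.get();
    }
}
```

The bound is checked against the same value that the CAS later expects, so two threads can never both take the last slot.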


The second half of the method creates the Worker and starts its thread:

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // Create a new Worker object
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // t is null if the ThreadFactory failed to create a thread
        if (t != null) {
            // Lock
            mainLock.lock();
            try {
                // Recheck while holding the lock.
                // Back out if the ThreadFactory failed or the pool was shut down before the lock was acquired
                int c = ctl.get();
                int rs = runStateOf(c);
                // Check the state
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // Pre-check that t can be started
                        throw new IllegalThreadStateException();
                    // Add to workers
                    workers.add(w);
                    int s = workers.size();
                    // Track the largest pool size ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added successfully
                    workerAdded = true;
                }
            } finally {
                // Unlock
                mainLock.unlock();
            }
            // If the worker was added successfully, start its thread
            if (workerAdded) {
                t.start();
                // Mark the worker as started
                workerStarted = true;
            }
        }
    } finally {
        // The worker failed to start: roll back
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

What are the rejection policies?

  • AbortPolicy: throws an exception (RejectedExecutionException) directly. This is the default policy.
  • CallerRunsPolicy: runs the task in the caller's own thread.
  • DiscardOldestPolicy: discards the oldest task at the head of the blocking queue, then tries to submit the current task again.
  • DiscardPolicy: silently discards the task.

Of course, we can also define our own rejection policy.
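A custom policy only has to implement RejectedExecutionHandler. The sketch below counts rejected tasks instead of throwing. The class CountingRejectionHandler and its helper method are hypothetical names for illustration, and the demo relies on the fact that a task submitted after shutdown() is passed to the handler.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Record the rejection; a real policy might log, retry later, or persist the task.
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }

    // Demo: submit one task to a pool that has already been shut down.
    static int rejectAfterShutdown() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), handler);
        pool.shutdown();
        pool.execute(() -> { }); // no longer accepted, so the handler is invoked
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + rejectAfterShutdown());
    }
}
```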

Common work queue types

1. ArrayBlockingQueue

A bounded blocking queue backed by an array.

2. LinkedBlockingQueue

A blocking queue backed by a linked list. Unless a capacity is given it is effectively unbounded (its capacity is Integer.MAX_VALUE), and using it that way may make our rejection policy ineffective: tasks can be queued indefinitely, so the queue never fills up, no threads beyond corePoolSize are created, and maximumPoolSize has no effect.

3. PriorityBlockingQueue

An unbounded blocking queue that orders its elements by priority.

4. SynchronousQueue

SynchronousQueue is a BlockingQueue that does not store elements. Each put operation must wait for a take operation by another thread; until then the put blocks and no further elements can be added. It is special in that it never holds our tasks: each one is handed directly to a waiting thread.
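Since execute hands tasks to the queue with the non-blocking offer method, the differences between these queues show up in whether offer succeeds. The sketch below offers five tasks to each queue with no consumer present; the class name QueueOfferDemo and the capacity of 2 are assumptions for illustration. PriorityBlockingQueue is left out because its elements must be Comparable or come with a Comparator.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueOfferDemo {
    static final Runnable NOOP = () -> { };

    // Offers `attempts` tasks while nothing consumes them and counts how many are accepted.
    static int accepted(BlockingQueue<Runnable> queue, int attempts) {
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(NOOP)) // offer never blocks; it returns false when the task cannot be taken
                ok++;
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println("ArrayBlockingQueue(2): " + accepted(new ArrayBlockingQueue<Runnable>(2), 5));
        System.out.println("LinkedBlockingQueue:   " + accepted(new LinkedBlockingQueue<Runnable>(), 5));
        System.out.println("SynchronousQueue:      " + accepted(new SynchronousQueue<Runnable>(), 5));
    }
}
```

The bounded queue accepts tasks only up to its capacity, the unbounded one accepts all of them, and the SynchronousQueue accepts none because no thread is waiting to take. In a pool, that last case means every submission goes straight to step 3 of execute.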

Worker

This is an inner class of ThreadPoolExecutor that represents a worker thread. What matters is that this inner class extends the abstract class AbstractQueuedSynchronizer (AQS, the abstract queued synchronizer) and implements Runnable.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** The thread this worker runs in */
    final Thread thread;
    /** The initial task to run. It may be null. */
    Runnable firstTask;
    /** The number of tasks completed by this thread */
    volatile long completedTasks;

    /** Constructor */
    Worker(Runnable firstTask) {
        // Inhibit interrupts until runWorker
        setState(-1);
        // Hand the submitted task to this worker
        this.firstTask = firstTask;
        // Create a new thread from the thread factory
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates the run method to the outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock state:
    //
    // 0 indicates the unlocked state.
    // 1 indicates the locked state.

    // Whether the lock is held
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // Try to acquire the lock (overrides the AQS method)
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // Try to release the lock (overrides the AQS method)
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // Lock
    public void lock()        { acquire(1); }
    // Try to lock
    public boolean tryLock()  { return tryAcquire(1); }
    // Unlock
    public void unlock()      { release(1); }
    // Whether the lock is held
    public boolean isLocked() { return isHeldExclusively(); }
    // Interrupt the thread if it has been started
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
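The locking part of Worker can be tried out on its own. The sketch below copies the same tryAcquire and tryRelease logic into a standalone class, hypothetically named SimpleMutex, to show one consequence of this design: the lock is not reentrant, because the state can only go from 0 to 1.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only if the state moves from 0 (unlocked) to 1 (locked).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        System.out.println("first tryLock:  " + mutex.tryLock());
        System.out.println("second tryLock: " + mutex.tryLock()); // same thread, still refused
        mutex.unlock();
        System.out.println("locked after unlock: " + mutex.isLocked());
    }
}
```

A second tryLock from the same thread fails, unlike with ReentrantLock. For Worker, a held lock therefore reliably means "this worker is currently running a task".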

runWorker

Finally, let's look at the runWorker method (a method of ThreadPoolExecutor):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Below is my rough translation of the source comments. Criticism is welcome, but please go easy on me, O(∩_∩) ha ha ~

This is the main worker run loop. It repeatedly gets tasks from the queue and executes them, while coping with a number of issues:

  • We may start out with an initial task, in which case we don't need to fetch the first one. Otherwise, as long as the pool is running, we get tasks from getTask. If it returns null, the worker exits because the pool state or configuration parameters have changed. Other exits result from exceptions thrown in external code, in which case completedAbruptly holds, which usually leads processWorkerExit to replace this thread.
  • Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and clearInterruptsForTaskRun is called to ensure that, unless the pool is stopping, this thread does not have its interrupt flag set.
  • Each task run is preceded by a call to beforeExecute, which might throw an exception, in which case we cause the thread to die (breaking the loop with completedAbruptly true) without processing the task.
  • Assuming beforeExecute completes normally, we run the task, gathering any exception it throws to send to afterExecute. We handle RuntimeException, Error (both of which the specs guarantee that we trap), and arbitrary Throwables separately. Because we cannot rethrow Throwables within Runnable.run, we wrap them in Errors on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also conservatively causes the thread to die.
  • After task.run completes, we call afterExecute, which may also throw an exception, which will also cause the thread to die. According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws.

The net effect of the exception mechanics is that afterExecute and the thread's UncaughtExceptionHandler have as accurate information as we can provide about any problems encountered by user code.
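The claim that afterExecute sees the exception thrown by user code can be checked with a small subclass. This is a sketch with made-up names (HookedPool, failureMessage). It submits the task with execute, and the thread factory installs an empty UncaughtExceptionHandler only so that the rethrown exception does not print a stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HookedPool extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    final CountDownLatch finished = new CountDownLatch(1);

    HookedPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
              r -> {
                  Thread t = new Thread(r, "hooked-worker");
                  // Swallow the rethrown exception so the demo stays quiet.
                  t.setUncaughtExceptionHandler((thread, error) -> { });
                  return t;
              });
    }

    @Override
    protected void afterExecute(Runnable task, Throwable thrown) {
        super.afterExecute(task, thrown);
        lastFailure.set(thrown); // null when the task completed normally
        finished.countDown();
    }

    // Runs one failing task and returns the message of the exception seen by afterExecute.
    static String failureMessage() {
        HookedPool pool = new HookedPool();
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!pool.finished.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("afterExecute was not called in time");
            Throwable t = pool.lastFailure.get();
            return t == null ? null : t.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("afterExecute saw: " + failureMessage());
    }
}
```

This only holds for tasks passed to execute. With submit, the task is wrapped in a FutureTask that captures the exception itself, so afterExecute receives null for thrown.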

Conclusion

This article, the second in the JUC series, looks at how thread pools work by reading the source code. If anything in the article is described incorrectly, I hope readers will point it out. Thank you!
