Thread pool (Executor)

What is a thread pool?

Java 5 introduced a concurrency API called the Executor framework, which simplifies the design and development of multithreaded applications. It consists mainly of the Executor interface, the ExecutorService interface (which extends Executor), and the ThreadPoolExecutor class, which implements both. ThreadPoolExecutor provides the actual thread pool implementation; we will look at it in detail later in this tutorial.

Why do we need thread pools?

In a simple multithreaded application, we create Runnable objects, wrap each one in a Thread, and then start and manage those threads ourselves. That bookkeeping quickly becomes hard to get right. The Executor framework takes over the work of creating, running, and managing threads, and because it reuses threads instead of creating a new one per task, it can also improve application performance.

If you create a new thread for every task, a heavily loaded system can end up with so many threads that it runs out of memory and fails with an OutOfMemoryError. With ThreadPoolExecutor, a new thread is not created for every new task. Instead, Runnable tasks are handed to a limited number of threads. Once a thread finishes a task, it blocks on the work queue until another Runnable is available.
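
To make the reuse concrete, here is a minimal sketch that pushes many small tasks through a small fixed pool and counts how many distinct threads ran them. The class and method names (FixedPoolReuseDemo, distinctThreadsUsed) are made up for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolReuseDemo {

    // Runs taskCount tasks on a pool of poolSize threads and returns
    // the number of distinct threads that executed at least one task.
    static int distinctThreadsUsed(int poolSize, int taskCount) throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // accept no new tasks, finish queued ones
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for the queued tasks to finish
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // 1000 tasks, but never more than 4 threads
        System.out.println("distinct threads: " + distinctThreadsUsed(4, 1000));
    }
}
```

However many tasks are submitted, the count never exceeds the pool size, which is the property that protects the system under load.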

How do I create a thread pool?


public interface Executor {
 void execute(Runnable command);
}
 

There is another interface, ExecutorService, which extends Executor. It adds methods to control termination and methods that return a Future for tracking the progress of one or more asynchronous tasks. Examples include submit(), shutdown(), and shutdownNow().
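
As a small illustration of what ExecutorService adds over Executor, the sketch below submits a Callable, reads the result through the returned Future, and then shuts the executor down. SubmitDemo and sumOnPool are names invented for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitDemo {

    // Computes from + ... + to on a pool thread and returns the result.
    static int sumOnPool(int from, int to) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> future = pool.submit(task);   // returns immediately
            return future.get(5, TimeUnit.SECONDS);       // blocks until the result is ready
        } finally {
            pool.shutdown();                              // always release the worker thread
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPool(1, 100));   // prints 5050
    }
}
```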

ThreadPoolExecutor is the actual thread pool implementation. It extends AbstractExecutorService, which implements the ExecutorService interface. A ThreadPoolExecutor can be obtained from the factory methods of the Executors class or created directly through its constructor. Using a factory method is the recommended way to obtain an instance.

  • Use the Executors factory methods to create a thread pool:

Executors provides static factory methods with sensible default settings.

The Executors class offers four commonly used factory methods for obtaining a thread pool. Here we use Executors.newFixedThreadPool to get an instance of ThreadPoolExecutor.

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
Method | Description
newFixedThreadPool(int nThreads) | Returns a thread pool executor with a fixed number of threads (nThreads).
newCachedThreadPool() | Returns a thread pool with no upper bound on the number of threads; threads are created as needed and idle ones are reused.
newSingleThreadExecutor() | Returns an executor that is guaranteed to use a single worker thread.
newScheduledThreadPool(int corePoolSize) | Returns a thread pool with a fixed core size that can schedule commands to run after a given delay or periodically.
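
The cast to ThreadPoolExecutor shown earlier is not safe for every factory method. The following sketch checks each return value with instanceof. FactoryMethodsDemo and isThreadPoolExecutor are illustrative names, and the observation about newSingleThreadExecutor() reflects the OpenJDK implementations I am aware of, where the pool is wrapped in a delegating executor; treat it as an assumption for other runtimes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryMethodsDemo {

    // True if the executor can safely be cast to ThreadPoolExecutor.
    static boolean isThreadPoolExecutor(ExecutorService executor) {
        return executor instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(5);
        ExecutorService cached = Executors.newCachedThreadPool();
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService scheduled = Executors.newScheduledThreadPool(2);

        System.out.println("fixed:     " + isThreadPoolExecutor(fixed));
        System.out.println("cached:    " + isThreadPoolExecutor(cached));
        System.out.println("single:    " + isThreadPoolExecutor(single));
        System.out.println("scheduled: " + isThreadPoolExecutor(scheduled));

        // For the fixed pool the cast works, and core size equals maximum size.
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
        System.out.println("core=" + tpe.getCorePoolSize()
                + " max=" + tpe.getMaximumPoolSize());

        fixed.shutdown();
        cached.shutdown();
        single.shutdown();
        scheduled.shutdown();
    }
}
```

If you only need submit() and shutdown(), declaring the variable as ExecutorService avoids the cast altogether.
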
  • Create a thread pool with a custom ThreadPoolExecutor

ThreadPoolExecutor provides the following full constructor:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler);
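Parameter | Description
corePoolSize | Number of core threads kept in the pool
maximumPoolSize | Maximum number of threads allowed in the pool
keepAliveTime | Maximum time an idle thread beyond the core size waits for a new task before it terminates
unit | Time unit of keepAliveTime
workQueue | Blocking queue that holds tasks waiting to be executed
threadFactory | Factory used to create new threads
handler | Rejection policy applied when the pool cannot accept a task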
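
Putting the seven parameters together, a pool might be built as in the sketch below. The concrete values (2 core threads, 4 maximum, a queue of 100, the "worker-N" thread names) and the names CustomPoolDemo and newPool are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {

    static ThreadPoolExecutor newPool() {
        AtomicInteger sequence = new AtomicInteger(1);
        // Name the threads so they are easy to spot in thread dumps.
        ThreadFactory factory = runnable ->
                new Thread(runnable, "worker-" + sequence.getAndIncrement());

        return new ThreadPoolExecutor(
                2,                                  // corePoolSize
                4,                                  // maximumPoolSize
                30L, TimeUnit.SECONDS,              // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),      // bounded work queue
                factory,                            // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy());  // rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println(
                "running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

A bounded queue combined with CallerRunsPolicy makes the submitting thread run the task itself when the pool is saturated, which slows producers down instead of dropping work.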

ThreadPoolExecutor source code analysis

  • Internal thread pool state

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Extract the run state from a ctl value
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Extract the number of worker threads from a ctl value
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Pack a run state and a worker count into a single ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

The ctl variable uses its low 29 bits to hold the number of threads in the pool and its high 3 bits to hold the pool's run state:

  • RUNNING: -1 << COUNT_BITS, i.e. the high three bits are 111. A thread pool in this state accepts new tasks and processes tasks in the blocking queue;
  • SHUTDOWN: 0 << COUNT_BITS, i.e. the high three bits are 000. A thread pool in this state does not accept new tasks, but still processes tasks in the blocking queue;
  • STOP: 1 << COUNT_BITS, i.e. the high three bits are 001. A thread pool in this state does not accept new tasks, does not process tasks in the blocking queue, and interrupts running tasks;
  • TIDYING: 2 << COUNT_BITS, i.e. the high three bits are 010. All tasks have terminated and the worker count is zero;
  • TERMINATED: 3 << COUNT_BITS, i.e. the high three bits are 011. The terminated() hook has completed.
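
The packing is easy to try outside the JDK. The sketch below copies the constants and helpers into a standalone class (CtlBitsDemo and highBits are names invented here) so the bit patterns listed above can be printed and checked.

```java
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Returns the top three bits of a state as a zero-padded binary string.
    static String highBits(int state) {
        String bits = Integer.toBinaryString(state >>> COUNT_BITS);
        return ("000" + bits).substring(bits.length());
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 7);                 // RUNNING pool with 7 workers
        System.out.println(workerCountOf(ctl));      // 7
        System.out.println(runStateOf(ctl) == RUNNING);   // true
        System.out.println(highBits(RUNNING) + " " + highBits(SHUTDOWN) + " "
                + highBits(STOP) + " " + highBits(TIDYING) + " " + highBits(TERMINATED));
    }
}
```

Because RUNNING is negative and the other states grow from zero, the states are numerically ordered, which is why the source can test conditions such as rs >= SHUTDOWN with a plain integer comparison.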

Let’s take a look at some of the core methods inside ThreadPoolExecutor:

  • Add a task: execute(Runnable command)

execute() is the entry point for running a Runnable.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // workerCountOf gets the current number of threads in the pool.
    // If it is below corePoolSize, start a new core thread for the task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the pool is running, try to put the task on the queue
    if (isRunning(c) && workQueue.offer(command)) {
        // Double check the state after queuing
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the task from the
        // queue and hand it to the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is running but has no threads, create one
        // to execute the queued tasks
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full: try to add a non-core thread; if that fails, reject the task
    else if (!addWorker(command, false))
        reject(command);
}
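
The order implied by execute() (core thread first, then the queue, then a non-core thread, then rejection) can be observed with a deliberately tiny pool. This is a sketch under chosen assumptions: corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, the default AbortPolicy, and tasks that block until released. ExecuteOrderDemo and submitFour are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> submitFour() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                       // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("rejected");               // default AbortPolicy
                }
            }
        } finally {
            release.countDown();                       // let the tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        submitFour().forEach(System.out::println);
    }
}
```

The first task starts the core thread, the second waits in the queue, the third forces a second thread because the queue is full, and the fourth is rejected.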

  • Add a worker: addWorker(Runnable firstTask, boolean core)

Let’s see how to add worker threads

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Current run state of the pool
        int rs = runStateOf(c);

        /*
         * If the pool is RUNNING, this check never returns false.
         * Otherwise (the pool is not RUNNING), return false unless all
         * three of the following hold:
         *   1. the pool state is SHUTDOWN
         *   2. the submitted task is null
         *   3. the work queue is not empty
         */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            // Current number of worker threads
            int wc = workerCountOf(c);
            /*
             * The pool holds at most 2^29 - 1 threads (CAPACITY); at or beyond
             * that count no new thread can be created. Otherwise compare with
             * corePoolSize if core is true, or with maximumPoolSize if not.
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment workerCount by 1 via CAS and leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS failed: re-read ctl; if the run state changed, restart the outer loop
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        /*
         * Create a Worker and give it firstTask. Note that Worker has two
         * notable fields, firstTask and thread, and that Worker implements
         * Runnable and provides its own run() method.
         */
        w = new Worker(firstTask);
        // t is the thread that runs the Worker w itself, not firstTask
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-check the run state while holding the lock
                int rs = runStateOf(ctl.get());
                /*
                 * rs < SHUTDOWN means RUNNING. Proceed if:
                 *   1. the pool is RUNNING, or
                 *   2. the pool is SHUTDOWN and firstTask is null
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Register the worker in the pool's worker set
                    workers.add(w);
                    int s = workers.size();
                    // Track the largest pool size reached so far
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the worker thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
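
The first half of addWorker is a compare-and-set retry loop that reserves a worker slot before any thread is created. Stripped of the run-state checks, the pattern looks roughly like the following sketch. WorkerSlots, tryReserve, and reserved are hypothetical names, and this is a simplification rather than the JDK code.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerSlots {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    WorkerSlots(int limit) {
        this.limit = limit;
    }

    // Reserves one slot unless the limit has been reached; never blocks.
    boolean tryReserve() {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;                      // like wc >= corePoolSize / maximumPoolSize
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                       // like compareAndIncrementWorkerCount(c)
            }
            // Another thread changed the count in between: re-read and retry.
        }
    }

    int reserved() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        WorkerSlots slots = new WorkerSlots(500);
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    if (slots.tryReserve()) {
                        successes.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // 8000 attempts, but exactly 500 succeed
        System.out.println(successes.get() + " of " + slots.reserved());
    }
}
```

Checking the limit and incrementing in one atomic step is what keeps concurrent callers of execute() from overshooting the pool size without taking a lock.
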
  • Run a task: runWorker(Worker w)

When addWorker succeeds, it calls start() on the Worker's thread, and the Worker's run() method in turn calls runWorker(). Next, let's analyze how the task is executed.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Get the first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupts
    boolean completedAbruptly = true;
    try {
        // Enter the loop if task is not null, or if getTask() returns a task
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure the worker thread is
            // interrupted by calling wt.interrupt(); otherwise make sure
            // it is not interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // An empty hook method in ThreadPoolExecutor
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Also an empty hook method in ThreadPoolExecutor
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // Release the lock
                w.unlock();
            }
        }
        // No exception escaped from the tasks, so the exit is not abrupt
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
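
Since beforeExecute and afterExecute are empty hooks, a subclass can override them to observe each task, including the Throwable that runWorker passes along. Here is a minimal sketch; CountingPool and its counters are made up for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();          // runs on the worker thread, before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null) {
            succeeded.incrementAndGet();    // task.run() returned normally
        } else {
            failed.incrementAndGet();       // task.run() threw t
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountingPool pool = new CountingPool(2);
        pool.execute(() -> System.out.println("ok"));
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("started=" + pool.started + " succeeded=" + pool.succeeded
                + " failed=" + pool.failed);
    }
}
```

Note that this only sees exceptions from tasks passed to execute(). With submit(), the task is wrapped in a FutureTask that captures the exception, so afterExecute receives null and the failure surfaces from Future.get() instead.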

We have not yet seen how worker threads are reclaimed when the number of threads exceeds corePoolSize. For that, let's look at the getTask() method.

  • Get a task: getTask()

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /*
         * If the pool is RUNNING, the following if is not entered.
         * 1. If the state is STOP, TIDYING, or TERMINATED, the current
         *    worker must not execute any more tasks, so return null.
         * 2. If the state is SHUTDOWN and the blocking queue is empty,
         *    there is nothing left to do, so return null.
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement workerCount by 1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed is true if allowCoreThreadTimeOut is set or the number of
        // workers is greater than the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Return null, ending this worker, if the worker count exceeds
        // maximumPoolSize or the last timed poll timed out, provided this
        // is not the last worker or the queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * If allowCoreThreadTimeOut is true or the worker count exceeds
             * the core size: wait for a task for at most keepAliveTime.
             * Otherwise: block until a task becomes available.
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r == null means the poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: reset timedOut and go around the loop again
            timedOut = false;
        }
    }
}
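
The effect of the timed poll in getTask() can be watched from outside through getPoolSize(). The sketch below assumes a pool with 1 core thread, 3 maximum threads, a 100 ms keep-alive, and a SynchronousQueue so that every busy submission forces a new thread. KeepAliveDemo, busyPool, and awaitPoolSize are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Polls until the pool has the expected number of threads or the timeout expires.
    static boolean awaitPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pool.getPoolSize() != expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            Thread.sleep(20);
        }
        return true;
    }

    // Creates the pool and occupies three threads until release is counted down.
    static ThreadPoolExecutor busyPool(CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(release);
        System.out.println("busy: " + pool.getPoolSize() + " threads");

        release.countDown();   // tasks finish, workers go idle
        System.out.println("shrunk to core size: " + awaitPoolSize(pool, 1, 5000));

        pool.allowCoreThreadTimeOut(true);   // now core threads time out too
        System.out.println("shrunk to zero: " + awaitPoolSize(pool, 0, 5000));
        pool.shutdown();
    }
}
```

The two idle threads above the core size leave once their poll times out, while the last one blocks in take() until allowCoreThreadTimeOut(true) makes its wait timed as well.
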
  • Shut down the thread pool: shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the caller has permission to shut down the pool
        checkShutdownAccess();
        // Set the pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle worker threads
        interruptIdleWorkers();
        // An empty hook method in ThreadPoolExecutor
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // Try to move the pool state to TERMINATED
    tryTerminate();
}
  • Shut down the thread pool immediately: shutdownNow()

This method interrupts the tasks that are currently running and returns the list of tasks that were never executed.

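public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the caller has permission to shut down the pool
        checkShutdownAccess();
        // Set the pool state to STOP
        advanceRunState(STOP);
        // Interrupt all worker threads
        interruptWorkers();
        // Remove the tasks that never started from the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}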
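
The difference between the two shutdown methods shows up clearly on a single-thread executor. In this sketch (ShutdownDemo, gracefulShutdown, and abruptShutdown are names chosen for the example), shutdown() lets the queued tasks run, while shutdownNow() interrupts the running task and hands back the ones still waiting.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    // shutdown(): already submitted tasks still run. Returns how many ran.
    static int gracefulShutdown() throws InterruptedException {
        AtomicInteger ran = new AtomicInteger();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            pool.execute(ran::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ran.get();
    }

    // shutdownNow(): returns the number of tasks that never started.
    static int abruptShutdown() throws InterruptedException {
        CountDownLatch neverReleased = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    neverReleased.await();             // blocks until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // The first task occupies the only worker; the other three sit in the queue.
        List<Runnable> notStarted = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return notStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran after shutdown(): " + gracefulShutdown());
        System.out.println("returned by shutdownNow(): " + abruptShutdown());
    }
}
```

shutdownNow() can only interrupt; a task that ignores interruption keeps running, so tasks meant to be cancellable should respond to InterruptedException or check the interrupt flag.
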

Considerations for thread pool usage

  • Using ThreadLocal

ThreadLocal provides thread-local storage. It is typically declared as a static field and gives each thread that uses it its own copy of the value. Thread-local storage is commonly used for database connections and transactions. You can think of a ThreadLocal as a container that keeps its values in a Map<Thread, T>, keyed by the current thread, with get() returning the value associated with the current thread. The real implementation works differently, but this mental model is good enough. Threads in a thread pool are reused after a task completes, so each task should clean up its ThreadLocal (remove the value associated with the thread) before it finishes. Otherwise, the next task that runs on the reused thread will see the value left behind by the previous task, which leads to unexpected results.
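
The leak is easy to reproduce with a single-thread executor, where two consecutive tasks are certain to share a thread. In this sketch, ThreadLocalReuseDemo, CURRENT_USER, and valueSeenByNextTask are hypothetical names; the cleanUp flag switches the remove() call on and off.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalReuseDemo {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Runs one task that sets the ThreadLocal, then returns what a second
    // task on the same pooled thread reads from it.
    static String valueSeenByNextTask(boolean cleanUp) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                CURRENT_USER.set("alice");
                try {
                    // ... task body that relies on CURRENT_USER ...
                } finally {
                    if (cleanUp) {
                        CURRENT_USER.remove();   // clear the value before the thread is reused
                    }
                }
            };
            pool.submit(first).get();            // wait for the first task to finish

            Callable<String> second = CURRENT_USER::get;
            return pool.submit(second).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without remove(): " + valueSeenByNextTask(false));   // alice
        System.out.println("with remove():    " + valueSeenByNextTask(true));    // null
    }
}
```

Putting remove() in a finally block ensures the cleanup also happens when the task body throws.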

  • Set a reasonable number of threads

Newcomers to thread pools often assume that higher concurrency calls for more threads. In practice, too many threads cause a large number of context switches, which hurts system throughput. A reasonable thread count should therefore be determined by load testing the actual project. As a starting point, two rules of thumb are commonly used, depending on the type of task:

  1. CPU-intensive

    coreSize = number of CPU cores + 1

  2. I/O-intensive

    coreSize = 2 * number of CPU cores
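
These two rules of thumb translate directly into code. The sketch below reads the core count from Runtime.availableProcessors(); PoolSizing, cpuBound, and ioBound are names invented for the example, and the results are only starting points to be adjusted by load testing.

```java
class PoolSizing {

    // CPU-intensive tasks: number of CPU cores + 1
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // I/O-intensive tasks: 2 * number of CPU cores
    static int ioBound(int cores) {
        return 2 * cores;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores:           " + cores);
        System.out.println("CPU-intensive:   " + cpuBound(cores));
        System.out.println("I/O-intensive:   " + ioBound(cores));
    }
}
```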
