Java thread pool functions and design ideas

role

  • Improve thread manageability: The main function of the Java thread pool is to manage thread resources well and prevent endless application of thread resources in the system. Because in our hotspot VIRTUAL machine, the Java thread model is 1:1, each Java thread created is corresponding to a lightweight thread in the OS. Generally speaking, Creating tens of thousands of threads puts a lot of pressure on system memory
  • Reduced resource consumption: Reuse of created threads through pooling techniques to reduce wastage from thread creation and destruction.
  • Improved response time: Tasks can be executed immediately when they arrive without waiting for threads to be created.
  • More and more power: Thread pools are extensible, allowing developers to add more functionality to them. Such as delay timer thread pool ScheduledThreadPoolExecutor, allows a stay of execution or regular task execution
Business background

Thread pooling is used a lot in our projects, such as splitting large requests, downloading resources, and asynchronous processing

Design idea

  • Thread pool is mainly used in the idea of pooling, using this pooling technology, can achieve the effect of resource reuse.
  • In the thread pool, a pattern similar to production consumers is used to process tasks.

The general operation flow chart is as follows:

Java thread pool parameters and life cycle

parameter

Either way, a thread pool is created by relying on the constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

As can be seen from the above constructor, a thread pool construction requires the above seven parameters, respectively:

  1. CorePoolSize: core thread pool size;
  2. MaximumPoolSize: specifies the maximum thread pool size.
  3. KeepAliveTime: indicates the lifetime of idle threads.
  4. Unit: keepAliveTime;
  5. WorkQueue: a blocking queue used to hold blocked tasks;
  6. ThreadFactory: a threadFactory used to generate worker threads in a thread pool;
  7. Handler: Reject policy. When the thread pool stops or the maximum thread and queue are full, the reject method in the reject policy is called to process the task.

The keepAliveTime parameter alone determines the keepAliveTime for threads that exceed the core thread pool size. If we want to set the keepAliveTime for threads that exceed the core thread pool size, we need to set another parameter: AllowCoreThreadTimeOut, getTask() :

/ / according to allowCoreThreadTimeOut and whether the number of threads currently beyond the core thread pool size will decide whether the timeout waiting for Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize; . If the Condition is not allowed to wait, the thread will block in the Condition queue. If the Condition is not allowed to wait, the thread will wait in the Condition queue. If the Condition is not allowed, the thread will wait in the Condition queue. Runnable r = timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }Copy the code

Even if allowCoreThreadTimeOut is set to false, in high concurrency cases, if the blocking queue is empty, the thread that did not get the task will be destroyed and the worker thread will be added again. Destroy thread source code until the number of worker threads equals the number of core threads:

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; Mainlock. lock(); mainlock. lock(); mainlock. lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); If (runStateLessThan(c, STOP)) {if (! completedAbruptly) {// get the minimum number of threads int min = allowCoreThreadTimeOut? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; If (workerCountOf(c) >= min) return; // replacement not needed} // addWorker(null, false); }}Copy the code

The second is the workQueue: Inside the JDK blocking queue has a lot of kinds, but either a blocking queue, all need to the size of the blocking queue set a reasonable size, set too large may lead to the largest number of threads is not effective, setting is too small, may in the case of high concurrency lead to maximum thread before opening the case, the task in a queue, found that the length is not enough, The reject policy is invoked directly. Sample code:

ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) ); for (int i = 0; i < 5; ++i) { executor.submit(new Runnable() { @Override public void run() { try { Thread.sleep(900); } catch (InterruptedException ignored) { } } }); } Thread.sleep(1000); System.out.println("After sleep"); for (int i = 0; i < 5; ++i) { try { executor.submit(new Runnable() { @Override public void run() { try { Thread.sleep(900); System.out.println(" execute done "); } catch (InterruptedException ignored) { } } }); } catch (Exception e) { System.out.println(e); }}Copy the code

In addition to these seven familiar parameters, there are several important ones:

  1. ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
Copy the code

The CTL argument contains two parts. The first part contains three bits, which contains the running status of the thread pool. The second part contains 29 bits, which contains the number of worker threads running in the thread pool

private final ReentrantLock mainLock = new ReentrantLock();
Copy the code

This is a reentrant lock inside the thread pool that controls various synchronizations in the thread pool

The life cycle

The life cycle of a ThreadPoolExecutor has five states:The state flow is shown below:

ThreadPoolExecutor source code analysis

We look at the source code for ThreadPoolExecutor from two directions

  • Perform a task
  • The worker thread

Perform a task

In ThreadPoolExecutor, once the thread pool is created, tasks are executed using the execute method

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return; c = ctl.get(); } // If the number of threads running is greater than or equal to the number of core threads // the current thread pool is not functioning properly, Into the task and try to block the queue if (set (c) && workQueue. Offer (command)) {/ / if the core is greater than the number of threads and tasks into blocking queue are successful / / will go to pick up the thread pool status value int recheck = ctl.get(); // If the thread pool is not found to be running, the queue is blocked to remove the execution of the task, and the rejection policy if (! isRunning(recheck) && remove(command)) reject(command); // When we get here, the worker thread is 0. There are only two cases, the first case is when we set corePoolSize to 0, and the second case is when we get here, Else if (workerCountOf(recheck) == 0) addWorker(null, false); } // If the queue does not fit, we will add more threads than the core thread. addWorker(command, false)) reject(command); }Copy the code

The appeal workflow can be simplified as follows:

This also involves the process of adding a worker thread, the addWorker method

Private Boolean addWorker(Runnable firstTask, Boolean core) {retry: for (;); {// get the state of the current thread pool int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); / / query the current number of threads to see if there is room for increase the if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; / / CAS to increase the number of threads the if (compareAndIncrementWorkerCount (c)) break retry. c = ctl.get(); // Re-read ctl if (runStateOf(c) ! = rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; W = new Worker(firstTask); final Thread t = w.thread; if (t ! = null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new  IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // Start the work thread if (workerAdded) {t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

The worker thread

One of the things that I’ve been talking about is a worker thread, and actually a worker thread is not just a single thread, but a worker thread consists of several things, the first is the thread that we’re actually running, the second is the first task that we put in when we create the thread, The third is internal AbstractQueuedSynchronizer based implementation of a set of exclusive lock, the exclusive lock is non-reentrant. When a worker thread is running, it calls the run method in the worker thread, which actually calls the runWorker method in ThreadPoolExecute.

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // this ensures that interrupts can be responded to before the task is executed. // allow interrupts boolean completedAbruptly = true; Try {// First check if the current worker thread is carrying a task. If it is not carrying a task, the getTask method blocks the task in the middle of the queue. // This loop is a representation of the reuse of resources in the thread pool. = null || (task = getTask()) ! = null) {// This place will lock the current worker thread, keep the thread safe w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); Try {// Operations before executing a task beforeExecute(wt, task); Throwable thrown = null; Try {// Execute task task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); }} // this indicates that the thread terminated normally, not because of an exception in the execution of the task completedAbruptly = false; } finally {// Destroy worker thread processWorkerExit(w, completedAbruptly); }}Copy the code

The simplified process is as follows:

Reference: Java thread pool implementation principle and its practice in Meituan business