Excerpt from The Art of Concurrent Programming in Java

Pooling technology

The pooling technology is used to save some resources to the pool for management when an application is started. When resources need to be obtained, they can be directly obtained from the pool and returned to the pool after being used, thus reducing the consumption of resource creation and destruction. Common pooling technologies include thread pool, connection pool, memory pool, etc. Resources in thread pool are threads, resources in connection pool are connections, and resources in memory pool are memory.

Advantages of thread pools

  • Reduced resource consumption: Reduces thread creation and destruction costs by reusing created threads.
  • Improved response time: When a task arrives, it can be executed immediately without waiting for the thread to be created.
  • Improve manageability of threads: Threads are scarce resources. If created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for uniform allocation, tuning, and monitoring.

Thread task interfaceRunnable && Callable

@functionalInterface public Interface Runnable {/** * when a thread is created using an object that implements interface Runnable, the starting thread calls the object's run method in a separate thread of execution. */ public abstract void run(); } @functionalInterface public interface Callable<V> {/** * Throws an Exception if it cannot calculate * * @return calculation result * @throws Exception If it cannot calculate */ V call() throws Exception; }Copy the code

The Callable interface is similar to Runnable in that both instances can be executed asynchronously by threads. Runnable, however, does not return a result or raise a checked exception. It prints the exception directly, whereas Callable captures the checked exception in the returned Future. After Callable is finished, future.get () can fetch the result or any exceptions thrown during execution. Ex. :

ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
Callable callable = () -> 1 / 0;
Runnable runnable = () -> System.out.println(1 / 0);
Future future =  executorService.submit(callable);
try {
    System.out.println("future get:" + future.get());
}catch (Exception e){
    System.err.println("future get exception");
    e.printStackTrace();
}
executorService.execute(runnable);
executorService.shutdown();
System.out.println("shutdown success");
Copy the code

The console will print “Future Get Exception”, two exceptions, and “shutdown success”. Since future.get () blocks the main thread from throwing an exception, a try-catch is required to prevent the thread pool from closing properly. Execute () is not detected and can be closed without capturing the thread pool. Executors. Callable () converts Runnable to Callable.

A brief introduction to the various thread pool Executor related classes

  • ExecutorInterface: provides method execute() to separate task submission from task execution
  • ExecutorServiceInterface:ExecutorProvides abstract methods for task termination and tracing
  • AbstractExecutorServiceAbstract classes: providedExecutorServiceDefault implementation of
  • ScheduledExecutorServiceInterface:ExecutorServiceExtended interface to provide abstract methods of task delay and timed execution
  • ThreadPoolExecutorClass (core) :ExecutorServiceThe complete implementation class of
  • ThreadPoolExecutor.WorkerClass:AbstractQueuedSynchronizerExtension classes that maintain interrupt control status of the thread running the task and other secondary records
  • ScheduledThreadPoolExecutorClass: InheritedThreadPoolExecutorAnd implementedScheduledExecutorService, provides the implementation of task delay and timing execution

ThreadPoolExecutorClass source code parsing

attribute

AllowCoreThreadTimeOut is set to true to allow core threads to be released when they exceed their lifetime. MaximumPoolSize: KeepAliveTime: Specifies the lifetime of the redundant threads when the number of threads is greater than the number of cores. If there are no new tasks in the lifetime, the workQueue will be released: Task queue. When a new task arrives, it will determine whether the number of threads currently running reaches the number of core threads. If so, the new task will be stored in the queue. ThreadFactory: a threadFactory responsible for creating threads handler: a policy for handling the number of submitted tasks (the number of tasks in the thread pool and queue has reached the upper limit) CTL: Master pool control state, which is packed with two conceptual fields workerCount and runState. WorkerCount refers to the valid number of threads running, and runState refers to the running state of a thread (running, closed medium) for bitwise operations to obtain workerCount and runState

  • RunState: Provides primary thread pool lifecycle control and has the following flags:
    • RUNNING: Accept new tasks and handle queuing tasks
    • SHUTDOWN: does not accept new tasks, but processes queued tasks
    • STOP: does not accept new tasks, does not process queued tasks, and interrupts ongoing tasks
    • TIDYING: All tasks have terminated, workerCount is zero, and the thread that transitions to the TIDYING state will runterminated()methods
    • TERMINATED:terminated()End of the call

Workers: All sets of worker threads in the thread pool. MainLock is accessed only when mainLock is held. Workers is used to access workers and concurrency control

Full-parameter constructor

/** * Construct an instance of ThreadPoolExecutor from the given initial argument ** @param corePoolSize Number of core threads, * @param maximumPoolSize Maximum number of threads * @param keepAliveTime Thread lifetime * @param Unit keepAliveTime unit * * @param threadFactory threadFactory, Public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }Copy the code

ThreadPoolExecutor provides several implementation strategies for RejectedExecutionHandler:

  • DiscardOldestPolicy: Discard the oldest unprocessed task request
  • AbortPolicy(Default policy) : By throwingRejectedExecutionExceptionRefused to task
  • CallerRunsPolicyExecute: Directly runs the reject task in the execute thread. If the execute is stopped, the task is discarded
  • DiscardPolicy: Discard the new task

Adding an Execution Taskexecute()

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); Int c = ctl.get(); < corePoolSize if (workerCountOf(c) < corePoolSize) {addWorker(command, true)) return; // c = ctl.get(); If (isRunning(c) &&workqueue.offer (command)) {int recheck = ctl.get(); // Remove tasks from the queue if (! IsRunning (recheck) && remove(command)) // Use a processing policy. Handler processes the task. Else if (workerCountOf(recheck) == 0) addWorker(null, false); } // join the queue failed, create a non-core thread to execute the task else if (! addWorker(command, false)) reject(command); } /** * add task to Worker * @param firstTask * @param core whether the new thread is bound to the core thread, true, the new thread belongs to corePoolSize thread, False maximumPoolSize */ private Boolean addWorker(Runnable firstTask, Boolean core) { for (;;) { int c = ctl.get(); int rs = runStateOf(c); If (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // If the task is executed using a core thread and the number of worker threads >= the number of core threads, return false; If the task is executed with a new thread and the number of workers >= the maximum number of allowed threads, return false; if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; / / CAS from work increases the number of threads works may jump out of the loop label the if (compareAndIncrementWorkerCount (c)) break retry. c = ctl.get(); // re-read CTL // re-check the thread pool state, and if it does not match the thread pool state at the beginning of the loop jump to the tag and re-loop if (runStateOf(c)! = rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t ! = null) { final ReentrantLock mainLock = this.mainLock; Mainlock. lock(); // Other threads may operate on the newly created worker w before locking. Int rs = runStateOf(ctl.get()); / / to determine whether a state of the thread pool to RUNNING < SHUTDOWN (rs) / / or thread pool SHUTDOWN state and new mission is empty if (rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {// Check whether the Thread maintained by the worker is alive // Since the worker is created in the current method, its Thread should not be started. Its maintenance Thread is alive, i.e. thread.start () method has been called by another Thread. Status is not reasonable if (t.i sAlive ()) / / precheck that t is startable throw new IllegalThreadStateException (); Add (w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; Public Boolean remove(Runnable task) {Boolean removed = workqueue.remove (task); tryTerminate(); // In case SHUTDOWN and now empty return removed; } private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** The current Worker Thread */ final Thread Thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** The number of tasks completed by the thread */ Volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }... }Copy the code

The main flow chart of the following adding tasks can be drawn according to the above source code:

Submit () vs. execute()

  • submit()The submittedRunnableTasks are converted toCallableTo perform
  • execute()Method is used to submit tasks that do not require a return value, so there is no way to determine whether the task was successfully executed by the thread pool
  • submit()Method is used to submit tasks that require a return value, and the thread pool returns oneFutureObject that can be used to check whether a task is successfully executedFuture.get()Method to get the return value,get()Method blocks the current thread until the task completes

Partial public method resolution

  • int getActiveCount(): Returns the approximate number of threads executing tasks (the state of tasks and threads may change dynamically during computation)
  • int getLargestPoolSize(): Returns the maximum number of concurrent threads in the pool
  • long getTaskCount(): Returns the approximate total number of planned tasks
  • long getCompletedTaskCount(): Returns the approximate total number of completed executed tasks
  • int getPoolSize(): Returns the current number of threads in the pool
  • boolean isShutdown(): Tests whether the pool state is not RUNNING. The state is SHUTDOWN, STOP, TIDYING, or TERMINATED
  • void shutdown(): Starts an orderly SHUTDOWN of the thread pool, in which previously submitted tasks are executed but no new tasks are accepted, and the pool state is changed to SHUTDOWN
  • List<Runnable> shutdownNow(): Changes the pool state to STOP, attempts to STOP all executing tasks, pauses processing of waiting tasks, and returns the list of tasks waiting to be executed. The queue will be emptied of tasks
  • boolean isTerminating(): Returns true if the pool is in the SHUTDOWN state, indicating that the program is runningshutdown()orshutdownNow()The thread pool is then being terminated
  • boolean isTerminated(): Checks whether the pool state is TERMINATED

Source code excerpt:

public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // If the thread pool is TERMINATED at TIDYING, 0 return runStateAtLeast(ctl.get(), TIDYING)? 0 : workers.size(); } finally { mainLock.unlock(); } } public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; For (Worker w: workers) // if (w.islocked ()) ++n; return n; } finally { mainLock.unlock(); } } public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); CheckShutdownAccess (); checkShutdownAccess(); // CAS changes the pool state advanceRunState(SHUTDOWN); // interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; Private void interruptIdleWorkers() {interruptIdleWorkers(false); } /** * @param onlyOne terminates onlyOne worker thread * interrupts workers thread */ private void interruptIdleWorkers(Boolean onlyOne) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (! t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}...Copy the code

As you can see from the above code, ThreadPoolExecutor locks mainLock every time it operates on workers.

Use the sample

Public class ThreadPoolTest {public static void main(String[] args) { ThreadFactory ThreadFactory = new ThreadFactoryBuilder().setNameFormat(" Thread -%s") // Abnormal during the implementation of the execute () will print abnormal thread and abnormal information. SetUncaughtExceptionHandler ((thread, throwable) -> { System.err.println("exception thread id:" + thread.getId() + ", thread name:" + thread.getName()); throwable.printStackTrace(); }) .build(); ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), threadFactory); List<Future<String>> futures = new ArrayList<>(); // The console might print "Thread pool full" intstream. range(0, 24) .forEach(i -> executorService.execute(() -> System.out.println(Thread.currentThread().getName()))); Intstream. range(0, 24) // .forEach(i -> futures.add(executorService.submit(() -> Thread.currentThread().getName()))); } the catch (RejectedExecutionException e) {System. Err. Println (" thread pool is full "); Executorservice.shutdown (); executorservice.shutdown (); executorservice.shutdown (); } // Thread pool state is not TERMINATED, spin blocks main thread while (! executorService.isTerminated()){ } for (Future<String> fut : futures) { try { System.out.println(new Date() + "::" + fut.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }}}}Copy the code

In the preceding example, the number of core threads in the pool is 5, the queue capacity is 10, and the maximum number of threads in the pool is 10. When the pool capacity is full and the number of threads reaches the maximum, The pool task number > 20 will use the default AbortPolicy strategy thrown RejectedExecutionException to reject the subsequent Runnable task. Since task processing efficiency and task overflow (4) are not large, the above example does not necessarily print “Thread pool full”. You can add Thread.sleep(100) to the Runnable task if you need to confirm. Since exceptions thrown by the processing policy can affect subsequent code execution, the thread pool needs to be closed in the finally block to ensure that resources in the thread pool are freed.

ExecutorsA thread pool that can be created

The Executors class provides thread pools for creating the following JUC packages:

  • FixedThreadPool: a reusable thread pool with a fixed number of threads
  • CachedThreadPool: Cache thread pool, cache threads by extending thread lifetime
  • ScheduledThreadPool: Pool of threads in which tasks can be deferred or executed periodically
  • WorkStealingPool: Task stealingthread pool, internal pool added in version 1.8ForkJoinPoolCreate method of

The above thread pools do not have corresponding classes of the same name. They are just the names of some special function thread pools, which are built based on the original JUC thread pool class by adding some parameter restrictions and queue selection.

FixedThreadPool

Reusability in FixedThreadPool refers to the reuse of threads, the number of threads in the pool is fixed, and the task queue can be treated as bounded (integer.max_value). The source code involved in creating the thread pool is as follows:

public class Executors { ...... public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }... } public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ...... public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }... }Copy the code

ThreadPoolExecutor created by the newFixedThreadPool() method

  • CorePoolSize =maximumPoolSize: There are only core threads in the thread pool, and no new threads are created to execute tasks even if the number of core threads is full and the queue is full
  • keepAliveTime=0A thread pool with a fixed number of threads that are all core threads does not have a lifetime
  • new LinkedBlockingQueue<Runnable>(): The queue is unbounded, which means that the queue will never be filled with tasks. Therefore, the core thread will not execute the task rejection policy because it cannot process tasks. The core thread will continuously obtain and execute tasks from the task queue. In addition, when the task pile up too much may cause memory OOM, throwOutOfMemoryError.

Set VM Options to -xms64m -XMx64m, the following program will throw OOM after running for a while (also available for CachedThreadPool).

ExecutorService executorService = Executors.newFixedThreadPool(1); LinkedList<String> hashList = new LinkedList<>(); Runnable task = () -> { byte[] KB = new byte[1024]; hashList.add(Thread.currentThread().getId() + ":" + Thread.currentThread().getName()); Thread.sleep(1000); thread.sleep (1000); } catch (InterruptedException e) { e.printStackTrace(); }}; boolean flag = true; while (flag) { executorService.execute(task); } // OOM cannot catch, so there is no finally shutdown thread poolCopy the code

CachedThreadPool

Cache thread pool is implemented by extending the thread pool thread survival time to achieve the purpose of thread cache, the thread pool creation involves the source code as follows:

public class Executors { ...... public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }... } public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ...... public SynchronousQueue() { this(false); Public SynchronousQueue(Boolean fair) {transferer = fair? new TransferQueue<E>() : new TransferStack<E>(); }... }Copy the code

ThreadPoolExecutor created by the newCachedThreadPool() method

  • corePoolSize=0: The number of core threads is 0, that is, all tasks are directly put into the queue and all threads executing the queue tasks are non-core threads
  • maximumPoolSize=Integer.MAX_VALUE: The maximum number of threads can be considered unbounded, meaning that as many threads can be created as many tasks are available
  • keepAliveTime=60s: The thread lifetime is 60 seconds, which delays the life cycle of the thread to reduce the negative impact of not having a core thread, and can improve the performance of programs that perform many asynchronous tasks in a short period of time
  • workQueue=SynchronousQueue(false): The task queue is an unfair blocking queue in which each insertion of a column must wait for a corresponding deletion by another thread

SynchronousQueue is a synchronous blocking queue in which each insert must wait for another thread to perform a corresponding delete, or it will block, and vice versa (no fetch thread will block until an element is inserted). The synchronous queue does not have any internal capacity, the head of the queue is the element that the first queued insertion thread tries to add to the queue, and if there is no such thread, poll() returns NULL. An execution task in CacheThreadPool flows as follows:

While the number of threads is unlimited to cope with sudden high concurrency, there is also a load ceiling. When too many threads are created, resulting in insufficient memory, an OutOfMemoryError will be thrown.

ScheduledThreadPool

Executors created ScheduledThreadPool corresponding class for ScheduledThreadPoolExecutor, involved to create the source code is as follows:

public class Executors { ...... public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }... } public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ...... Public ScheduledThreadPoolExecutor (int corePoolSize) {/ / call ThreadPoolExecutor constructor super (corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }}Copy the code

ScheduledThreadPoolExecutor structure parameter interpretation:

  • maximumPoolSize=Integer.MAX_VALUE: The maximum number of threads can be considered unbounded, meaning that as many threads can be created as many tasks are available
  • keepAliveTime=0sA thread pool with a fixed number of threads that are all core threads does not have a lifetime
  • workQueue=DelayedWorkQueue(): Delay queue,ScheduledThreadPoolExecutorAll constructors use this queue,RunnableTasks will be forced to be extendableRunnableScheduledFuture

DelayedWorkQueue is based on a heap-based data structure similar to DelayQueue and PriorityQueue, except that each ScheduledFutureTask also logs its index into the heap array, Task queues in the ScheduledThreadPool thread pool are sorted according to execution time. To make the relationship clearer, the following related class diagrams are drawn:

Because ScheduledThreadPoolExecutor realized ScheduledThreadPoolExecutor, more so than other thread pool the following task scheduling method:

  • ScheduledFuture<? > schedule(Runnable command, long delay, TimeUnit unit)Specifies the future returned after the task is delayedget()Is empty
  • ScheduledFuture<? > schedule(Callable command, long delay, TimeUnit unit)Again, the difference is the future returnedget()forCallableThe results of
  • ScheduledFuture<? > scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)After the delay is specified, tasks are executed periodically at a fixed interval. Tasks in the current period are executed regardless of whether tasks in the previous period are completed. That is, the interval between the start time of the previous task and the current task is alwaysdelay unit
  • ScheduledFuture<? > scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)The delay is specified based on the completion time of the taskdelayThe time interval between the end time of the previous task and the start time of the current task is alwaysdelay unit

ScheduleWithFixedDelay () and scheduleWithFixedDelay() both convert Runnable to Callable and encapsulate it into ScheduledFutureTask, and then throw it into the delay queue :

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ...... / / (0) ScheduledThreadPoolExecutor task scheduling source of public ScheduledFuture <? > schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<? > t = decorateTask(command, // New ScheduledFutureTask (Command, null, triggerTime(delay, unit))); // Submit the scheduling task delayedExecute(t); return t; } // Delay task execution private void delayedExecute(RunnableScheduledFuture<? > task) { if (isShutdown()) reject(task); Else {// (2) Add tasks to the delay queue super.getQueue().add(task); If (isShutdown() && // Whether the current running state can run the inserted task, for example, whether the delayed task is canceled after SHUTDOWN! CanRunInCurrentRunState (task.isPeriodic()) && // If the current state is not running, the task is removed from the queue. Remove (task)) // If the task is running, do not cancel task.Cancel (false); Else // The related state meets the execution requirements, prebuild thread ensurePrestart(); } } void ensurePrestart() { int wc = workerCountOf(ctl.get()); If (wc < corePoolSize) addWorker(null, true); Else if (wc == 0) addWorker(null, false); }... Static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {// Task storage queue private RunnableScheduledFuture<? >[] queue = new RunnableScheduledFuture<? >[INITIAL_CAPACITY]; . Public Boolean add(Runnable e) {return offer(e); } public Boolean offer(Runnable x) {if (x == null) throw new NullPointerException(); // Convert the Runnable task to RunnableScheduledFuture RunnableScheduledFuture<? > e = (RunnableScheduledFuture<? >)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; If (I >= queue.length) // Expand the queue. size = i + 1; If (I == 0) {queue[0] = e; setIndex(e, 0); } else {// (2.1) Insert tasks into the queue heap according to the task time at the appropriate location, closest to siftUp(I, e); } if (queue[0] == e) { leader = null; // (2.2) The current task is in the head of the queue, i.e. the task is an immediate task, and the wake thread available.signal(); } } finally { lock.unlock(); } return true; } private void siftUp(int k, RunnableScheduledFuture<? Int parent = (k-1) >>> 1; int parent = (k-1) >>> 1; RunnableScheduledFuture<? > e = queue[parent]; If (key.compareTo(e) >= 0) break; queue[k] = e; // Set the heap index of the task, setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }... } private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { ...... ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); Callable (runnable, result) converts Runnable to callable this.time = ns; // Call the parent FutureTask constructor and run execnable. Callable (runnable, result). this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public int compareTo(Delayed other) {if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<? > x = (ScheduledFutureTask<? >)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1:0; }... }}Copy the code

Comb over the source code, available SchduledThreadPoolExecutor add tasks of the conventional process is as follows:

  1. createScheduledFutureTaskSchedule a task instance that willRunnableThe task toCallable
  2. willScheduledFutureTaskInstance added to delay queueDealyedWorkQueueIn the

Workqueue.siftup () -> task.compareTo() Determine whether the related status meets the conditions to execute the new task

  • If the status is consistent and the task is removed from the queue successfully, the new task is cancelled if it is not being executed. If the new task is being executed, the task is not cancelledtask.cancel(false)
  • Other: pre-built threadsensurePrestart()

WorkStealingPool

-Chuck: I don’t know

The fork/ Join framework, proposed in Java 7, provides tools to help speed up parallel processing by trying to use all available processor cores, which is achieved through a divide-and-conquer approach. In the fork/ Join framework, tasks are recursively forked into smaller independent sub-tasks and executed asynchronously, and then the sub-results of execution are joined into a single result. The decomposed tasks are solved with worker threads provided by the thread pool. In Java, the fork/ Join framework is implemented using a thread pool called ForkJoinPool, which manages the worker thread class ForkJoinWorkerThread. The main difference between ForkJoinPool and other types of ExecutorService implementations is the use of work stealing: all threads in the pool attempt to find and execute tasks submitted to the pool or created by other active tasks (if there are no tasks, block waiting for work). Since the Runnable task cannot be split, a new abstract class, ForkJoinTask, has been added to Java 7. All separable tasks inherit from this class. Instances of Runnable tasks received by a ForkJoinPool that are not integrated with ForkJoinTask are encapsulated in the ForkJoinTask subclass RunnableExecuteAction for ForkJoinPool executable tasks. However, this task still cannot be fork/join. WorkStealingPool build and task execution in the JDK

public class Executors { ...... public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }... } @sun.misc.Contended public class ForkJoinPool extends AbstractExecutorService { ...... public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<? > job; if (task instanceof ForkJoinTask<? >) // avoid re-wrap job = (ForkJoinTask<? >) task; The else / / task encapsulated into ForkJoinPool executable task job = new ForkJoinTask. RunnableExecuteAction (task); externalPush(job); }... // Adapter for Runnable static Final class RunnableExecuteAction extends ForkJoinTask<Void> {final Runnable Runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(void v) {fork/join public final Boolean exec() {runnable.run(); return true; } void internalPropagateException(Throwable ex) { rethrow(ex); // rethrow outside exec() catches. } private static final long serialVersionUID = 5232453952276885070L; }... }Copy the code

This thread pool is a lot of stuff, not much use, so forget it. I wish I knew, but click on this

Extension: Thread pool in Spring Boot Quick configuration

There are two task schedulers that encapsulate ThreadPoolTaskExecutor, ThreadPoolTaskScheduler, and spring-context. Both task actuators can be in the Spring of the Boot configuration file for the thread pool configuration to generate corresponding singleton thread pool, in some low requirements for task scheduling scenarios to use is very convenient, in the project of the same type tasks (such as CPU intensive tasks, I/O intensive tasks) using the same thread thread pool is also convenient to maintain.

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutorThe use of

The ThreadPoolExecutor instance that ThreadPoolTaskExecutor maintains internally is ThreadPoolExecutor, Both FixedThreadPool and CachedThreadPool are implemented by changing the parameters of ThreadPoolExecutor, so you can also configure the corresponding thread pool in the Spring Boot configuration file. ThreadPoolTaskExecutor ThreadPoolTaskExecutor ThreadPoolTaskExecutor

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {// SchedulingTaskExecutor {// SchedulingTaskExecutor = new Object(); private int corePoolSize = 1; private int maxPoolSize = Integer.MAX_VALUE; private int keepAliveSeconds = 60; private int queueCapacity = Integer.MAX_VALUE; private boolean allowCoreThreadTimeOut = false; @Nullable private TaskDecorator taskDecorator; @Nullable private ThreadPoolExecutor threadPoolExecutor; // Change the core number of the thread pool synchronously to avoid concurrent configuration changes. CorePoolSize public void setCorePoolSize(int corePoolSize) {synchronized (this.poolsizemonitor) {synchronized (this.poolsizemonitor) {synchronized (this.poolsizemonitor) {synchronized (this.poolsizemonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor ! = null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); ExecutorService initializeExecutor(ThreadFactory ThreadFactory, RejectedExecutionHandler RejectedExecutionHandler) {// Create queue BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; . return executor; }... SynchronousQueue */ protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<>(queueCapacity); } else { return new SynchronousQueue<>(); }}}Copy the code

Sample application.yml file configuration (mostly defaults) :

Spring: task: execution: pool: SynchronousQueue = 0; SynchronousQueue = 0; queue capacity = 60; keep-alive = 0; 60s # Allow-core-thread-timeout: trueCopy the code

Injection usage test:

    @Resource
    private ThreadPoolTaskExecutor executor;

    @PostConstruct
    public void init(){
        executor.execute(() -> System.out.println("ThreadPoolTaskExecutor execute task"));
    }
Copy the code

org.springframework.scheduling.concurrent.ThreadPoolTaskSchedulerThe use of

The main difference between ThreadPoolTaskScheduler and ThreadPoolTaskExecutor is the maintenance of different ExecutorService, ThreadPoolTaskScheduler scheduler scheduler is configured for scheduledExecutor, and the @enablesCheduling annotation must be added to the application class.

public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler { private volatile int poolSize = 1; / / if ScheduledExecutorServiced task ScheduledFutureTask. Cancel the deleted from the queue () task, it is True, false, the default ScheduledExecutorService Private Volatile Boolean removeOnCancelPolicy = false; / / scheduling task execution of exception handler, empty. Use the default processor TaskUtils getDefaultErrorHandler (true) @ Nullable private volatile ErrorHandler ErrorHandler. @Nullable private ScheduledExecutorService scheduledExecutor; . ScheduledExecutor @override protected ExecutorService initializeExecutor(ThreadFactory ThreadFactory, RejectedExecutionHandler rejectedExecutionHandler) { this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler); if (this.removeOnCancelPolicy) { if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true); } else { logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor"); } } return this.scheduledExecutor; } protected ScheduledExecutorService createExecutor( int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler); }}Copy the code

Sample application.yml file configuration (mostly defaults) :

Spring: task: # Execution: pool execution: SynchronousQueue = 0; SynchronousQueue = 0; queue capacity = 60; keep-alive = 0; 60s # Allow-core-thread-timeout: true # scheduling: Scheduling "thread-name prefix: scheduled task-pool: size: 5 #shutdown: scheduling: scheduled task-pool: scheduling: scheduled False #await-termination: False # The maximum time to wait for the completion of remaining tasks. The default value is null, that is, whether the last task is completed does not affect the execution of the next taskCopy the code

Injection ThreadPoolTaskScheduler application must add @ EnableScheduling annotations, configuration class TaskSchedulingAutoConfiguration won’t be able to generate TaskScheduler cause the program will not be able to start, Examples are as follows:

@SpringBootApplication @EnableScheduling public class SimpleWebApplication { @Resource private ThreadPoolTaskExecutor executor; @Resource private ThreadPoolTaskScheduler scheduler; @postconstruct public void init() {// Print the current time every 5 seconds, Parameters and ScheduledThreadPoolExecutor a little difference but not scheduler. ScheduleAtFixedRate (() - > System. Out. The println (LocalDateTime. Now ()), Duration.of(5, ChronoUnit.SECONDS)); } public static void main(String[] args) { SpringApplication.run(SimpleWebApplication.class, args); }}Copy the code

Thread pool size

Too many threads in the thread pool add context switching costs, and too few threads underuse CPU resources. There is a common formula for setting the size of the thread pool according to the task type of the thread, as follows (N is the number of CPU cores) :

  • Cpu-intensive tasks (N+1) : If all tasks in the thread pool require CPU resources to perform operations, set the number of threads toN+1One more thread than the number of CPU cores is used to prevent occasional interrupts of the thread, or the effect of some other reason to pause the task, leaving the CPU idle and underutilized.
  • I/ O-intensive task (2N) : The performance bottleneck of an I/O task is mainly on THE I/O task, which consumes very few CPU resources. The BLOCKING of THE I/O task occupies most of the task time, and the thread does not occupy CPU for processing the I/O. In this case, the CP can be handed over to another thread. Therefore, you can configure more threads in the THREAD pool for I/O intensive tasks2N.

In Java, runtime.getruntime ().availableProcessors() is used to obtain the number of CPU cores.

Context Switch

Context switching (sometimes called process switching or task switching) is when the CPU switches from one process or thread to another. CPU process scheduling algorithm for time, time slices for each thread distribution, when a thread of time slices after use must be the end of the mission or not yield the CPU utilization, before release to the information in the thread (such as variable, execution to lines of code) to save the thread private area (such as stack, the program counter), So that the thread task can continue to run normally after the next time slice is obtained. Context switching is usually computationally intensive and requires considerable processor time. In tens or hundreds of switches per second, each switch takes nanoseconds and requires thread information to be saved. Therefore, the number of threads in the CPU task-intensive thread pool will not exceed the number of cores by much.

Task type

  • CPU intensive: Computes data in memory using the COMPUTER power of the CPU, such as sorting lists and searching for maximum and minimum values
  • I/O intensive: Most of the time is spent dealing with I/O interactions, such as Socket connections, file reads and writes, etc

At the end

Simple closing tag