This is part 5 of the series on learning asynchronous programming, covering the principle and use of thread pools. For the previous installment, see:

The Path to Learning Asynchronous Programming (4) – Sleep, Wake Up, Yield, and Join

In this article, we will further explain the principle and use of thread pools.

I don't recommend using Executors to create a thread pool; prefer constructing a ThreadPoolExecutor yourself as shown below. The reasons why Executors is not recommended will become clear after reading this article.

ThreadPoolExecutor source code analysis

ThreadPoolExecutor constructors

ThreadPoolExecutor has four constructors in JDK 8; they differ only in which of the parameters below are passed explicitly:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	// Body omitted: the full constructor validates the arguments and assigns all the fields.
	// The three constructors above simply delegate to it, filling in
	// Executors.defaultThreadFactory() and defaultHandler (an AbortPolicy) as defaults.
}

These parameters are explained in detail below.

2. corePoolSize (number of core threads in the thread pool)

When a task is submitted, the pool checks whether the number of running threads has reached corePoolSize. If it has not, a new thread is created to execute the task; if it has, the task is placed in the work queue.

3. maximumPoolSize (maximum number of threads the pool allows)

If the work queue is also full, the pool checks whether the current number of threads has reached maximumPoolSize. If it has not, a new (non-core) thread is created to execute the task; if it has, the rejection policy is applied.
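To make the corePoolSize / workQueue / maximumPoolSize order concrete, here is a minimal sketch (the class name and all parameter values are illustrative assumptions): with 2 core threads, a queue of 2 slots and a maximum of 4 threads, the seventh task submitted while all the others are still running is rejected.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionFlowDemo {
    public static void main(String[] args) {
        // 2 core threads, at most 4 threads, a bounded queue with 2 slots
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));

        // Tasks 1-2 start core threads, 3-4 wait in the queue,
        // 5-6 start extra (non-core) threads, task 7 is rejected.
        for (int i = 1; i <= 7; i++) {
            final int id = i;
            try {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(1000); // keep the thread busy during submission
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                System.out.println("Task " + id + " accepted, pool size = " + pool.getPoolSize());
            } catch (RejectedExecutionException e) {
                System.out.println("Task " + id + " rejected"); // default AbortPolicy
            }
        }
        pool.shutdown();
    }
}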

4. keepAliveTime (thread keep-alive time)

keepAliveTime is the maximum time an idle thread will wait for a task before terminating. By default it only applies to threads beyond corePoolSize: when the pool holds more than corePoolSize threads, idle ones are terminated after keepAliveTime until the pool is back down to corePoolSize. If allowCoreThreadTimeOut(boolean) has been called with true, keepAliveTime applies to core threads as well, so the pool can shrink all the way to zero threads.

There are two ways to set the keep-alive time: pass it to the ThreadPoolExecutor constructor, or call setKeepAliveTime afterwards:

/**
 * Sets the time limit for which threads may remain idle before
 * being terminated.  If there are more than the core number of
 * threads currently in the pool, after waiting this amount of
 * time without processing a task, excess threads will be
 * terminated.  This overrides any value set in the constructor.
 *
 * @param time the time to wait.  A time value of zero will cause
 *        excess threads to terminate immediately after executing tasks.
 * @param unit the time unit of the {@code time} argument
 * @throws IllegalArgumentException if {@code time} less than zero or
 *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
 * @see #getKeepAliveTime(TimeUnit)
 */
public void setKeepAliveTime(long time, TimeUnit unit) {
	if (time < 0)
		throw new IllegalArgumentException();
	if (time == 0 && allowsCoreThreadTimeOut())
		throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
	long keepAliveTime = unit.toNanos(time);
	long delta = keepAliveTime - this.keepAliveTime;
	this.keepAliveTime = keepAliveTime;
	if (delta < 0)
		interruptIdleWorkers();
}

setKeepAliveTime takes two arguments, a duration and its time unit. It throws an IllegalArgumentException if time < 0, or if time == 0 while allowsCoreThreadTimeOut() returns true. The allowsCoreThreadTimeOut() code looks like this:

/**
 * Returns true if this pool allows core threads to time out and
 * terminate if no tasks arrive within the keepAlive time, being
 * replaced if needed when new tasks arrive. When true, the same
 * keep-alive policy applying to non-core threads applies also to
 * core threads. When false (the default), core threads are never
 * terminated due to lack of incoming tasks.
 *
 * @return {@code true} if core threads are allowed to time out,
 *         else {@code false}
 *
 * @since 1.6
 */
public boolean allowsCoreThreadTimeOut() {
	return allowCoreThreadTimeOut;
}

allowsCoreThreadTimeOut() returns true if the pool allows core threads to time out and terminate when no tasks arrive within the keep-alive time.

It simply returns the allowCoreThreadTimeOut field, which can be changed through the allowCoreThreadTimeOut(boolean) setter:

/**
 * Sets the policy governing whether core threads may time out and
 * terminate if no tasks arrive within the keep-alive time, being
 * replaced if needed when new tasks arrive. When false, core
 * threads are never terminated due to lack of incoming
 * tasks. When true, the same keep-alive policy applying to
 * non-core threads applies also to core threads. To avoid
 * continual thread replacement, the keep-alive time must be
 * greater than zero when setting {@code true}. This method
 * should in general be called before the pool is actively used.
 *
 * @param value {@code true} if should time out, else {@code false}
 * @throws IllegalArgumentException if value is {@code true}
 *         and the current keep-alive time is not greater than zero
 *
 * @since 1.6
 */
public void allowCoreThreadTimeOut(boolean value) {
	if (value && keepAliveTime <= 0)
		throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
	if (value != allowCoreThreadTimeOut) {
		allowCoreThreadTimeOut = value;
		if (value)
			interruptIdleWorkers();
	}
}

By default allowCoreThreadTimeOut is false, so idle core threads are never terminated. Setting it to true makes the keep-alive policy apply to core threads as well, so idle core threads are terminated after keepAliveTime just like the others. The place where this takes effect is the worker's getTask() method, shown below:

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
	boolean timedOut = false; // Did the last poll() time out?

	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);

		// Check if queue empty only if necessary.
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
			decrementWorkerCount();
			return null;
		}

		int wc = workerCountOf(c);

		// Are workers subject to culling?
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
			if (compareAndDecrementWorkerCount(c))
				return null;
			continue;
		}

		try {
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take();
			if (r != null)
				return r;
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

getTask() performs a blocking or timed wait for a task, depending on the current configuration, and returns null (causing the worker thread to exit) in any of the following cases:

1. There are more workers than maximumPoolSize (because setMaximumPoolSize was called with a smaller value).

2. The thread pool has stopped.

3. The pool has been shut down and the queue is empty.

4. The worker timed out while waiting for a task and timed-out workers are subject to termination, that is, allowCoreThreadTimeOut || workerCount > corePoolSize.

One more thing to note is this code:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

In other words, a worker thread exits (getTask() returns null and the thread is removed from the pool) when the current worker count exceeds maximumPoolSize, or when the worker is allowed to time out (allowCoreThreadTimeOut is true or the worker count exceeds corePoolSize) and its last poll timed out, provided that more than one worker remains or the work queue is empty.
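The effect is easy to observe. Here is a minimal sketch (the class name and sizes are illustrative assumptions): with allowCoreThreadTimeOut(true) and a one-second keep-alive, even the core threads exit once the pool has been idle for a while.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
        pool.allowCoreThreadTimeOut(true); // keepAliveTime now applies to core threads too

        for (int i = 0; i < 4; i++) {
            pool.execute(new Runnable() {
                public void run() { /* short task */ }
            });
        }
        System.out.println("Right after submitting: " + pool.getPoolSize()); // expected 2 (core threads)
        Thread.sleep(3000); // wait longer than keepAliveTime
        System.out.println("After idling:           " + pool.getPoolSize()); // expected 0
        pool.shutdown();
    }
}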

5. unit (time unit of keepAliveTime)

unit is the time unit of the keepAliveTime parameter. TimeUnit defines seven constants:

TimeUnit.DAYS;          // days
TimeUnit.HOURS;         // hours
TimeUnit.MINUTES;       // minutes
TimeUnit.SECONDS;       // seconds
TimeUnit.MILLISECONDS;  // milliseconds
TimeUnit.MICROSECONDS;  // microseconds
TimeUnit.NANOSECONDS;   // nanoseconds

6. workQueue (task queue)

workQueue holds tasks that are waiting to be executed. Common choices are ArrayBlockingQueue, LinkedBlockingQueue, and SynchronousQueue; their constructors are shown below:

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
	this(capacity, false);
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
	if (capacity <= 0)
		throw new IllegalArgumentException();
	this.items = new Object[capacity];
	lock = new ReentrantLock(fair);
	notEmpty = lock.newCondition();
	notFull =  lock.newCondition();
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
						  Collection<? extends E> c) {
	this(capacity, fair);

	final ReentrantLock lock = this.lock;
	lock.lock(); // Lock only for visibility, not mutual exclusion
	try {
		int i = 0;
		try {
			for (E e : c) {
				checkNotNull(e);
				items[i++] = e;
			}
		} catch (ArrayIndexOutOfBoundsException ex) {
			throw new IllegalArgumentException();
		}
		count = i;
		putIndex = (i == capacity) ? 0 : i;
	} finally {
		lock.unlock();
	}
}

ArrayBlockingQueue is backed by an array and has three constructors; the first is used most often. Its parameters are:

1. capacity: the length of the backing array.

2. fair: defaults to false. If true, threads blocked on insertion or removal are served in FIFO order; if false, the access order is unspecified.

3. c: the collection of elements the queue initially contains; they are copied into the items array (the Object[] shown above).

/**
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
public LinkedBlockingQueue(int capacity) {
	if (capacity <= 0) throw new IllegalArgumentException();
	this.capacity = capacity;
	last = head = new Node<E>(null);
}

LinkedBlockingQueue is backed by a linked list. The constructor shown takes the capacity as its argument; the no-argument constructor uses Integer.MAX_VALUE as the capacity, which makes the queue effectively unbounded.

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
	this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
	transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue has two constructors. Parameter meaning:

fair: defaults to false. If true, waiting threads contend for access in FIFO order; if false, the order is unspecified.

ArrayBlockingQueue and PriorityBlockingQueue are used less often; LinkedBlockingQueue and SynchronousQueue are the common choices. The pool's queuing behavior is determined by which BlockingQueue you pass in.
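The choice of queue changes how the pool grows. A minimal sketch of the two common setups (the class name and pool sizes are illustrative assumptions): a bounded LinkedBlockingQueue buffers tasks and only grows the pool beyond corePoolSize once the queue is full, while a SynchronousQueue stores nothing and forces a direct hand-off to a thread.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        // Bounded LinkedBlockingQueue: extra tasks wait in the queue,
        // and threads beyond corePoolSize are only created once the queue is full.
        ThreadPoolExecutor buffered = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100));

        // SynchronousQueue: no storage at all; every task must be handed directly
        // to a thread, otherwise a new thread is created (or the task is rejected).
        ThreadPoolExecutor handoff = new ThreadPoolExecutor(
                2, 64, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

        buffered.shutdown();
        handoff.shutdown();
    }
}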

7. threadFactory (factory used to create threads)

ThreadFactory is a simple interface for producing new threads:

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

The newThread method produces a thread; implementations create threads according to their own rules (name, priority, daemon status, and so on).

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

If a ThreadPoolExecutor is constructed without a threadFactory, Executors.defaultThreadFactory() is used by default.
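In practice the most common reason to supply your own factory is to give pool threads recognizable names. A minimal sketch (the class name and naming pattern are assumptions for illustration):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name each thread prefix-1, prefix-2, ... so it is easy to spot in thread dumps
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);                // keep the JVM alive while tasks run
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

Passing new NamedThreadFactory("order-pool") to the ThreadPoolExecutor constructor makes its threads show up as order-pool-1, order-pool-2, and so on in logs and thread dumps.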

8. handler (task rejection policy)

There are four built-in rejection policies. The top-level interface is:

/**
 * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

The JDK provides four implementations:

ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException (the default).
ThreadPoolExecutor.DiscardPolicy: silently discards the task without throwing an exception.
ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then retries execute() with the new task.
ThreadPoolExecutor.CallerRunsPolicy: runs the task in the thread that called execute().

Developers have the flexibility to configure different rejection policies based on business requirements when initializing the thread pool.
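If none of the built-in policies fit, a custom policy only needs to implement rejectedExecution. A minimal sketch that logs the rejection and then drops the task (the class name and behavior are illustrative assumptions, not a JDK class):

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LogAndDiscardPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Behaves like DiscardPolicy, but leaves a trace for troubleshooting.
        System.err.println("Rejected task " + r + ", pool state: " + executor);
    }
}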

This is also why using Executors directly is discouraged: newFixedThreadPool and newSingleThreadExecutor use an unbounded work queue (a LinkedBlockingQueue with capacity Integer.MAX_VALUE), so tasks beyond the core thread count keep accumulating in the queue, while newCachedThreadPool and newScheduledThreadPool allow up to Integer.MAX_VALUE threads. Either way, resources can be exhausted under load. To avoid this risk, it is strongly recommended to construct ThreadPoolExecutor yourself with an explicitly sized, bounded queue.
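Putting the pieces together, a minimal sketch of such a manually configured pool (all the sizes are illustrative assumptions; NamedThreadFactory is the factory sketched in the threadFactory section above):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ManualPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                         // corePoolSize
                8,                                         // maximumPoolSize
                60L, TimeUnit.SECONDS,                     // keepAliveTime and its unit
                new LinkedBlockingQueue<Runnable>(200),    // bounded queue instead of Integer.MAX_VALUE
                new NamedThreadFactory("biz-pool"),        // named threads (sketched above)
                new ThreadPoolExecutor.CallerRunsPolicy()  // push work back to the caller when saturated
        );

        pool.execute(new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is working");
            }
        });
        pool.shutdown();
    }
}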

Executors thread pools

Executors provides four commonly used thread pools. The following examples show how they are used:

1. FixedThreadPool

import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  

public class ThreadPoolTest {  
    public static void main(String[] args) {  
        ExecutorService threadPool = Executors.newFixedThreadPool(3);  
        for(int i = 1; i < 5; i++) {  
            final int taskID = i;  
            threadPool.execute(new Runnable() {  
                public void run() {
                    for (int i = 1; i < 5; i++) {
                        try {
                            Thread.sleep(20); // make each task take some time so the effect is visible
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Task " + taskID + ", execution " + i);
                    }
                }
            });
        }
        threadPool.shutdown(); // shut down the pool once all tasks have been submitted
    }
}

Running results:

Task 1, execution 1
Task 2, execution 1
Task 3, execution 1
Task 2, execution 2
Task 3, execution 2
Task 1, execution 2
Task 3, execution 3
Task 1, execution 3
Task 2, execution 3
Task 3, execution 4
Task 2, execution 4
Task 1, execution 4
Task 4, execution 1
Task 4, execution 2
Task 4, execution 3
Task 4, execution 4

In the code above we created a fixed-size pool with a capacity of 3 and then submitted 4 tasks in a loop. The output shows that the first 3 tasks run first, and the fourth only starts once a thread becomes free. In a FixedThreadPool the pool size is fixed: if more tasks are submitted than there are threads, the extra tasks wait until a thread becomes free; if fewer tasks are running than the pool size, the idle threads are not destroyed.

2. CachedThreadPool

Otherwise unchanged, replace the newFixedThreadPool method with the newCachedThreadPool method.

Running results:

Task 3, execution 1
Task 4, execution 1
Task 1, execution 1
Task 2, execution 1
Task 4, execution 2
Task 3, execution 2
Task 2, execution 2
Task 1, execution 2
Task 2, execution 3
Task 3, execution 3
Task 1, execution 3
Task 4, execution 3
Task 2, execution 4
Task 4, execution 4
Task 3, execution 4
Task 1, execution 4

As you can see, the four tasks execute concurrently, interleaving with each other. CachedThreadPool caches the threads it creates: it reuses a previously created thread if one is available, creates a new thread if none is, and terminates and removes from the cache any thread that has been idle for 60 seconds.

3. SingleThreadExecutor

Otherwise unchanged, change the newFixedThreadPool method to newSingleThreadExecutor.

Running results:

Task 1, execution 1
Task 1, execution 2
Task 1, execution 3
Task 1, execution 4
Task 2, execution 1
Task 2, execution 2
Task 2, execution 3
Task 2, execution 4
Task 3, execution 1
Task 3, execution 2
Task 3, execution 3
Task 3, execution 4
Task 4, execution 1
Task 4, execution 2
Task 4, execution 3
Task 4, execution 4

The four tasks execute one after another. SingleThreadExecutor uses a single worker thread and guarantees that tasks are processed sequentially; if that thread dies unexpectedly, a new one is created to continue with the remaining tasks, which is different from creating a thread yourself. It also differs from newFixedThreadPool(1): the single-thread executor is wrapped in a delegate so it cannot be reconfigured to use additional threads later, whereas a fixed pool of size 1 can be.
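A minimal sketch of that last difference (the class name is an assumption): the pool behind newFixedThreadPool(1) can be cast back to ThreadPoolExecutor and reconfigured, while the one behind newSingleThreadExecutor() is wrapped in a delegate and cannot.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SingleVsFixedDemo {
    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        // The underlying ThreadPoolExecutor is exposed, so it can be resized later.
        ((ThreadPoolExecutor) fixed).setMaximumPoolSize(4);

        ExecutorService single = Executors.newSingleThreadExecutor();
        // The wrapper is not a ThreadPoolExecutor, so it cannot be reconfigured.
        System.out.println(single instanceof ThreadPoolExecutor); // prints false

        fixed.shutdown();
        single.shutdown();
    }
}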

4. ScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(1);
        // Executed once, 5 seconds from now
        schedulePool.schedule(new Runnable() {
            public void run() {
                System.out.println("Bang");
            }
        }, 5, TimeUnit.SECONDS);
        // First executed after 5 seconds, then every 2 seconds
        schedulePool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("Bang");
            }
        }, 5, 2, TimeUnit.SECONDS);
    }
}

ScheduledThreadPool can execute tasks on a scheduled or delayed basis.

This is the end of this article. The next article covers the use of Future and Callable:

Learning asynchronous Programming (6) – Principles and Usage of Future and Callable (including FutureTask)