Keywords:

Threads, thread pools, single threads, multithreading, benefits of thread pools, thread recycling, creation methods, core parameters, underlying mechanisms, rejection policies, parameter Settings, dynamic monitoring, thread isolation

Threads and thread pool related knowledge, is the Java knowledge, learning or interview will meet this we will from the threads and processes, parallel and concurrent, single-threaded and multithreading, etc., have been explained to the thread pool, the benefits of the thread pool, creation method, the core of the important parameters, several important methods, the underlying implementation, refused to strategy, parameter setting, dynamic adjustment, Thread isolation and so on. The main outline is as follows:

Benefits of thread pools

Thread pool, using the idea of pooling to manage threads, pooling technology is to maximize benefits, minimize user risks, the idea of unified management of resources. This idea is used in many places, not just computers, such as finance, enterprise management, equipment management, etc.

Why thread pools? If in a concurrent scenario, coders create thread pools based on requirements, they may have the following problems:

  • It is difficult to determine how many threads are running in the system, and the cost of creating and destroying threads is also high
  • Let’s say a lot of requests come in, maybe a crawler, creating threads like crazy, maybe running out of system resources.

What are the benefits of implementing thread pools?

  • Reduced resource consumption: Pooling techniques can reuse threads that have already been created, reducing thread creation and destruction costs.
  • Improved response time: Use existing threads for processing, reducing thread creation time
  • Management thread control: Threads are scarce resources and cannot be created indefinitely. Thread pools can be uniformly allocated and monitored
  • Expand other features: such as timed thread pools, which allow timed tasks to be executed

In fact, pooling technology is used in many places, such as:

  • Database connection pool: Database connections are scarce resources. They should be created first to improve response speed and reuse existing connections
  • Instance pool: first create a good object into the pool, recycling, reduce back and forth to create and destroy the consumption

Thread pool-related classes

The following is the inheritance of thread pool-related classes:

Executor

Executor is a top-level interface that has only one method, Execute (Runnable Command). It defines the thread pool to execute tasks. It defines the basic specification of the thread pool.

ExecutorService

ExecutorService inherits Executor, but it is still an interface, with additional methods:

  • void shutdown(): Closes the thread pool and waits for the task to complete.
  • List<Runnable> shutdownNow(): Immediately close the thread pool, try to stop all active tasks, stop processing of waiting tasks, andReturns a list of tasks that are waiting to be executed (that have not yet been executed).
  • boolean isShutdown(): Determines whether the thread pool is closed, but the thread may still be executing.
  • boolean isTerminated(): After shutdown/shutdownNow is executed, all tasks are completed and the state is true.
  • boolean awaitTermination(long timeout, TimeUnit unit): After shutdown, blocking waits in terminated state unless terminated or interrupted.
  • <T> Future<T> submit(Callable<T> task)Call future.get() to return the result when the task is completed.
  • <T> Future<T> submit(Runnable task, T result): submits a task that returns a result that does nothing but specify the type and return a result.
  • Future<? > submit(Runnable task): Submits the task and returns the Future
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Perform tasks in batches to obtain the list of Future and submit tasks in batches.
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit): Submits tasks in batches and specifies the timeout period
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks): block, get the result value of the first completed task,
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit): blocks, gets the value of the first completion result, and specifies the timeout


Future

submit(Runnable task, T result)

It does nothing but hold it, call future.get () to return the result, result new, Inside the Runnable wrapper class RunnableAdapter, there is no special processing for result, which is returned when the call() method is called. (Executors specific implementation)

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call(a) {
            task.run();
            // Returns the result passed in
            returnresult; }}Copy the code

Another method worth mentioning is invokeAny(): Use the ExecutorService method invokeAny() in ThreadPoolExecutor to get the result of the first completed task, and when the first task is completed, interrupt() is called to interrupt the other tasks.

Note that ExecutorService is an interface, full of definitions, not implementations, and the previous tutorial is based on its name (the prescribed specification) and its common implementation.

As you can see, the ExecutorService defines several operations for the thread pool, including closing, deciding whether to close, stopping, submitting tasks, and batch submitting tasks.

AbstractExecutorService

AbstractExecutorService is an abstract class that implements the ExecutorService interface, which is the basic implementation of most ExecutorService thread pools. The main methods are as follows:

It not only implements methods such as Submit, invokeAll, invokeAny, but also provides a newTaskFor method for building RunnableFuture objects. Those objects that can get the result of the task are obtained through newTaskFor. Instead of expanding all the source code, submit() method is used as an example:

    publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
        // Encapsulate tasks
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // Execute the task
        execute(ftask);
        // Return the RunnableFuture object
        return ftask;
    }
Copy the code

However, AbstractExecutorService does not implement the single most important method, the execute() method. The thread pool can be implemented in a variety of ways, such as AbstractExecutorService. The most common thread pool is ThreadPoolExecutor.

ThreadPoolExecutor

Here we go!! ThreadPoolExecutor is a common thread pool class that creates a thread pool or, if not a timed thread pool, uses it.

Let’s look at the internal structure (attributes) of ThreadPoolExecutor:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // State control, mainly used to control the state of the thread pool, is the core traversal, using atomic classes
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  	// The number of bits used to represent the number of threads (using bit operations, one part represents the number of threads, one part represents the state of the thread pool)
    // SIZE = 32 32 bits, so COUNT_BITS is 29 bits
    private static final int COUNT_BITS = Integer.SIZE - 3;
  	// The capacity of the thread pool, which is the maximum of 27 bits
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // State quantity, stored in the high, the first three of 32 bits
  	// 111 (the first bit is a sign bit, and 1 represents a negative number), the thread pool is running
    private static final int RUNNING    = -1 << COUNT_BITS; 
  	/ / 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
  	/ / 001
    private static final int STOP       =  1 << COUNT_BITS;
  	/ / 010
    private static final int TIDYING    =  2 << COUNT_BITS;
  	/ / 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Retrieve the running state
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
  	// Fetch the number of threads
    private static int workerCountOf(int c)  { return c & CAPACITY; }
  	// Get the CTL with the running state and thread count
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  	
  	// Task waiting queue
    private final BlockingQueue<Runnable> workQueue;
  	// Remaster lock (thread-safe for some operations)
    private final ReentrantLock mainLock = new ReentrantLock();
  	// A collection of threads
    private final HashSet<Worker> workers = new HashSet<Worker>();
  
  	// In Condition, replace wait() with await(), notify() with signal(), and notifyAll() with signalAll(),
    Condition can communicate with each other in the same way as a traditional thread. A Condition is powerful in that it can create different conditions for multiple threads
    private final Condition termination = mainLock.newCondition();
  
  	// Maximum thread pool size
    private int largestPoolSize;
  	// The number of tasks completed
    private long completedTaskCount;
  	// Thread factory
    private volatile ThreadFactory threadFactory;
  	// Task rejection handler
    private volatile RejectedExecutionHandler handler;
 		// The lifetime of non-core threads
    private volatile long keepAliveTime;
  	// The timeout allowed for the core thread
    private volatile boolean allowCoreThreadTimeOut;
 		// Number of core threads
    private volatile int corePoolSize;
		// The maximum capacity of the worker thread
    private volatile int maximumPoolSize;
 		// Default reject handler (discard task)
  	private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
  	// Close permissions at runtime
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
  	/ / context
    private final AccessControlContext acc;
  	// There is only one thread
    private static final boolean ONLY_ONE = true;
}
Copy the code

Thread pool state

As you can see from the above code, a 32-bit object is used to store the state of the thread pool and the capacity of the thread pool. The top three bits are the state of the thread pool, and the remaining 29 bits are the number of threads to store:

    // State quantity, stored in the high, the first three of 32 bits
  	// 111 (the first bit is a sign bit, and 1 represents a negative number), the thread pool is running
    private static final int RUNNING    = -1 << COUNT_BITS; 
  	/ / 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
  	/ / 001
    private static final int STOP       =  1 << COUNT_BITS;
  	/ / 010
    private static final int TIDYING    =  2 << COUNT_BITS;
  	/ / 011
    private static final int TERMINATED =  3 << COUNT_BITS;
Copy the code

The various states are different, and their states vary as follows:

  • RUNNING: Indicates that tasks can be accepted or processed
  • SHUTDOWN: cannot accept tasks, but can process them
  • STOP: The current task cannot be interrupted or accepted
  • TIDYING: All threads stop
  • TERMINATED: The last state of the thread pool

The Worker to realize

Thread pools, which must have pools and are places to put threads, are represented as Worker in ThreadPoolExecutor, which is an inner class:

A thread pool is essentially a collection of workers, using a HashSet:

private final HashSet<Worker> workers = new HashSet<Worker>();
Copy the code

How is Worker implemented?

Worker in addition to the inherited AbstractQueuedSynchronizer, namely AQS, AQS is essentially a queue lock, and a simple mutex, are generally used when interrupt or modify the state of the Worker.

AQS are introduced internally for thread safety. When a thread executes a task, it calls runWorker(Worker w), which is not a Worker method but a ThreadPoolExecutor method. As can be seen from the following code, every time the Worker’s state is modified, it is thread safe. Worker holds a Thread Thread, which can be understood as the encapsulation of the Thread.

How does runWorker(Worker W) work? Keep that question open for now, and we’ll talk more about it later.

    // Implement Runnable, encapsulating threads
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // serialize the ID
        private static final long serialVersionUID = 6138294804551838833L;

        // the thread in which the worker is running
        final Thread thread;
        
        // Initialize the task, which may be empty. If the task is not empty, other incoming tasks can be run directly without being added to the task queue
        Runnable firstTask;
        // Thread task counter
        volatile long completedTasks;

        // Assign a task to keep workers busy. This task may be empty
        Worker(Runnable firstTask) {
          	// Initialize the status of the AQS queue lock
            setState(-1); // Disable interrupts until runWorker
            this.firstTask = firstTask;
            // Take a thread from the thread factory and initialize it
            this.thread = getThreadFactory().newThread(this);
        }

        // It is runWorker that actually runs the call
        public void run(a) {
          	// Continuously loop to fetch tasks for execution
            runWorker(this);
        }

        // 0: no lock
        // 1 indicates the locked state
        protected boolean isHeldExclusively(a) {
            returngetState() ! =0;
        }
        // Exclusive, attempt to acquire the lock, return true on success, false on failure
        protected boolean tryAcquire(int unused) {
            // CAS optimistic lock
            if (compareAndSetState(0.1)) {
                // Success, current thread exclusive lock
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // In exclusive mode, try to release the lock
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // call the AQS method
        public void lock(a)        { acquire(1); }
        // Try locking
        public boolean tryLock(a)  { return tryAcquire(1); }
        / / unlock
        public void unlock(a)      { release(1); }
        // Whether to lock
        public boolean isLocked(a) { return isHeldExclusively(); }

        // Stop if you start
        void interruptIfStarted(a) {
            Thread t;
            if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
Copy the code

Task queue

In addition to a thread pool, if you have a lot of tasks and you don’t have that many threads, you definitely need a place to put the tasks, which acts as a buffer. This is called a task queue, which looks like this in your code:

private final BlockingQueue<Runnable> workQueue;
Copy the code

Reject policies and handlers

Computer memory is always limited, and we can’t keep adding to queues, so thread pools give us the option to choose from multiple queues. At the same time, when there are too many tasks, the thread is full, and the task queue is full, we need to make a certain response, that is, reject or throw errors, throw away the task? What tasks to get rid of, these are things that might need to be customized.

How do I create a thread pool

ThreadPoolExecutor provides a constructor for creating a thread pool. The main arguments are as follows:

  • Core threads: The number of core threads, usually resident threads, that are not destroyed when there are no tasks
  • Maximum number of threads: The maximum number of threads allowed to be created by the thread pool
  • Non-core thread lifetime: This refers to how long a non-core thread can survive without a task
  • A unit of survival time
  • Queue for storing tasks: Used to store tasks
  • Thread factory
  • Reject handler: If the task fails to be added, it will be handled by that handler
	// Specifies the number of core threads, the maximum number of threads, the duration of non-core threads without tasks, the unit of time, and the task queue
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
	  // Specifies the number of core threads, the maximum number of threads, the duration of non-core threads without tasks, the unit of time, the task queue, and the thread pool factory
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
	  // Specify the number of core threads, maximum number of threads, non-core thread no task alive time, time unit, task queue, reject task handler
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
		// This method is called
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {... }Copy the code

In addition to the specified parameters shown above, the JDK also wraps up some methods for directly creating a thread pool: Executors:

		// Thread pool with fixed number of threads, unbounded queue
		public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
		// A single thread pool, an unbounded queue, executes sequentially in the order in which tasks are submitted
		public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
		// Dynamic tuning, no core threads, all normal threads, each thread lives 60 seconds, using a blocking queue of 1 capacity
    public static ExecutorService newCachedThreadPool(a) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
	  // Scheduled task thread pool
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(a) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
Copy the code

However, it is generally not recommended to use the thread pool wrapped by others.

The underlying parameters of the thread pool and the core methods

You may be a little confused after reading the creation parameters above, but don’t worry.

As you can see, when a task comes in, the core thread pool is checked to see if it is full. If not, the thread will continue to be created. Notice that if a task comes in, the creation thread executes, the execution completes, and the thread is idle, will the next task continue to use the previous thread, or create a new thread to execute it?

The answer is to recreate the threads so that the thread pool can quickly reach the size of the core thread count to quickly respond to subsequent tasks.

If the number of threads has reached the core number of threads, tasks come in, and threads in the thread pool are not idle, then the queue is judged to be full, and if there is room in the queue, the task is put in the queue, waiting for the thread to pick up the execution.

If the task queue is full, it will determine whether the maximum number of threads has been reached. If not, it will continue to create threads and execute the task. In this case, it will create non-core threads.

If the maximum number of threads has been reached, no more threads can be created and only the rejection policy can be implemented. The default rejection policy is to discard the task. You can customize the rejection policy.

It is worth noting that if there are too many tasks before and some non-core threads are created, then after a certain period of time, the non-core threads will be destroyed, leaving only the number of threads in the core thread pool. This time is called keepAliveTime.

Submit a task

Execute () gets the state and number of threads in the thread pool. If the number of threads does not reach the number of core threads, threads will be added to the task queue. If the number of threads in the task queue does not reach the number of core threads, threads will be added to the task queue.

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Get the status and number
        int c = ctl.get();
      	// If the number is smaller than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
          	// Add it directly
            if (addWorker(command, true))
                return;
          	// Continue to fetch if add fails
            c = ctl.get();
        }
      	// Check whether the thread pool state is running, and put the task into the queue
        if (isRunning(c) && workQueue.offer(command)) {
          	// Check again
            int recheck = ctl.get();
          	// Check whether the thread pool is still running
            if (! isRunning(recheck) && remove(command))
              	// If not, reject and remove the task
                reject(command);
            else if (workerCountOf(recheck) == 0)
              	// If the number of threads is 0 and still running, add it directly
                addWorker(null.false);
        }else if(! addWorker(command,false))
          	// Failed to add a task queue
            reject(command);
    }
Copy the code

AddWorker (Runnable firstTask, Boolean core); addWorker(Runnable firstTask, Boolean core); addWorker(Runnable firstTask, Boolean core);

    private boolean addWorker(Runnable firstTask, boolean core) {
      	// Go back to the current position and try again
        retry:
        for (;;) {
          	// Get the status
            int c = ctl.get();
            int rs = runStateOf(c);

            // If the value is greater than SHUTDOWN, the thread pool has stopped
          	/ /! (rs == SHUTDOWN && firstTask == null && ! Workqueue.isempty ()) indicates that at least one of the three conditions is not met
          	// If the value is not SHUTDOWN, the value is greater than SHUTDOWN
          	/ / firstTask! = null The task is not empty
          	// workqueue.isempty () isEmpty
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null&&! workQueue.isEmpty()))return false;

            for (;;) {
                // Number of worker threads
                int wc = workerCountOf(c);
              	// Whether it meets the capacity
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
              	// Add successfully, out of the loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
              	// cas failed, try again
                if(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop}}// The previous thread count was successfully added
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          	// Create a worker and wrap the task
            w = new Worker(firstTask);
            final Thread t = w.thread;
          	// Thread created successfully
            if(t ! =null) {
              	/ / acquiring a lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Reconfirm the status
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                      	// If the thread is already started, it fails
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      	// Add a thread to the collection
                        workers.add(w);
                      	// Get the size
                        int s = workers.size();
                      	// Determine the maximum number of thread pools
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      	// A worker thread has been added
                        workerAdded = true; }}finally {
                  	/ / unlock
                    mainLock.unlock();
                }
              	// If already added
                if (workerAdded) {
                  	// Start the thread
                    t.start();
                    workerStarted = true; }}}finally {
          	// If not started
            if (! workerStarted)
              	// Failed processing
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Copy the code

Processing tasks

The run() method calls the runWorker() method, so let’s look at runWorkder() :

First, it will directly process its own firstTask, which is not in the task queue but is held by itself:

final void runWorker(Worker w) {
  			// The current thread
        Thread wt = Thread.currentThread();
  			// First task
        Runnable task = w.firstTask;
  			// Reset to null
        w.firstTask = null;
  			// Allow interruptions
        w.unlock();
        boolean completedAbruptly = true;
        try {
           // The task is not empty, or the acquired task is not empty
            while(task ! =null|| (task = getTask()) ! =null) {
              	/ / lock
                w.lock();
								// If the thread pool stops, make sure the thread is interrupted;
								// If not, make sure the thread is not interrupted. this
								// In the second case, a review is required
								// The shutdown - now race clears interrupts at the same time
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                  	// Execute the previous callback method (which we can implement ourselves)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	// Execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      	// Execute the post-callback methodafterExecute(task, thrown); }}finally {
                  	/ / set to null
                    task = null;
                  	// Update completes the taskw.completedTasks++; w.unlock(); }}/ / finish
            completedAbruptly = false;
        } finally {
          	// Handle thread exitprocessWorkerExit(w, completedAbruptly); }}Copy the code

Above you can see if the current task is null, will go to get a task, we see getTask (), which involves two parameters, one is whether to allow the core thread, another is the core thread count is greater than the number of threads, if meet the conditions, the task is removed from the queue, if take less than a timeout, then returns an empty, Indicates that the task was not picked up, the previous loop will not be executed and the thread will destroy processWorkerExit() and so on.

private Runnable getTask(a) {
  	// Whether timeout occurs
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // The SHUTDOWN state continues to process tasks in the queue, but no new tasks are received
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
      	/ / the number of threads
        int wc = workerCountOf(c);

        // Whether the core thread is allowed to timeout or the number of threads is greater than the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          	// Thread reduction success returns NULL, which is processed by processWorkerExit()
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
          	// If the core thread is allowed to close, or the core thread is exceeded, the task can be fetched within the timeout period, or it can be fetched directly
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
          	// If you can get the task, you can definitely execute it
            if(r ! =null)
                return r;
          	// If you do not get the task, it will time out
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code

Destruction of the thread

As mentioned earlier, if the thread has no current task and allows the core thread to destroy, or if the thread has exceeded the number of core threads, waited a certain amount of time, timed out but did not get the task from the task queue, it will jump out of the loop and execute to the following thread destruction (termination) program. So what do you do when you destroy a thread?

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
      	// If the thread is terminated suddenly, the previous number of threads is not adjusted, so it needs to be adjusted
        if (completedAbruptly)
            decrementWorkerCount();
      	/ / acquiring a lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
      
        try {
          	// Number of completed tasks
            completedTaskCount += w.completedTasks;
            // Remove the thread
          	workers.remove(w);
        } finally {
          	/ / unlock
            mainLock.unlock();
        }
      	// Try to stop
        tryTerminate();
      	// Get the status
        int c = ctl.get();
      	// Is smaller than stop, at least shutdown
        if (runStateLessThan(c, STOP)) {
          	// If it is not done suddenly
            if(! completedAbruptly) {// The minimum value is either 0 or the number of core threads, or 0 if the core thread is allowed to timeout
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              	// If the minimum is 0 or the queue is not empty, keep one thread
                if (min == 0&&! workQueue.isEmpty()) min =1;
              	If the number of threads is greater than or equal to the minimum, the current thread is terminated
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
          	// Otherwise, additional worker threads may be required
            addWorker(null.false); }}Copy the code

How do I stop a thread pool

The thread pool can be stopped using shutdown() or shutdownNow(), which continues processing tasks in the queue, while shutdownNow() immediately cleans up the task and returns unexecuted tasks.

    public void shutdown(a) {
        / / acquiring a lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	// Check the stop permission
            checkShutdownAccess();
          	// Update the status
            advanceRunState(SHUTDOWN);
          	// Interrupt all threads
            interruptIdleWorkers();
          	// Call back the hook
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
		// Stop immediately
   public List<Runnable> shutdownNow(a) {
        List<Runnable> tasks;
     		/ / acquiring a lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	// Check the stop permission
            checkShutdownAccess();
          	// Update the status to stop
            advanceRunState(STOP);
          	// Interrupt all threads
            interruptWorkers();
            // Clear the queue
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
     		// Return to task list (incomplete)
        return tasks;
    }
Copy the code

Execute () and submit() methods

  • execute() Method can submit tasks that do not require a return value and cannot determine whether the task was successfully executed by the thread pool
  • submit()Method is used to submit tasks that require a return value. The thread pool returns an object of type Future through which we callget()The method will doblocking, until we get the result of the thread’s completion, and we can also use the wait method with a timeoutGet (long timeout, TimeUnit unit)If the thread is running out of time, it will not block and return NULL. Returns theRunnableFutureObject, inheritedRunnable, Future<V>Two interfaces:
public interface RunnableFuture<V> extends Runnable.Future<V> {
    /** * Sets this Future to the result of its computation * unless it has been cancelled. */
    void run(a);
}
Copy the code

Why do thread pools use blocking queues?

Blocking queues, first and foremost a queue, must have a first-in, first-out property.

And blocking, which is an evolution of this model, the general queue, can be used in the production consumer model, which is data sharing, where somebody puts tasks in, somebody keeps taking tasks out, and that’s an ideal state.

However, if this is not ideal, the speed of producing and consuming tasks is different. If the task is in the queue and consuming slowly, it can be consumed slowly, or the producer has to pause the producing task (blocking the producer thread). You can use offer(E o, long timeout, TimeUnit unit) to set the waiting time. If you cannot add BlockingQueue to the queue within the specified period of time, the queue will fail. You can also use put(Object) to place the Object in the BlockingQueue. If there is no space, the method blocks until there is space.

If the consumption speed is fast and the producer cannot produce the task, poll(time) can be used to obtain the task. If there is data, poll(time) can be directly retrieved. If there is no data, poll(time) can wait for time and return NULL. You can also use take() to fetch the first task and block until there are no more tasks in the queue.

The blocking queue property is described above, so why use it?

  • If a task is created, it is put into a queue when it comes, and resources are easily exhausted.
  • Creating a thread requires acquiring a lock, which is a global lock of a thread pool. If each thread is constantly acquiring a lock, unlocking, thread context switching and so on, the overhead is also high. It is better to block a thread waiting when the queue is empty.

Common blocking queues

  • ArrayBlockingQueue: Array-based implementation with an array of fixed length that holds the position of the head and tail of the queue.
  • LinkedBlockingQueue: LinkedBlockingQueue is a linked list based blocking match. producers and consumers use separate locks, parallelism is strong, if not specified capacity, default is invalid capacity, easy to run out of system memory.
  • DelayQueue: DelayQueue. There is no size limit. Production data will not be blocked, but consumption data will.
  • PriorityBlockingQueue: A priority-based blocking queue, consuming according to priority, with internal control synchronizing fair locks.
  • SynchronousQueue: There is no buffer, because the producer assigns tasks directly to the consumer, with no buffer in between.

How do thread pools reuse threads? What happens to the thread that completes execution

The thread in the thread pool calls runWorker(), which is an infinite loop, unless it can’t get the task. If it doesn’t have firstTask and can’t get the task from the task queue, when it times out, The current thread is terminated only when the number of core threads is exceeded.

Otherwise, it will always be in a cycle that will not end.

We know that the start() method can only be called once, so when the run() method is called, the external runWorker() is called, so that the runWorker() method is constantly looping to get the task during runWorker(). Get the task, call the run() method of the task.

The thread will call processWorkerExit (), which will acquire the lock, reduce the number of threads, and remove them from the set. After removing them, it will determine if there are too few threads, and if so, it will add them back.

How do I configure thread pool parameters?

In general, there is a formula that sets the number of core threads to processor cores -1 for computationally intensive tasks and 2 cores for IO intensive tasks (many network requests). However, this is not a silver bullet, everything should be practical, the best is to test in the test environment, the knowledge of practice, and many times a machine has more than one thread pool or other threads, so do not set too full parameters.

On a typical 8-core machine, 10-12 core threads are all that matters, depending on the business value. Setting up too many threads, context switching, competitive, too few Settings, and no way to make full use of the computer’s resources.

Computing (CPU) intensive consumption is mainly CPU resources, the number of threads can be set to N (NUMBER of CPU cores) +1, one more thread than the number of CPU cores is to prevent the impact of the thread occasional page miss interrupt, or other reasons caused by the suspension of the task. Once the task is paused, the CPU is idle, and in this case the extra thread can take full advantage of the idle CPU time.

IO intensive systems spend most of their time processing I/O interactions, and threads do not use CPU for I/O processing time, which can be handed over to other threads. Therefore, in the application of I/O intensive tasks, we can configure more threads, the specific calculation method is 2N.

Why not recommend the default thread pool creation method?

In ali’s programming specification, it is not recommended to use the default method to create threads. This is because many threads are created with default parameters, which can cause problems if the creator is not familiar with them. It is best to use new ThreadPoolExecutor() to control the parameters. The default creation mode has the following problems:

  • Limitless Executors. NewFixedThreadPool () : queue, memory may be
  • Executors. NewSingleThreadExecutor () : a single thread, low efficiency, the serial port.
  • Executors. NewCachedThreadPool () : there is no core thread, the largest number of threads can be infinite, memory might have maxed out.

To create a thread pool with specific parameters, the developer must understand what each parameter does and not mess with the parameters to reduce memory overflow issues.

Generally reflected in the following problems:

  • How to set the task queue?
  • How many core threads?
  • What is the maximum number of threads?
  • How do I turn down an assignment?
  • The thread was created without a name and traceability problems are hard to find.

Thread pool rejection policy

Thread pools generally have the following four rejection strategies, which can be seen from their inner classes:

  • AbortPolicy: Does not execute a new task and raises an exception indicating that the thread pool is full
  • DisCardPolicy: Does not execute new tasks, but does not throw exceptions either, silently
  • DisCardOldSetPolicy: Discards the oldest task in the message queue and changes it to a new task
  • CallerRunsPolicy: Directly invoke the current execute to execute the task

In general, the above rejection policies are not ideal, if general task is full, the first thing you need to do is look at task is necessary, if not necessary, non-core, can consider to rejected, and an error warning, if it is a must, must keep it up, whether the use of mq message, or other means, not tasks. In these processes, logging is essential. Both protect the thread pool and be responsible for the business.

Thread pool monitoring and dynamic tuning

Thread pools provide apis for dynamically retrieving thread pool state and setting thread pool parameters, as well as state:

Check the status of the thread pool:

Modify the thread pool state:

The thread pool article on Meituan makes this very clear, and even provides a platform for adjusting thread pool parameters in real time. You can track thread pool activity, task Transaction, Reject exceptions, internal thread pool statistics, and more. Here I wouldn’t have launched, the original: tech.meituan.com/2020/04/02/… This is the idea we can refer to.

Thread pool isolation

Thread isolation, as many of you may know, means that different tasks are run in different threads, while thread pool isolation is generally isolated according to business types. For example, the order processing thread is placed in a thread pool, and membership-related processing thread is placed in a thread pool.

It can also be isolated by core and non-core, the core process is put together, the non-core process is put together, the two use different parameters, different rejection strategy, as far as possible to ensure that there is no influence between multiple thread pools, and as far as possible to keep the core thread running, non-core thread can tolerate failure.

Hystrix uses this technology, Hystrix’s thread isolation technology, to prevent avalanches between different network requests, even if the thread pool of a dependent service is full, without affecting the rest of the application.

About the author

Qin Huai, author of public number [Qin Huai Grocery store], the road of technology is not at that time, the mountain is high and the water is long, even if slow, and not stop. Personal Writing Direction: Java source code analysis, JDBC, Mybatis, Spring, Redis, distributed, sword Offer, LeetCode, etc., carefully write each article, do not like the title party, do not like the flowery, mostly write a series of articles, can not guarantee that I write are completely correct, But I guarantee that what I write is done through practice or research. We hope to correct any omissions or mistakes.

What did I write about 2020?

Open Source Programming Notes