background

It’s easy to create a thread when you’re using threads, but there’s a problem:

If there are a large number of concurrent threads, and each thread executes a short task and then terminates, creating threads frequently can greatly reduce the efficiency of the system because of the time it takes to create and destroy threads frequently.

So is there a way that threads can be reused, that they can finish a task and not be destroyed, but can continue to perform other tasks?

1. Why use thread pools

You’ve all heard of pooling. Thread pooling, database connection pooling, HTTP connection pooling, and so on are all applications of this idea.

The idea of pooling technology is to reduce the consumption of resources per acquisition and improve the utilization of resources.

Thread pools provide a way to limit and manage resources, and each thread pool also maintains some basic statistics, such as the number of completed tasks.

In the Art of Concurrent Programming in Java, the benefits of using thread pools are mentioned:

  • Reduced resource consumption: Reduces thread creation and destruction costs by reusing created threads.
  • Improved response time: When a task arrives, it can be executed immediately without waiting for a thread to be created.
  • Improve manageability of threads: Threads are scarce resources. If they are created without limit, they will consume system resources and reduce system stability. Thread pools can be used for unified management allocation, tuning, and monitoring.

2. Implement the difference between Runnable interface and Callable interface

The Runnable interface has existed since Java 1.0, but Callable was only introduced in Java 1.5 to handle use cases not supported by Runnable. The Runnable interface does not return results or throw check exceptions, but the Callable interface does. It is recommended to use the Runnable interface if the task does not need to return results or throw exceptions so that the code looks more concise.

Note: The Executors tool can directly convert Runnable and Callable objects to each other

@FunctionalInterface
public interface Runnable {
    /** * This method returns no value and cannot throw an exception */
    public abstract void run(a);
}
Copy the code
@FunctionalInterface
public interface Callable<V> {
    /** * computes the result, or throws an exception when it is impossible to do so@returnResults obtained by computed result *@throwsException if unable to compute a result throws an Exception */
    V call(a) throws Exception;
}
Copy the code

3. Perform the execute() method and submit() method

  1. The execute() method is used to submit tasks that do not require a return value, so there is no way to determine whether the task was successfully executed by the thread pool.
  2. The submit() method is used to submit a task that needs to return a value, and the thread pool returns an object of type Future that can be used to determine the success of the task.

View the Submit method in the AbstractExecutorService interface:

	/ * * *@throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc} * /
    publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
Copy the code

The above method calls newTaskFor and returns a FutureTask object.

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
Copy the code

Let’s look at the execute() method:

    public void execute(Runnable task) {}Copy the code

4. How do I create a thread pool

Alibaba Java Development Manual forces the thread pool to be created by ThreadPoolExecutor instead of Executors. By doing so, students can clarify the running rules of the thread pool and avoid the risk of resource depletion.

Method 1: through the constructor

Method 2: Use the Executors tool to implement the configuration. Essentially, invoke the Constructor of ThreadPoolExecutor.

We can create three types of ThreadPoolExecutor:

  • FixedThreadPool: This method returns a thread pool with a fixed number of threads. When a new task is submitted, it is executed immediately if there are idle threads in the thread pool. If no, the new task is temporarily stored in a task queue. When a thread is idle, the task in the task queue will be processed.
  • SingleThreadExecutor: This method returns a thread pool with only one thread. When a new task is submitted, the new task is temporarily stored in a task queue. When the thread is idle, the tasks in the queue are executed in first-in, first-out order.
  • CachedThreadPool: This method returns a thread pool that can adjust the number of threads as needed. If there are idle threads that can be reused, the reusable threads are preferred. If all threads are working and a new task is submitted, a new thread is created to process the task. All threads will return to the thread pool for reuse after completing the current task.

5. ThreadPoolExecutor analysis

The ThreadPoolExecutor class provides four constructors, and we’ll examine the constructor with the most arguments.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code

The three most important parameters of ThreadPoolExecutor are:

  • CorePoolSize: Number of core threads, which defines the minimum number of threads that can run at the same time.
  • MaximumPoolSize: When the number of tasks in the queue reaches the capacity of the queue, the number of threads that can run simultaneously becomes the maximum number of threads.
  • WorkQueue: When a new task arrives, the system determines whether the number of running threads reaches the number of core threads. If so, the new task is placed in the queue.

ThreadPoolExecutor other common parameters:

  • KeepAliveTime: The idle time of a thread. When the idle time of a thread reaches keepAliveTime, the thread exits until the number of threads =corePoolSize or, if allowCoreThreadTimeout=true, until the number of threads =0
  • Unit: keepAliveTime Time unit
  • ThreadFactory: Used when executor creates a new thread.
  • Handler: refuses to execute the policy. When the cache queue of the thread pool is full and the number of threads in the thread pool reaches maximumPoolSize, a task rejection policy is adopted if additional tasks arrive. Typically, there are four strategies:
/ / discard task and throw RejectedExecutionException anomalies.
ThreadPoolExecutor.AbortPolicy
// Discard the task, but do not throw an exception.
ThreadPoolExecutor.DiscardPolicy
// Discard the task at the top of the queue and try again (repeat the process)
ThreadPoolExecutor.DiscardOldestPolicy
// This task is handled by the calling thread
ThreadPoolExecutor.CallerRunsPolicy
Copy the code

6. Implement a thread pool

/** * This is a Runnable class that specifies 5 seconds to execute the task *@author linchaokun
 */
public class MyRunnable implements Runnable{

    private String msg;

    public MyRunnable(String msg) {
        this.msg = msg;
    }

    @Override
    public void run(a) {
        System.out.println(Thread.currentThread().getName() + "StartTime:" + LocalDateTime.now());
        processMsg();
        System.out.println(Thread.currentThread().getName() + "EndTime:" + LocalDateTime.now());
    }

    private void processMsg(a) {
        try {
            Thread.sleep(5000);
        } catch(InterruptedException e) { e.printStackTrace(); }}@Override
    public String toString(a) {
        return "MyRunnable{" +
                "msg='" + msg + '\' ' +
                '} '; }}Copy the code
/** * Create thread pool *@author linchaokun
 */
public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 1L;
    private static final int QUEUE_CAPACITY = 100;


    public static void main(String[] args) {

        // Create a thread pool using the constructor
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            // Create the WorkerThread object
            Runnable worker = new MyRunnable(i + "");
            / / execution Runnable
            executor.execute(worker);
        }

        // Terminates the thread
        executor.shutdown();

        while(! executor.isTerminated()){ } System.out.println("Stop all threads..."); }}Copy the code

As you can see, our code above specifies:

  1. CorePoolSize: The number of core threads is 5
  2. MaximumPoolSize: the maximum number of threads is 10
  3. KeepAliveTime: the wait time is 1L
  4. Unit: the unit is timeunit. SECONDS
  5. WorkQueue: The task queue is ArrayBlockingQueue and has a capacity of 100
  6. Handler: saturates CallerRunsPolicy

Result of executing code:

pool-1-thread-3StartTime:2020-07-14T11:09:04.301
pool-1-thread-4StartTime:2020-07-14T11:09:04.301
pool-1-thread-2StartTime:2020-07-14T11:09:04.301
pool-1-thread-1StartTime:2020-07-14T11:09:04.301
pool-1-thread-5StartTime:2020-07-14T11:09:04.301
pool-1-thread-4EndTime:2020-07-14T11:09:09.302
pool-1-thread-1EndTime:2020-07-14T11:09:09.302
pool-1-thread-3EndTime:2020-07-14T11:09:09.302
pool-1-thread-1StartTime:2020-07-14T11:09:09.302
pool-1-thread-3StartTime:2020-07-14T11:09:09.302
pool-1-thread-2EndTime:2020-07-14T11:09:09.302
pool-1-thread-5EndTime:2020-07-14T11:09:09.302
pool-1-thread-5StartTime:2020-07-14T11:09:09.302
pool-1-thread-2StartTime:2020-07-14T11:09:09.302
pool-1-thread-4StartTime:2020-07-14T11:09:09.302
pool-1-thread-1EndTime:2020-07-14T11:09:14.302
pool-1-thread-5EndTime:2020-07-14T11:09:14.302
pool-1-thread-3EndTime:2020-07-14T11:09:14.302
pool-1-thread-4EndTime:2020-07-14T11:09:14.302
pool-1-thread-2EndTime:2020-07-14T11:09:14.302Stop all threads...Copy the code

7. Analysis of thread pool principle

From the above execution results, we can see that the thread pool will execute 5 tasks at the same time. After these 5 tasks are executed, the remaining 5 tasks will be executed.

To understand how thread pools work, we need to examine the execute method. Let’s take a look at its source code:

// Store the running state of the thread pool and the number of valid threads in the thread pool (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c)  { return c & CAPACITY; }

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // CTL contains some information about the current state of the thread pool
        int c = ctl.get();
    	// 1. Check whether the number of tasks executed in the current thread pool is smaller than corePoolSize
    	// If smaller, create a new thread with the addWorker method, add the task command to the thread, and start the thread to execute the task.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    	// 2. If the number of tasks currently executed is greater than or equal to corePoolSize, it will go here
    	If the thread pool is in the RUNNING state, the task will be added
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Get the thread pool state again. If the thread pool state is not RUNNING, remove the task from the task queue and try to determine whether all threads have finished executing and execute the reject policy.
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If the current thread pool is empty, a new thread is created and executed
            else if (workerCountOf(recheck) == 0)
                addWorker(null.false);
        }
    	// 3. Use addWorker to create a new thread, add task command to the thread, and start the thread to execute the task.
    	// If addWorker fails to execute, reject executes the corresponding reject policy.
        else if(! addWorker(command,false))
            reject(command);
    }

Copy the code

Use a diagram to summarize the three steps above:

Original link: www.linchaokun.cn/?p=21