Most concurrent applications are organized around “task execution”: tasks are abstract, discrete units of work. Breaking the work of an application into multiple tasks simplifies the organization of the program.

Executor task execution framework

By designing applications around task execution, you can simplify the development process and help achieve concurrency. The Executor framework decouples task submission from execution policies and supports many different types of execution policies. Consider using Executor when you need to create threads to perform tasks. To get the maximum benefit when splitting an application into different tasks, clear task boundaries must be defined. In some applications, there are clear task boundaries, while in others further analysis is required to reveal finer grained parallelism.

Introduction

A task is a logical unit of work, and threads are the mechanism by which tasks can run asynchronously. In the Java class library, the primary abstraction for task execution is not Thread but Executor.

Executor is the interface at the base of the Executor framework, and it separates task submission from task execution. The framework supports many different execution policies. It provides a standard way to decouple the submission of a task from its execution, with tasks represented as Runnable objects. Executor implementations also provide lifecycle support and hooks for adding statistics gathering, application management, and monitoring.

Executor framework architecture

The Executor framework consists of three main parts:

  • Tasks: the Runnable or Callable interface that a task implements.
  • Task execution: Executor, the core task-execution interface, and ExecutorService, which extends Executor.
  • Results of asynchronous computation: the Future interface and FutureTask, a class that implements Future.

Execution policies in the Executor framework

Because task submission is decoupled from task execution, the execution policy for a given class of tasks can be specified or changed without much difficulty.

Each execution policy is a resource-management tool, and the best policy depends on the available computing resources and on quality-of-service requirements. By limiting the number of concurrent tasks, you ensure that the application does not fail from resource exhaustion or suffer performance problems from contention for scarce resources. Separating task submission from the execution policy also makes it easier to choose, at deployment time, the policy that best matches the available hardware. Whenever you see code like this:

 new Thread(runnable).start()

and you want a more flexible execution policy, consider replacing Thread with an Executor.
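A minimal sketch of that substitution, assuming a toy task that only increments a counter. The submitting method depends only on the Executor interface, so the execution policy can be swapped without touching it. ExecutorPolicies, ThreadPerTaskExecutor, DirectExecutor, and submitAll are illustrative names, not JDK classes.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorPolicies {
    // Policy A: one new thread per task, equivalent to new Thread(r).start().
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }

    // Policy B: run each task synchronously in the submitting thread.
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable r) {
            r.run();
        }
    }

    // Submission code depends only on the Executor interface, not on a policy.
    static void submitAll(Executor executor, int n, AtomicInteger counter) {
        for (int i = 0; i < n; i++) {
            executor.execute(counter::incrementAndGet);
        }
    }

    static int runWithDirectExecutor(int n) {
        AtomicInteger counter = new AtomicInteger();
        submitAll(new DirectExecutor(), n, counter);
        return counter.get(); // every task has already run in this thread
    }

    // Policy C: a bounded pool of 4 threads, swapped in without changing submitAll.
    static int runWithPool(int n) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        submitAll(pool, n, counter);
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        new ThreadPerTaskExecutor().execute(() -> System.out.println("own thread"));
        System.out.println(runWithDirectExecutor(5)); // 5
        System.out.println(runWithPool(100));         // 100
    }
}
```

Switching from one thread per task to a bounded pool changes only which Executor is passed in; submitAll itself never changes.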

Executor life cycle

We have seen how to create an Executor, but not how to shut one down. Executor implementations typically create threads to run tasks, but the JVM exits only after all non-daemon threads have terminated. Therefore, if an Executor is not shut down properly, the JVM cannot exit.

Because an Executor runs tasks asynchronously, the state of previously submitted tasks is not immediately visible at any given time: some may have completed, some may be running, and others may still be waiting in the queue. Shutting down an application can range from a graceful shutdown (finish everything already started) to an abrupt one (like pulling the power cord). To manage the lifecycle of execution services, the ExecutorService interface extends Executor with lifecycle-management methods (as well as some convenience methods for submitting tasks).

The ExecutorService lifecycle has three states: running, shutting down, and terminated.

ExecutorService is in a running state when it is initially created.

  • The shutdown() method starts a graceful shutdown: no new tasks are accepted, but previously submitted tasks are allowed to complete, including those that have not yet started.
  • The shutdownNow() method starts an abrupt shutdown: it attempts to cancel running tasks, does not start any tasks still waiting in the queue, and returns those waiting tasks.

Once all tasks are complete, the ExecutorService transitions to the terminated state.

  • The awaitTermination() method waits for the ExecutorService to reach the terminated state.
  • The isTerminated() method polls whether the ExecutorService has terminated.

A call to shutdown() is usually followed immediately by awaitTermination(), which has the effect of shutting down the ExecutorService synchronously.
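A sketch of that idiom, closely following the two-phase pattern suggested in the ExecutorService documentation: shut down gracefully, wait, and fall back to shutdownNow() if tasks overrun. GracefulShutdown and shutdownAndAwait are illustrative names, and the timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {
    // Graceful shutdown first; fall back to shutdownNow() if tasks do not finish in time.
    static void shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks and drop queued ones
                pool.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id));
        }
        shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + pool.isTerminated()); // terminated: true
    }
}
```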

Close the ExecutorService

ExecutorService offers two ways to shut down: a graceful shutdown with shutdown() and an abrupt one with shutdownNow(). In an abrupt shutdown, shutdownNow() first attempts to cancel the currently executing tasks and then returns a list of all tasks that were submitted but never started. The two differ in safety and responsiveness: an abrupt shutdown is faster but riskier, because tasks may be interrupted halfway through; a graceful shutdown is slower but safer, because the ExecutorService waits until every queued task has run. Other services that own threads should consider offering a similar choice of shutdown modes.

Callable and Runnable interfaces

The Executor framework uses Runnable as its basic task representation, but Runnable is a rather limited abstraction: although run can write to a log file or place results in a shared data structure, it cannot return a value or throw a checked exception.

Many tasks are really deferred computations: executing a database query, fetching a resource over the network, or computing a complicated function. For these tasks, Callable is a better abstraction: it can return a value and may throw an exception. (To use a Callable for a task that returns no value, use Callable<Void>.)

The Executors utility class provides several methods for converting a Runnable into a Callable; two of them are:

  • Executors.callable(Runnable task, T result): returns a Callable that, when called, runs the given task and returns the given result.
  • Executors.callable(Runnable task): returns a Callable that, when called, runs the given task and returns null.
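A short sketch of both adapters, assuming a trivial counter-incrementing Runnable (the task and the class name RunnableToCallable are illustrative):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RunnableToCallable {
    static Object[] demo() {
        AtomicInteger runs = new AtomicInteger();
        Runnable task = runs::incrementAndGet; // a task with no result of its own

        // Runs the task, then returns the supplied result.
        Callable<String> withResult = Executors.callable(task, "done");
        // Runs the task, then returns null.
        Callable<Object> withoutResult = Executors.callable(task);
        try {
            return new Object[] { withResult.call(), withoutResult.call(), runs.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // done null 2
    }
}
```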

Runnable and Callable describe abstract computational tasks. These tasks are usually finite: they have a clear starting point and eventually terminate. A task run by an Executor goes through four lifecycle phases: created, submitted, started, and completed. Because some tasks can take a long time, it is often desirable to be able to cancel them. In the Executor framework, a task that has been submitted but not yet started can always be cancelled, while a task that has already started can be cancelled only if it responds to interruption. Cancelling a completed task has no effect.

Runnable.run() returns void, so a Runnable cannot return a result when it finishes.

Callable.call() returns a value of type V, the Callable’s type parameter.
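A small sketch contrasting the two interfaces (the summation task and the class name are illustrative): the Runnable has to publish its result through shared state, the Callable simply returns it, and Callable<Void> covers tasks that produce no value but may still throw checked exceptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class RunnableVsCallable {
    static long sumTo(int n) {
        long s = 0;
        for (int i = 1; i <= n; i++) {
            s += i;
        }
        return s;
    }

    static long[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Runnable: run() returns void, so the result has to go into shared state.
            AtomicLong shared = new AtomicLong();
            Runnable r = () -> shared.set(sumTo(100));
            pool.submit(r).get(); // Future<?>; get() returns null once run() finishes

            // Callable<V>: call() returns a V directly and may throw checked exceptions.
            Callable<Long> c = () -> sumTo(100);
            long returned = pool.submit(c).get();

            // Callable<Void>: no meaningful value, but checked exceptions are still allowed.
            Callable<Void> v = () -> {
                shared.incrementAndGet();
                return null;
            };
            pool.submit(v).get();

            return new long[] { returned, shared.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 5050 5051
    }
}
```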

Future

A Future represents the result of an asynchronous task that may not have completed yet. (The plain Future interface offers no completion callbacks; extensions such as CompletableFuture add them.) A Future represents the lifecycle of a task and provides methods to test whether the task has completed or been cancelled, to retrieve its result, and to cancel it. Implicit in the Future specification is that, as with the ExecutorService lifecycle, a task’s lifecycle can only move forward, not backward: once a task has completed, it stays in that state forever. The behavior of get depends on the state of the task (not yet started, running, or completed). If the task has completed, get returns immediately or throws an exception; otherwise it blocks until the task completes. If the task throws an exception, get wraps it in an ExecutionException and rethrows it; if the task was cancelled, get throws a CancellationException. When get throws an ExecutionException, getCause retrieves the underlying exception.
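A sketch of those get outcomes, assuming a single-threaded executor so that a second task can be kept waiting in the queue and then cancelled; the IOException, the latch-based blocker, and the class name are illustrative.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetDemo {
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // A task that fails with a checked exception (hypothetical failure).
            Callable<String> failingTask = () -> {
                throw new IOException("disk unavailable");
            };
            Future<String> failing = pool.submit(failingTask);
            String first;
            try {
                failing.get(); // blocks until the task finishes
                first = "completed normally";
            } catch (ExecutionException e) {
                first = e.getCause().getClass().getSimpleName(); // the original exception
            }

            // Occupy the only worker so the next task stays queued, then cancel it.
            pool.submit(() -> { release.await(); return null; });
            Future<String> queued = pool.submit(() -> "never runs");
            queued.cancel(false);
            String second;
            try {
                queued.get();
                second = "completed normally";
            } catch (CancellationException e) {
                second = "CancellationException";
            } catch (ExecutionException e) {
                second = "ExecutionException";
            }
            return new String[] { first, second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println(r[0] + " / " + r[1]); // IOException / CancellationException
    }
}
```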

There are many ways to create a Future that describes a task. Every submit method of ExecutorService returns a Future: you submit a Runnable or a Callable to the executor and get back a Future that can be used to retrieve the result or to cancel the task.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                for (int i = 0; i < 5; i++) {
                    System.out.println(i);
                }
                return 10;
            }
        });
        try {
            Integer result = future.get(); // blocks until call() has returned
            System.out.println(result);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        executor.shutdown();
    }
}

Cancel through the Future

Future has a cancel method that takes a boolean argument, mayInterruptIfRunning, and returns a value indicating whether the cancellation attempt succeeded. (Success here only means the task could be sent an interrupt, not that the task detected and acted on it.) If mayInterruptIfRunning is true and the task is currently running in some thread, that thread is interrupted. Passing false means “do not run this task if it has not started yet”, and should be used for tasks that are not designed to handle interruption. Because you should not interrupt a thread unless you know its interruption policy, when is it acceptable to call cancel with true? Threads created by the standard Executor implementations use an interruption policy that allows tasks to be cancelled through interruption, so mayInterruptIfRunning can be set to true when cancelling tasks submitted to a standard Executor through their Futures. When cancelling a task, you should not interrupt pool threads directly, because you cannot know which task is running when the interrupt arrives; cancel through the task’s Future instead. This is one more reason to write tasks that treat interruption as a cancellation request: they can then be cancelled through their Futures.
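A common pattern combining a timed get with cancel(true), sketched under the assumption that the task runs on a standard Executor and responds to interruption. TimedRun and runWithTimeout are hypothetical names, and returning null on timeout is an arbitrary choice for this sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedRun {
    // Runs task with a time budget; returns null if the budget is exceeded.
    static <T> T runWithTimeout(ExecutorService exec, Callable<T> task,
                                long timeout, TimeUnit unit) throws InterruptedException {
        Future<T> future = exec.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null; // the task is cancelled in the finally block
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause()); // task failed: surface the cause
        } finally {
            // No effect if the task already completed; interrupts it if still running.
            // Acceptable here because standard Executor threads tolerate interruption.
            future.cancel(true);
        }
    }

    static String[] demo() {
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            String fast = runWithTimeout(exec, () -> "fast result", 1, TimeUnit.SECONDS);
            String slow = runWithTimeout(exec, () -> {
                Thread.sleep(10_000); // long-running work that responds to interruption
                return "slow result";
            }, 100, TimeUnit.MILLISECONDS);
            return new String[] { fast, slow };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println(r[0] + ", " + r[1]); // fast result, null
    }
}
```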

FutureTask

FutureTask implements Runnable as well as Future, so a FutureTask can be handed to an Executor or run directly by the calling thread (futureTask.run()). FutureTask is useful when one thread must wait for another thread to finish a task before it can continue.

The computation represented by a FutureTask is implemented with a Callable, the result-bearing equivalent of Runnable, and a FutureTask can be in one of three states:

  • Not started: the FutureTask has been created, but its run() method has not yet been called.
  • Started: the FutureTask is started while its run() method is executing.
  • Completed: covers every way the computation can end, whether normally, by cancellation, or with an exception. Once a FutureTask is completed, it stays completed forever. FutureTask conveys the result from the thread performing the computation to the thread retrieving it, and its specification guarantees that this transfer safely publishes the result.
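A sketch (the class name and the summation are illustrative) in which one thread runs a FutureTask while another waits for its result with get:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    static int demo() {
        // The computation is supplied as a Callable; FutureTask makes it Runnable.
        FutureTask<Integer> task = new FutureTask<>(() -> {
            int sum = 0;
            for (int i = 1; i <= 10; i++) {
                sum += i;
            }
            return sum;
        });
        System.out.println("done before run? " + task.isDone()); // not started: false

        // Hand the FutureTask to a plain Thread; an Executor would work equally well.
        new Thread(task).start();
        try {
            // Blocks until run() completes; the result is safely published to this thread.
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 55
    }
}
```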

CompletionService

If you submit a batch of computations to an Executor and want to retrieve their results as they become available, you could keep the Future associated with each task and repeatedly poll for completion by calling get with a timeout of zero. This works, but it is tedious. Fortunately, there is a better way: CompletionService.

CompletionService combines the functionality of an Executor and a BlockingQueue. You submit Callable tasks to it for execution and then use queue-like methods such as take and poll to retrieve completed results, each packaged as a Future, as soon as they become available. ExecutorCompletionService implements CompletionService and delegates the computation itself to an Executor.

The implementation of ExecutorCompletionService is straightforward. The constructor creates a BlockingQueue to hold completed results. FutureTask has a done method that is called when the computation completes. When a task is submitted, it is wrapped in a QueueingFuture, a subclass of FutureTask that overrides done to place the completed Future on the BlockingQueue. The take and poll methods delegate to the BlockingQueue; take blocks until a result is available.
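A sketch of consuming results in completion order, assuming tasks whose only work is sleeping for a given number of milliseconds (CompletionOrder and collectInCompletionOrder are illustrative names; the delays stand in for real work):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrder {
    // Submits tasks with different delays and collects results as each one finishes.
    static List<Integer> collectInCompletionOrder(int... delaysMillis) {
        ExecutorService exec = Executors.newFixedThreadPool(delaysMillis.length);
        CompletionService<Integer> cs = new ExecutorCompletionService<>(exec);
        try {
            for (int d : delaysMillis) {
                cs.submit(() -> {
                    Thread.sleep(d); // simulated work of varying length
                    return d;
                });
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks until the next completed task's Future is available
                results.add(cs.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectInCompletionOrder(500, 0, 250)); // fastest result first
    }
}
```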

The thread pool

A thread pool, as the name suggests, is a pool of resources that manages a homogeneous set of worker threads. Thread pools are tightly bound to work queues, which hold the tasks waiting to be executed. A worker thread’s job is simple: take a task from the work queue, execute it, and go back to wait for the next task.
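To make the worker/queue relationship concrete, here is a deliberately minimal sketch: a fixed set of workers sharing one BlockingQueue. SimpleThreadPool is a hypothetical teaching class with no saturation policy, lifecycle states, or graceful shutdown; real code should use ThreadPoolExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleThreadPool {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    public SimpleThreadPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(this::workerLoop, "simple-worker-" + i);
            workers.add(t);
            t.start();
        }
    }

    // Worker loop: take a task from the queue, run it, then wait for the next one.
    private void workerLoop() {
        try {
            while (true) {
                Runnable task = workQueue.take(); // blocks while the queue is empty
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker thread.
                }
            }
        } catch (InterruptedException e) {
            // In this sketch, interruption is the signal for the worker to exit.
        }
    }

    public void execute(Runnable task) {
        workQueue.add(task);
    }

    // Abrupt stop: interrupts every worker; tasks still queued are never run.
    public void shutdownNow() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Runs nTasks counting tasks on a 2-thread pool and reports how many completed.
    static int demo(int nTasks) {
        SimpleThreadPool pool = new SimpleThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(() -> {
                done.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(10)); // 10
    }
}
```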

Creating threads without bound, especially in large numbers, has several drawbacks:

The overhead of the thread lifecycle is very high. Threads are not created and destroyed for free. The actual overhead varies from platform to platform, but thread creation takes time, delays processing requests, and requires some assistance from the JVM and operating system.

Resource consumption. Active threads consume system resources, especially memory. When there are more runnable threads than available processors, some threads sit idle. Having many idle threads ties up a lot of memory and puts pressure on the garbage collector, and having many threads compete for the CPUs imposes other performance costs as well. If you already have enough threads to keep all the CPUs busy, creating more threads will not help and may even hurt performance.

Stability. There is a limit on how many threads can be created. The limit varies by platform and is affected by several factors, including JVM startup parameters, the stack size requested in the Thread constructor, and limits on threads imposed by the underlying operating system. When you hit this limit, the most likely result is an OutOfMemoryError; rather than trying to recover from such an error, it is much simpler to structure the application so that it limits the number of threads it creates.

Thread pool advantages

  • Reusing existing threads instead of creating new ones amortizes the considerable cost of thread creation and teardown across many requests.
  • Because a worker thread usually already exists when a request arrives, the task is not delayed by thread creation, which improves responsiveness.
  • By sizing the thread pool appropriately, you can have enough threads to keep the processors busy without letting the application run out of memory or fail because too many threads compete for resources.

Thread pool processing flow

  1. The thread pool checks whether the core pool is full, that is, whether corePoolSize threads are already running. If not, it creates a new worker thread to execute the task, even if other core threads are idle. If the core pool is full, go to step 2.
  2. The thread pool checks whether the work queue is full. If not, the newly submitted task is stored in the work queue. If the queue is full, go to step 3.
  3. The thread pool checks whether the pool as a whole is full, that is, whether maximumPoolSize threads are running. If not, it creates a new worker thread to execute the task. If it is full, the task is handed to the saturation policy.



The main processing flow of thread pools

How thread pools work

When execute() is called, ThreadPoolExecutor handles the task in one of four ways:

  1. If fewer than corePoolSize threads are running, a new thread is created to run the task (this step requires acquiring a global lock).
  2. If corePoolSize or more threads are running, the task is added to the BlockingQueue.
  3. If the task cannot be added to the BlockingQueue because the queue is full, a new thread is created to process the task (this step also requires the global lock).
  4. If creating a new thread would make the number of running threads exceed maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is invoked.

The overall design idea for ThreadPoolExecutor to take these steps is to avoid acquiring global locks as much as possible when executing the execute() method (which would be a serious scalability bottleneck). After ThreadPoolExecutor has warmed up (the number of threads currently running is greater than or equal to corePoolSize), almost all execute() method calls execute Step 2, which does not require a global lock.



ThreadPoolExecutor execution diagram

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If fewer than corePoolSize threads are running, start a new core thread for this task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the number of threads is greater than or equal to corePoolSize, put the task in the work queue.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check: if the pool stopped running after the task was queued, remove and reject it.
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // Make sure at least one worker exists to drain the queue.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the pool is not running or the task cannot be queued, try to create a
    // non-core thread (bounded by maximumPoolSize) to execute the task.
    else if (!addWorker(command, false))
        // Otherwise reject the task; the default AbortPolicy throws RejectedExecutionException.
        reject(command);
}
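To observe the four cases from the outside, here is a sketch with deliberately tiny, illustrative settings (corePoolSize 1, maximumPoolSize 2, a work queue of capacity 1) and tasks that block on a latch so that nothing finishes early. ExecuteStepsDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteStepsDemo {
    // Records "poolSize,queueSize " after each accepted task, then the fate of the last one.
    static String demo() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo ends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker); // case 1: below corePoolSize, so a new core thread
            record(log, pool);
            pool.execute(blocker); // case 2: core pool busy, so the task is queued
            record(log, pool);
            pool.execute(blocker); // case 3: queue full, so a new thread up to maximumPoolSize
            record(log, pool);
            try {
                pool.execute(blocker); // case 4: queue and pool full, so rejected
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected"); // the default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    private static void record(StringBuilder log, ThreadPoolExecutor pool) {
        log.append(pool.getPoolSize()).append(',').append(pool.getQueue().size()).append(' ');
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1,0 1,1 2,1 rejected
    }
}
```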

Creation of a thread pool

There are two ways to create a thread pool:

The first is to call one of the static factory methods in Executors:

  • newFixedThreadPool: creates a fixed-size thread pool. A thread is created each time a task is submitted until the pool reaches its maximum size, after which the pool size stays constant (if a thread dies because of an unexpected exception, a replacement thread is added).
  • newCachedThreadPool: creates a cached thread pool. Idle threads are reclaimed when the pool is larger than current demand requires, and new threads are added as demand grows; the pool size is not bounded.
  • newSingleThreadExecutor: creates a single-threaded Executor that uses one worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially in the order imposed by the task queue (FIFO, LIFO, or priority order).
  • newScheduledThreadPool: creates a fixed-size thread pool that supports delayed and periodic task execution.
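A sketch exercising each factory method once, assuming trivial tasks that just return a label (the class name and labels are illustrative; the 50 ms delay is arbitrary):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class FactoryMethodsDemo {
    static String demo() {
        ExecutorService fixed = Executors.newFixedThreadPool(4);      // at most 4 threads
        ExecutorService cached = Executors.newCachedThreadPool();     // grows and shrinks with demand
        ExecutorService single = Executors.newSingleThreadExecutor(); // one worker, tasks in order
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
        try {
            Future<String> a = fixed.submit(() -> "fixed");
            Future<String> b = cached.submit(() -> "cached");
            Future<String> c = single.submit(() -> "single");
            // Run once after a 50 ms delay.
            ScheduledFuture<String> d = scheduled.schedule(() -> "scheduled", 50, TimeUnit.MILLISECONDS);
            return a.get() + "," + b.get() + "," + c.get() + "," + d.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            fixed.shutdown();
            cached.shutdown();
            single.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // fixed,cached,single,scheduled
    }
}
```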


The second is to create the pool directly with a ThreadPoolExecutor constructor:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler)

corePoolSize (core size of the thread pool): when a task is submitted, the pool creates a new thread to execute it, even if other idle core threads could run the new task, until the number of threads reaches corePoolSize. If the pool’s prestartAllCoreThreads() method is called, the pool creates and starts all core threads in advance.

maximumPoolSize (maximum size of the thread pool): the maximum number of threads the pool is allowed to create. If the queue is full and fewer than the maximum number of threads have been created, the pool creates a new thread to execute the task. Note that this parameter has no effect when an unbounded work queue is used.

keepAliveTime: how long a worker thread stays alive after becoming idle; with the default settings this applies only to threads beyond corePoolSize. If there are many tasks and each one is short, increasing this value can improve thread reuse.

unit: the TimeUnit for keepAliveTime, such as TimeUnit.DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, or NANOSECONDS.

workQueue: the blocking queue that holds tasks waiting to be executed. You can choose from the following blocking queues:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array that orders elements FIFO (first-in, first-out).
  • LinkedBlockingQueue: a blocking queue backed by a linked list that orders elements FIFO; its throughput is usually higher than that of ArrayBlockingQueue. The static factory method Executors.newFixedThreadPool() uses this queue.
  • SynchronousQueue: a blocking queue that stores no elements. Each insert must wait for a corresponding remove by another thread, and vice versa; its throughput is usually higher than that of LinkedBlockingQueue. The static factory method Executors.newCachedThreadPool() uses this queue.
  • PriorityBlockingQueue: an unbounded blocking queue that orders elements by priority.

threadFactory: the factory used to create threads. A ThreadFactory can, for example, give each created thread a meaningful name.

RejectedExecutionHandler (saturation policy): when both the queue and the pool are full, the pool is saturated and a policy is needed to handle newly submitted tasks. The default is AbortPolicy, which throws an exception when a new task cannot be handled. You can implement the RejectedExecutionHandler interface to define a custom policy. The JDK provides the following policies:

  • AbortPolicy: the default saturation policy; throws the unchecked RejectedExecutionException. Callers can catch this exception and handle the rejection as they see fit.
  • CallerRunsPolicy: a throttling mechanism that neither discards the task nor throws an exception. Instead of running the new task on a pool thread, it runs the task in the thread that called execute.
  • DiscardOldestPolicy: discards the task at the head of the queue (the one that would otherwise be executed next) and then tries to resubmit the new task. (If the work queue is a priority queue, this discards the highest-priority task, so it is best not to combine DiscardOldestPolicy with a priority queue.)
  • DiscardPolicy: silently discards the new task.
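Putting the parameters together, a sketch of an explicitly configured pool: bounded queue, named threads, and CallerRunsPolicy. The sizes, the "order-worker" prefix, and the helper names are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPool {
    // ThreadFactory that gives each thread a meaningful name: prefix-1, prefix-2, ...
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30, TimeUnit.SECONDS,                       // keepAliveTime for non-core threads
                new ArrayBlockingQueue<Runnable>(100),      // bounded workQueue
                named("order-worker"),                      // threadFactory (illustrative name)
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = newBoundedPool();
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        Future<String> name = pool.submit(whoAmI);
        System.out.println(name.get()); // order-worker-1
        pool.shutdown();
    }
}
```

With a bounded queue, maximumPoolSize actually takes effect, and CallerRunsPolicy slows down submitters once the pool saturates instead of dropping work.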

Set the thread pool size

The ideal size of a thread pool depends on the types of tasks submitted and the characteristics of the system it is deployed on. Pool sizes should rarely be hard-coded; instead they should come from configuration or be computed dynamically. If the pool is too large, many threads compete for relatively scarce CPU and memory resources, which increases memory usage and may exhaust resources. If it is too small, throughput suffers because processors sit idle even though there is work to do.

To properly configure thread pools, you must first analyze task characteristics, which can be analyzed from the following perspectives:

  • CPU-intensive tasks: configure as few threads as possible, for example a pool of N_cpu + 1 threads, where N_cpu is the number of CPUs.

  • I/O-intensive tasks: because threads spend much of their time waiting on I/O rather than executing, configure more threads, for example 2 × N_cpu.

  • Mixed tasks: if a task can be split into a CPU-intensive part and an I/O-intensive part, running the two parts separately yields higher throughput than running them serially, as long as their execution times are not too different. If their execution times differ greatly, splitting them is not worthwhile.

    You can use Runtime.getRuntime().availableProcessors() to get the number of CPUs on the current machine.
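A sketch that turns the two rules of thumb into pool sizes at startup; PoolSizing and its helpers are illustrative, and the resulting numbers are starting points to be validated by measurement rather than fixed answers.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    // Rule of thumb for CPU-bound work: N_cpu + 1 threads.
    static int cpuBoundSize(int nCpus) {
        return nCpus + 1;
    }

    // Rule of thumb for I/O-bound work: 2 * N_cpu threads.
    static int ioBoundSize(int nCpus) {
        return 2 * nCpus;
    }

    public static void main(String[] args) {
        int nCpus = Runtime.getRuntime().availableProcessors();
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuBoundSize(nCpus));
        ExecutorService ioPool = Executors.newFixedThreadPool(ioBoundSize(nCpus));
        System.out.println("CPUs=" + nCpus
                + ", CPU-bound pool=" + cpuBoundSize(nCpus)
                + ", I/O-bound pool=" + ioBoundSize(nCpus));
        cpuPool.shutdown();
        ioPool.shutdown();
    }
}
```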

When a task requires some kind of resource managed through a resource pool, such as a database connection, the size of the thread pool and the resource pool will affect each other. If each task requires a database connection, the size of the connection pool limits the size of the thread pool. Similarly, when tasks in the thread pool are the sole consumers of database connections, the size of the thread pool limits the size of the connection pool.

Extending ThreadPoolExecutor

ThreadPoolExecutor is designed for extension and provides several hook methods that subclasses can override:

  • beforeExecute(Thread t, Runnable r): invoked before the given Runnable is executed in the given thread.
  • afterExecute(Runnable r, Throwable t): invoked after the given Runnable finishes executing.
  • terminated(): invoked when the executor has terminated.

These hooks can be used to extend the behavior of ThreadPoolExecutor. The beforeExecute and afterExecute hooks are called in the thread that executes the task, and can be used to add logging, timing, monitoring, or statistics gathering. afterExecute is called whether run returns normally or terminates by throwing an exception; in the latter case the RuntimeException or Error is passed as the Throwable argument. (For tasks submitted through submit, the exception is captured by the FutureTask wrapper, so afterExecute receives null.) If beforeExecute throws a RuntimeException, the task is not executed and afterExecute is not called. terminated is called when the thread pool finishes shutting down, that is, after all tasks have completed and all worker threads have exited. It can release resources that the Executor allocated during its lifecycle, send notifications, write logs, or finalize statistics.
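A sketch of a subclass that uses all three hooks to time tasks. TimingThreadPool, timedTaskCount, and runTimed are hypothetical names, and printing from terminated() stands in for real logging.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTimeNanos = new AtomicLong();

    public TimingThreadPool(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Runs in the worker thread, so a ThreadLocal can carry the start time to afterExecute.
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long elapsed = System.nanoTime() - startTime.get();
            numTasks.incrementAndGet();
            totalTimeNanos.addAndGet(elapsed);
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            long n = numTasks.get();
            System.out.printf("terminated: %d tasks, avg %d ns%n", n, n == 0 ? 0 : totalTimeNanos.get() / n);
        } finally {
            super.terminated();
        }
    }

    long timedTaskCount() {
        return numTasks.get();
    }

    // Runs nTasks trivial tasks, waits for termination, and returns how many were timed.
    static long runTimed(int nTasks) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(Thread::yield); // trivial placeholder task
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.timedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println(runTimed(5)); // 5, after the "terminated: ..." line
    }
}
```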

Closing the thread pool

A thread pool can be shut down by calling its shutdown or shutdownNow method. Both work by iterating over the pool’s worker threads and calling Thread.interrupt() on them one by one, so tasks that cannot respond to interruption may never terminate. The shutdownNow method sets the pool state to STOP, interrupts all worker threads whether or not they are running tasks, and returns the list of tasks still waiting to be executed. The shutdown method only sets the pool state to SHUTDOWN and then interrupts the idle threads that are not executing tasks.

The isShutdown method returns true as soon as either shutdown method has been called. The pool has terminated once all tasks have completed, at which point isTerminated returns true. Which method to call depends on the nature of the tasks submitted to the pool: shutdown is the usual choice, while shutdownNow can be used when the tasks do not need to run to completion.
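A sketch of shutdownNow on a single-threaded pool, assuming one long-sleeping task that exits when interrupted plus three tasks queued behind it (the class name and task bodies are illustrative):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // The first task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupted this worker: exit promptly
            }
        });
        // These three tasks wait in the queue behind the sleeping task.
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> System.out.println("never printed"));
        }
        try {
            started.await();
            List<Runnable> pending = pool.shutdownNow(); // queued tasks that never started
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size() + " " + pool.isShutdown() + " " + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 3 true true
    }
}
```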

Fork/Join framework

The Fork/Join framework, introduced in Java 7, is a framework for executing tasks in parallel: it divides a large task into several smaller tasks and then combines the results of the small tasks to obtain the result of the large one.

Fork splits a large task into subtasks that run in parallel, and Join merges the results of those subtasks to produce the result of the large task. For example, to compute 1+2+…+10000, you can split the work into 10 subtasks, each summing 1000 numbers, and then combine the 10 partial results.
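As a worked check of that decomposition, assuming the 10 subtasks cover consecutive blocks of 1000 numbers: subtask k sums the block from 1000k + 1 to 1000(k + 1), the subtasks are independent of each other, and the join step only has to add ten partial sums.

```latex
\sum_{i=1}^{10000} i
  \;=\; \sum_{k=0}^{9} \underbrace{\sum_{i=1000k+1}^{1000(k+1)} i}_{\text{subtask } k}
  \;=\; \sum_{k=0}^{9} \left(10^{6}\,k + 500500\right)
  \;=\; 45\,000\,000 + 5\,005\,000
  \;=\; 50\,005\,000
  \;=\; \frac{10000 \cdot 10001}{2}
```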

Design of Fork/Join framework

The Fork/Join framework uses two classes to accomplish two things:

  • Splitting tasks. A fork class is needed to split a large task into subtasks; the subtasks may still be too large, so splitting continues until each subtask is small enough.
  • Executing tasks and merging results. The subtasks are placed in double-ended queues (deques), and several worker threads take tasks from the deques and execute them. The results of the subtasks are placed in a queue, and a thread takes the results from that queue and merges them.

ForkJoinTask: to use the Fork/Join framework, you first create a ForkJoinTask. It provides the mechanism for performing fork() and join() operations within a task. Usually you do not extend ForkJoinTask directly but one of its subclasses; the framework provides the following two:

  • RecursiveAction: Used for tasks that do not return results.
  • RecursiveTask: Used for tasks that return results.

ForkJoinPool: a ForkJoinTask must be executed by a ForkJoinPool. Subtasks split off by a task are pushed onto the head of the deque maintained by the current worker thread. When a worker thread’s own deque is temporarily empty, it steals a task from the tail of a randomly chosen other worker’s deque.
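Since the example that follows uses RecursiveTask, here is a sketch of the other subclass, RecursiveAction, for work that produces no result. It is modeled on the array-increment idea often used to introduce RecursiveAction; IncrementTask, THRESHOLD, and incrementAll are illustrative. invokeAll forks both halves and waits for them, and there is nothing to merge afterward.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class IncrementTask extends RecursiveAction {
    private static final int THRESHOLD = 1000; // below this size, work sequentially
    private final long[] array;
    private final int lo;
    private final int hi; // half-open range [lo, hi)

    public IncrementTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                array[i]++;
            }
        } else {
            int mid = (lo + hi) >>> 1;
            // Fork both halves and wait for them; there is no result to combine.
            invokeAll(new IncrementTask(array, lo, mid), new IncrementTask(array, mid, hi));
        }
    }

    static long[] incrementAll(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new IncrementTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        long[] data = incrementAll(new long[5000]);
        System.out.println(data[0] + " " + data[4999]); // 1 1
    }
}
```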

Use the Fork/Join framework

As an example, use the Fork/Join framework to compute 1+2+3+4.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // threshold
    private final int start;
    private final int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // If the task is small enough, compute it directly
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // If the task is larger than the threshold, split it into two subtasks
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // Execute the subtasks
            leftTask.fork();
            rightTask.fork();
            // Wait for the subtasks to finish and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // Merge the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create a main task that computes 1+2+3+4
        CountTask task = new CountTask(1, 4);
        // Execute the task
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get()); // 10
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

The main difference between a ForkJoinTask and an ordinary task is that it must implement the compute method. In compute, the task first checks whether it is small enough to execute directly; if so, it performs the work. If not, it splits the work into two subtasks. When a subtask is forked, its own compute method is eventually invoked to decide whether it, in turn, must be split further; if not, it executes directly and returns its result. Calling join waits for a subtask to complete and returns its result.

Exception handling of Fork/Join framework

A ForkJoinTask may throw an exception while it is executing, but the main thread cannot catch that exception directly, so ForkJoinTask provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled. The exception itself can be retrieved with ForkJoinTask’s getException method.

if(task.isCompletedAbnormally()){
    System.out.println(task.getException());
}

The getException method returns the Throwable thrown by the task, or a CancellationException if the task was cancelled. It returns null if the task has not completed or completed without throwing an exception.
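A self-contained sketch of that check, assuming a hypothetical task that always fails. quietlyJoin() waits for completion without rethrowing, so the outcome can then be inspected. When the exception was raised in another thread, the object returned by getException may be a fresh instance of the same exception type rather than the original, so only the type is checked here.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExceptionDemo {
    // Hypothetical task that always fails, used only to illustrate exception inspection.
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("simulated failure");
        }
    }

    static Throwable runAndInspect() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            FailingTask task = new FailingTask();
            pool.execute(task);
            task.quietlyJoin(); // wait for completion without rethrowing the exception
            if (task.isCompletedAbnormally()) {
                return task.getException(); // the exception recorded for the task
            }
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Throwable t = runAndInspect();
        System.out.println(t.getClass().getSimpleName()); // IllegalStateException
    }
}
```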

The Executor framework is a powerful, flexible framework for executing tasks concurrently. It offers many tuning options, such as policies for creating and tearing down threads, for handling queued tasks, and for handling excess tasks, and it provides hooks for extending its behavior. As with most powerful frameworks, however, some combinations of settings do not work well together: some types of tasks require specific execution policies, and some combinations of tuning parameters can produce strange results.
