A thread is a path of execution within a process: an independent sequence of instructions that the JVM can run. Multithreaded programming means using several threads to perform multiple tasks concurrently. The JVM has good support for multithreading.

While this brings many benefits, starting with improved program performance, multithreaded programming has its drawbacks — increased code complexity, synchronization issues, unexpected results, and overhead for creating threads.

In this article, we’ll look at how Java thread pools can be used to alleviate these problems.

Why use thread pools?

Creating and starting a thread is expensive. If we repeated this step every time we needed to execute a task, the overhead would be significant and would undermine the performance gains we want from multithreading.

To better understand the overhead of creating and starting a thread, let’s take a look at what the JVM does in the background:

  • Memory is allocated for the thread's stack, which holds a stack frame for each method call the thread makes.
  • Each stack frame contains an array of local variables, the return value, an operand stack, and a reference to the constant pool.
  • Some JVMs support native methods and also allocate a native method stack.
  • Each thread gets a program counter that identifies which instruction is currently being executed.
  • The system creates a native thread corresponding to the Java thread.
  • Thread-specific descriptors are added to internal JVM data structures.
  • Threads share the heap and method area.

A thread pool avoids this cost by reusing a set of threads for many tasks. The java.util.concurrent package provides the following interfaces:

  • Executor: a simple interface for executing tasks.
  • ExecutorService: a more complex interface that includes additional methods to manage tasks and the executor itself.
  • ScheduledExecutorService: extends ExecutorService by adding methods for scheduling the execution of tasks.

The Executors class also provides direct access to executor instances that implement these interfaces.
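To make the scheduling interface a little more concrete, here is a minimal sketch that runs one task after a delay. The class name `ScheduledSketch`, the helper `elapsedMillisAfterDelay`, and the 50 ms delay are illustrative assumptions rather than anything from the article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledSketch {

    // Hypothetical helper: schedules a task after `delayMillis` and returns how
    // many milliseconds passed between scheduling and the task actually running.
    static long elapsedMillisAfterDelay(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            Callable<Long> stamp = System::nanoTime; // the task records when it ran
            ScheduledFuture<Long> future =
                    scheduler.schedule(stamp, delayMillis, TimeUnit.MILLISECONDS);
            long ranAt = future.get(); // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(ranAt - start);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ran after ~" + elapsedMillisAfterDelay(50) + " ms");
    }
}
```

The same scheduler also offers scheduleAtFixedRate and scheduleWithFixedDelay for recurring tasks; they are left out here to keep the sketch short.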

Executors includes factory methods for creating different types of thread pools. Executor is a simple interface that declares only an execute() method.

  • newSingleThreadExecutor(): a thread pool containing a single thread and an unbounded queue, which executes only one task at a time.
  • newFixedThreadPool(): a thread pool containing a fixed number of threads that share an unbounded queue. When all threads are busy, newly submitted tasks wait in the queue until a thread becomes available.
  • newCachedThreadPool(): a thread pool that creates new threads only as they are needed.
  • newWorkStealingPool(): a thread pool based on a work-stealing algorithm.
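As a rough illustration of the factory methods listed above, the sketch below creates a fixed pool and runs a few tasks on it. The class name `FactoryMethodsSketch`, the helper `sumOfSquares`, and the pool size of 2 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FactoryMethodsSketch {

    // Hypothetical helper: computes 1^2 + 2^2 + ... + n^2, one task per term,
    // on a fixed pool of two threads.
    static int sumOfSquares(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int k = i;
                // While both threads are busy, further tasks wait in the pool's queue.
                futures.add(pool.submit(() -> k * k));
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until that task has finished
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(4)); // 1 + 4 + 9 + 16
    }
}
```

Swapping `newFixedThreadPool(2)` for one of the other factory methods should leave the result unchanged; only the way tasks are distributed over threads differs.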

Callable

Callable<V> represents a task that returns a result. Its call() method returns a value of the type parameter V and may throw an exception.

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
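
A Callable is usually written as a lambda and handed to an ExecutorService. The sketch below is one way this might look; `CallableSketch`, `wordCount`, and `countOnPool` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableSketch {

    // Hypothetical task: counts the whitespace-separated words in a non-blank string.
    static Callable<Integer> wordCount(String text) {
        return () -> text.trim().split("\\s+").length;
    }

    // Runs the task on a single-thread pool and waits for its result.
    static int countOnPool(String text) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(wordCount(text));
            return future.get(); // the type parameter V is Integer here
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countOnPool("thread pools reuse threads"));
    }
}
```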

Future

A Future represents the result of an asynchronous computation. It lets you cancel a specific Runnable or Callable task, query whether it has completed, and retrieve its result. If necessary, the result can be obtained through the get method, which blocks until the task returns a result.

Future, located in the java.util.concurrent package, is an interface:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

There are five methods declared in the Future interface, and the purpose of each method is explained below:

  • The cancel method attempts to cancel the task. It returns true if the cancellation succeeded and false if the task could not be cancelled, typically because it has already completed. The mayInterruptIfRunning parameter indicates whether a task that is already running may be interrupted. If the task has already completed, cancel returns false regardless of mayInterruptIfRunning. If the task has not started yet, cancel returns true regardless of mayInterruptIfRunning, and the task never runs. If the task is running, setting mayInterruptIfRunning to true means the executing thread is interrupted in an attempt to stop the task; with false, the running task is allowed to complete.

  • The isCancelled method returns true if the task was cancelled before it completed normally.
  • The isDone method indicates whether the task is complete, and returns true if it is.
  • The get() method is used to get the result of the execution. It blocks and does not return until the task is finished.
  • The get(long timeout, TimeUnit unit) method is also used to obtain the execution result. If no result is available within the specified time, it throws a TimeoutException.

In other words, Future provides three functions:

  1. Check whether the task has completed;
  2. Cancel (interrupt) the task;
  3. Obtain the result of the task.
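
The three capabilities above can be sketched as follows. The class `FutureMethodsSketch` and its `demo` method are assumptions made for the example; the latch is only there to keep one task blocked so that it can be cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureMethodsSketch {

    // Returns a comma-separated record of what the Future methods reported.
    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A task that finishes quickly.
            Future<String> quick = pool.submit(() -> "done");
            String value = quick.get();             // blocks until the result is ready
            boolean doneAfterGet = quick.isDone();  // completed, so this is true

            // A task that blocks on a latch nobody opens, so it can only end by cancellation.
            CountDownLatch never = new CountDownLatch(1);
            Future<String> slow = pool.submit(() -> {
                never.await();
                return "unreachable";
            });
            boolean cancelled = slow.cancel(true);     // interrupts the worker if it already started
            boolean cancelAgain = quick.cancel(true);  // false: the task has already completed

            return value + "," + doneAfterGet + "," + cancelled + ","
                    + slow.isCancelled() + "," + slow.isDone() + "," + cancelAgain;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // done,true,true,true,true,false
    }
}
```

Note that isDone() also returns true for the cancelled task: cancellation counts as a form of completion.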

Because Future is only an interface, it cannot be instantiated directly, which is where FutureTask comes in.

Declaration of FutureTask:

public class FutureTask<V> implements RunnableFuture<V>

The FutureTask class implements the RunnableFuture interface. Let’s look at the declaration of RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

As you can see, RunnableFuture extends both the Runnable and Future interfaces, and FutureTask implements RunnableFuture. A FutureTask can therefore be executed by a thread as a Runnable, and it can also be used as a Future to obtain the return value of a Callable.
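
A small sketch of this dual role, assuming a trivial computation; the class name `FutureTaskSketch` and the method `computeOnPlainThread` are hypothetical.

```java
import java.util.concurrent.FutureTask;

class FutureTaskSketch {

    static int computeOnPlainThread() throws Exception {
        // Wrap a Callable in a FutureTask ...
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        // ... run it as a Runnable on an ordinary thread, no pool involved ...
        Thread worker = new Thread(task);
        worker.start();
        // ... and read the result through the Future side of the same object.
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeOnPlainThread()); // 42
    }
}
```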

The difference between shutdown() and shutdownNow()

As the names suggest, shutdownNow() tries to stop the thread pool immediately, stopping both running and waiting tasks. This takes effect quickly but is risky. shutdown() only closes the pool to new submissions: later calls to submit() are rejected, while tasks already inside the pool keep running as usual, and the pool stops once they finish.

shutdown()

Sets the thread pool state to SHUTDOWN. The pool does not stop immediately:

  • it stops accepting externally submitted tasks;
  • tasks that are already running and tasks waiting in the queue are allowed to finish;
  • the pool stops only once those tasks have completed.

shutdownNow()

Sets the thread pool state to STOP. It attempts to stop immediately, but this is not guaranteed:

  • as with shutdown(), it stops accepting externally submitted tasks;
  • it ignores the tasks waiting in the queue;
  • it attempts to interrupt the running tasks;
  • it returns the list of tasks that were never executed.

awaitTermination(long timeOut, TimeUnit unit)

The current thread blocks until one of the following occurs:

  • all submitted tasks (both running and queued) have completed after a shutdown request;
  • the timeout expires;
  • the current thread is interrupted, in which case InterruptedException is thrown.

It returns true if all tasks completed after the shutdown request, or false if the timeout expired first.

Conclusion

  • To shut down gracefully, use shutdown(); after that, no new tasks can be submitted.
  • To shut down immediately and get a list of the tasks that were never executed, use shutdownNow().

awaitTermination() does not stop new tasks from being submitted; it only blocks and returns whether the thread pool has terminated (true/false). shutdown() does not block.

In order of how forcefully they shut the pool down: shutdownNow() > shutdown() > awaitTermination()
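
These three methods are often combined: ask politely first, wait, then force. The sketch below follows that general pattern; the class `ShutdownSketch`, the helper `shutdownGracefully`, and the timeout values are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    // Hypothetical helper: returns true if the pool terminated within the allowed time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; running and queued tasks continue
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks and drop the queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```

Tasks that ignore interruption can still keep the pool alive after shutdownNow(), which is why the helper reports a boolean instead of assuming success.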

ThreadPoolExecutor constructor

// The full constructor of the Java thread pool
public ThreadPoolExecutor(
    int corePoolSize,                  // Number of threads the pool keeps long-term, even if they are idle
    int maximumPoolSize,               // Maximum number of threads
    long keepAliveTime, TimeUnit unit, // How long threads beyond corePoolSize may stay idle
    BlockingQueue<Runnable> workQueue, // Task queue
    ThreadFactory threadFactory,       // How new threads are created
    RejectedExecutionHandler handler)  // Rejection policy

The working order of the thread pool

When a task is submitted, ThreadPoolExecutor decides what to do in the following order:

  1. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  2. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  3. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case the task is rejected (by default, a RejectedExecutionException is thrown).
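
The order can be observed directly by submitting tasks that block. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2, and a queue of capacity 1; the class name `WorkingOrderSketch` and the `trace` helper are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkingOrderSketch {

    // Submits four blocking tasks and records "threads/queued" after each submission.
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocking);
                    sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    sb.append("rejected"); // default AbortPolicy
                }
                sb.append(' ');
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // 1/0 1/1 2/1 rejected
    }
}
```

The first task starts a core thread, the second is queued, the third forces a second thread because the queue is full, and the fourth is rejected because the pool is at maximumPoolSize.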

How to use thread pools correctly

Avoid unbounded queues

Don’t create thread pools with the Executors.newXXXThreadPool() shortcut methods, because pools created this way can use an unbounded task queue. To avoid an OutOfMemoryError (OOM), use the ThreadPoolExecutor constructor and specify the maximum length of the queue explicitly.

What happens if you submit() a new task while the task queue is full? The RejectedExecutionHandler interface lets us control this behavior:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

The default rejection behavior of the thread pool is AbortPolicy, which throws a RejectedExecutionException. This exception is unchecked, so it is easy to forget to catch it. If you don’t care about task rejection events, you can set the rejection policy to DiscardPolicy so that excess tasks are silently ignored.
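
Between throwing and silently discarding there is a middle ground: a custom handler. The following is a minimal sketch of one that merely counts rejections; `CountingRejectionSketch`, `CountingHandler`, and `rejectedOutOf` are hypothetical names, and the pool is deliberately tiny (one thread, one queue slot).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionSketch {

    // Hypothetical handler: records rejected tasks instead of throwing or dropping silently.
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    // Submits blocking tasks to a pool that can hold two of them (one running, one queued).
    static int rejectedOutOf(int submissions) {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stay busy so later submissions find the pool full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectedOutOf(5)); // 3
    }
}
```

A real handler would more likely log the rejection or feed a metric; the counter is only the simplest observable stand-in.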

Policy usage demo

// Requires: import java.util.concurrent.*;
// Worker (a Runnable) and getTask(int) are not shown; they are assumed to be defined elsewhere.

public static void main(String[] args) {
    final int corePoolSize = 1;
    final int maxPoolSize = 2;
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1);

    // Rejection policy 1: AbortPolicy throws RejectedExecutionException.
    // You can switch to another policy here.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, queue,
            new ThreadPoolExecutor.AbortPolicy());

    // With at most 2 threads and 1 queue slot, the 4th task is rejected
    // if the first three are still running or waiting.
    for (int i = 0; i < 4; i++) {
        executor.execute(new Worker());
    }
    executor.shutdown();
}

public static void testShutDown(int startNo) throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 5; i++) {
        executorService.execute(getTask(i + startNo));
    }
    executorService.shutdown();
    // awaitTermination blocks until all tasks finish or the timeout expires
    executorService.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("shutDown->all thread shutdown");
}

Get processing results and exceptions

The results produced by tasks in the thread pool, as well as any exceptions thrown while they run, are wrapped in the Future and retrieved when future.get() is called. Exceptions thrown during execution are wrapped in an ExecutionException. The submit() method itself does not deliver the results or exceptions of task execution. The code to get the result of the execution could be written like this:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        // This exception is passed to the caller when future.get() is called
        throw new RuntimeException("exception in call~");
    }
});

try {
    Object result = future.get();
} catch (InterruptedException e) {
    // interrupt
} catch (ExecutionException e) {
    // exception in Callable.call()
    e.printStackTrace();
}

Construct thread pools correctly

int poolSize = Runtime.getRuntime().availableProcessors() * 2;
// Bounded queue: at most 512 tasks can wait
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
// Tasks that cannot be accepted are silently discarded
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0, TimeUnit.SECONDS, queue, policy);

Get a single result

After a task is submitted to the thread pool, submit() returns a Future. Call V Future.get() to block until the result is available, or V get(long timeout, TimeUnit unit) to specify the maximum time to wait.

Get multiple results

If you submit multiple tasks to a thread pool, you can call Future.get() on each of them in turn to get their results. For this scenario, however, ExecutorCompletionService is a better fit: its take() method blocks until some task completes and then returns that task's Future. After submitting a batch of tasks to a CompletionService, call take() the same number of times to obtain the results of all tasks. The results arrive in the order in which the tasks complete, not the order in which they were submitted:


void solve(Executor e, Collection<Callable<Result>> solvers)
    throws InterruptedException, ExecutionException {
  CompletionService<Result> cs
      = new ExecutorCompletionService<>(e);
  solvers.forEach(cs::submit);
  for (int i = solvers.size(); i > 0; i--) {
    Result r = cs.take().get();
    if (r != null)
      use(r);
  }
}

Timeout for a single task

The V Future.get(long timeout, TimeUnit unit) method lets you specify how long to wait. If the task has not completed when the timeout expires, a TimeoutException is thrown.
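
A minimal sketch of handling that timeout, assuming the caller is happy with a fallback value; `TimeoutSketch`, `getOrFallback`, and the choice to cancel the task on timeout are illustrative decisions, not something the article prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Hypothetical helper: waits at most `timeoutMillis` for the task, else returns `fallback`.
    static String getOrFallback(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // nobody is waiting for the result any more
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself threw
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(() -> "fast", 2000, "fallback"));
        System.out.println(getOrFallback(() -> {
            Thread.sleep(5000);
            return "slow";
        }, 100, "fallback"));
    }
}
```

Creating a pool per call keeps the sketch self-contained; real code would normally reuse an existing pool.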

Timeout for multiple tasks

Waiting for multiple tasks to complete and setting the maximum waiting time can be done with CountDownLatch:

public void testLatch(ExecutorService executorService, List<Runnable> tasks)
        throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(tasks.size());
    for (Runnable r : tasks) {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    r.run();
                } finally {
                    latch.countDown(); // count down even if the task throws
                }
            }
        });
    }
    latch.await(10, TimeUnit.SECONDS); // specify the maximum time to wait
}
