1. Thread pool concept

Concept: In general, a request requires at least one thread to execute the specific request content. If the number of requests is very large and the processing time of the request content is very short, it will cause frequent creation and destruction of threads, and the system overhead is high.

  • Thread pool advantages:
    • Thread pools can maintain the life cycle of multiple threads.
    • Thread pools can improve thread utilization through thread reuse.
    • Thread pools can create a batch of threads ahead of time that are already there when the request arrives, so they inadvertently eliminate the delay associated with thread creation so that the request can be served immediately, making the application more responsive.
    • The number of threads in the thread pool can be adjusted so that when the number of requests exceeds a certain threshold, it forces any other incoming requests to wait until a thread is available to process them, preventing resource exhaustion.
  • Disadvantages of thread pools:
    • The thread pool itself also needs to be maintained and is not recommended when the number of threads is very small or the execution time is very long.
    • Thread pools are more vulnerable to concurrency risks than synchronization errors, pool-related deadlocks, insufficient resources, and thread leaks.

2. Callable

Concept: The Callable interface is also a thread interface, with a call() that returns a value. Threads implementing the Callable interface can be asynchronously submitted to a thread pool, which performs call() asynchronously and non-blocking. The result of the call() execution is bound by the thread pool to an instance of the Future interface.

  • Callable structure:
    • Callable is a functional interface that can use anonymous inner classes or lambda representations to complete the creation process.
    • When you declare a Callable instance, you need to specify a generic, which is calledcall()Method return value type.
  • Future common methods:
    • V get(): Gets the threadcall()Method, which blocks until a result is retrieved.
    • boolean cancel(boolean mayInterruptIfRunning): Cancels the thread task.
  • Callable versus Runnable:
    • Implement Runnable interface, override thread body methodrun(): Returns void.
    • Implement Callable interface, rewrite the thread body methodcall(): the return value is interface generics and will be put into a Future.

Source: / javase – advanced /

  • src: c.y.thread.pool.CallableTest
/ * * *@author yap
 */
public class CallableTest {

    @SneakyThrows
    @Test
    public void callable(a) {
        // can instead of lambda..
        Callable<Integer> callable = () -> {
            TimeUnit.SECONDS.sleep(2L);
            return 100;
        };

        ExecutorService executorService = Executors.newCachedThreadPool();

        // non-blocking
        Future<Integer> future = executorService.submit(callable);

        System.out.println("thread-main...");
        System.out.println("thread-main...");

        // blocking
        System.out.println(future.get());
        System.out.println("thread-main...");
        System.out.println("thread-main...");

        executorService.shutdown();
    }

    @SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

3. FutureTask

Concept: The FutureTask class implements both the Runnable and Future interfaces, so it can either act as a threaded task or bind the results of a threaded task directly to store it.

Source: / javase – advanced /

  • src: c.y.thread.pool.FutureTaskTest
/ * * *@author yap
 */
public class FutureTaskTest {
    @SneakyThrows
    @Test
    public void futureTask(a) {
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(2L);
            return 100;
        });

        new Thread(futureTask).start();
        System.out.println("thread-main...");
        System.out.println("thread-main...");

        // blocking
        System.out.println(futureTask.get());
        System.out.println("thread-main...");
        System.out.println("thread-main...");
    }

    @SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

4. CompletableFuture

Concept: The CompletableFuture class uniformly manages multiple tasks with return values.

  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):
    • Asynchronously executes a task with a return value and stores the result of the task.
  • static CompletableFuture<Void> runAsync(Runnable runnable):
    • Asynchronously executes a task with no return value.
  • static CompletableFuture<Void> allOf(CompletableFuture<? >... cfs):
    • Returns a new CompletableFuture when all the specified CompletableFutures are complete.
  • static CompletableFuture<Void> anyOf(CompletableFuture<? >... cfs):
    • Returns a new CompletableFuture when any of the specified CompletableFuture is complete.
  • T join():
    • Returns the final result.

Source: / javase – advanced /

  • src: c.y.thread.pool.CompletableFutureTest
/ * * *@author yap
 */
public class CompletableFutureTest {

    @SneakyThrows
    private String taskA(a) {
        TimeUnit.SECONDS.sleep(1L);
        System.out.println("taskA over...");
        return "taskA-over";
    }

    @SneakyThrows
    private String taskB(a) {
        TimeUnit.SECONDS.sleep(2L);
        System.out.println("taskB over...");
        return "taskB-over";

    }

    @SneakyThrows
    private String taskC(a) {
        TimeUnit.SECONDS.sleep(3L);
        System.out.println("taskC over...");
        return "taskC-over";
    }


    @SneakyThrows
    private void taskD(a) {
        TimeUnit.SECONDS.sleep(1L);
        System.out.println("taskD over...");
    }

    @SneakyThrows
    private void taskE(a) {
        TimeUnit.SECONDS.sleep(2L);
        System.out.println("taskE over...");

    }

    @SneakyThrows
    private void taskF(a) {
        TimeUnit.SECONDS.sleep(3L);
        System.out.println("taskF over...");
    }

    @SneakyThrows
    @Test
    public void performTask(a) {
        long start = System.currentTimeMillis();
        taskA();
        taskB();
        taskC();
        taskD();
        taskE();
        taskF();
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }

    @SneakyThrows
    @Test
    public void performTaskByCompletableFuture(a) {
        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(this::taskA);
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(this::taskB);
        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(this::taskC);
        CompletableFuture<Void> futureD = CompletableFuture.runAsync(this::taskD);
        CompletableFuture<Void> futureE = CompletableFuture.runAsync(this::taskE);
        CompletableFuture<Void> futureF = CompletableFuture.runAsync(this::taskF);
        CompletableFuture<Void> future = CompletableFuture.allOf(futureA, futureB, futureC, futureD, futureE, futureF);
        future.join();
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }

    @SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

5. ThreadPoolExecutor

Concept: A thread pool maintains a collection of HashSet structured threads and a task queue.

  • Structure:ThreadPoolExecutor(), there are seven parameters:
    • corePoolSize: number of core threads, the minimum number of threads maintained by the thread pool and not returned to the OS even if idle.
    • maximumPoolSize: The maximum number of threads maintained by the thread pool, that is, the maximum number of threads that can be extended when there are no more core threads.
    • keepAliveTime: Maximum idle time allowed by non-core threads. If a thread’s idle time exceeds this value, it will be returned to the OS.
    • unit: The largest unit of free time.
    • workQueue: The work queue used by the thread pool. Type BlockingQueue.
    • threadFactory: thread factory, used to generate threads, customizable, using DefaultThreadFactory by default.
    • handler: RejectedExecutionHandler type: RejectedExecutionHandler Specifies the RejectedExecutionHandler type.
  • Methods:
    • void execute(Runnable command): submits the Runnable interface instance to the thread pool.
    • Future<T> submit(Callable<T> task): commits the Callable interface instance to the thread pool and binds the return value result to the Future.
    • void shutdown(): No more threads are added and a stop signal is sent. After all threads are executed, the thread pool is closed to save resources.
    • void shutdownNow(): Closes the thread pool immediately to save resources.

Flow: Suppose the thread pool is set to have 2 core threads, 4 maximum threads, a fixed value of 2 work column, and a maximum lifespan of 10s:

  • When the thread pool is first created, the task queue is empty.
  • When calling theexecute()When adding a task:
    • When the first task arrives, start a thread, the core thread.
    • When the second task comes, start a thread, the core thread, at which point the number of core threads has reached the maximum.
    • When the third task comes, the task enters the queue and blocks.
    • When the fourth task comes, the task enters the queue and blocks. The task queue is full.
    • When task 5 arrives, extend a new thread to handle the task.
    • When the sixth task comes, expand a new thread to handle the task, and the maximum number of threads is reached.
    • When task 7 arrives, execute the reject policy.
  • When a thread completes a task, it takes the next task from the wait queue and executes it.
  • When a thread has nothing to do for more than 10 seconds, it is disabled as long as it is not a core thread.

Source: / javase – advanced /

  • src: c.y.thread.pool.ThreadPoolExecutorTest
/ * * *@author yap
 */
public class ThreadPoolExecutorTest {

    @SneakyThrows
    @Test
    public void threadPoolExecutor(a) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2.4.3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        threadPool.execute(() -> {
            System.out.println("runnable...");
        });

        TimeUnit.SECONDS.sleep(2L);

        Future<Integer> future = threadPool.submit(() -> {
            System.out.println("callable...");
            return 100;
        });
        System.out.println(future.get());
    }


    @SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

6. Thread pool rejection policy

Concept: The thread pool rejection policy refers to the rejection of new threads when the number of threads reaches the maximum. There are four built-in methods:

  • AbortPolicy: Directly throws an exception to prevent the system from working properly.
  • DiscardPolicy: Simply do nothing, just drop the task.
  • DiscardOldestPolicy: Discards the oldest request (the first one in the task queue) and allows the new task to enter the queue.
  • CallerRunsPolicy: As long as the thread pool is not closed, this policy executes the currently discarded task directly in the caller thread, that is, which thread called the execute() method of the thread pool and which thread executes the new task.

Source: / javase – advanced /

  • src: c.y.thread.pool.RejectionStrategyTest
/ * * *@author yap
 */
public class RejectionStrategyTest {

    private static class MyTask implements Runnable {
        private int i;

        private MyTask(int i) {
            this.i = i;
        }

        @SneakyThrows
        @Override
        public void run(a) {
            System.out.println(Thread.currentThread() + ":" + i);
        }

        @Override
        public String toString(a) {
            return "MyTask{" + "i=" + i + "}"; }}private void policy(RejectedExecutionHandler policy) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2.4.60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                policy);

        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new MyTask(i));
        }

        System.out.println("queue:" + threadPool.getQueue());
        threadPool.execute(new MyTask(100));
        System.out.println("queue:" + threadPool.getQueue());
        threadPool.shutdown();
    }

    @Test
    public void abortPolicy(a) {
        policy(new ThreadPoolExecutor.AbortPolicy());
    }

    @Test
    public void discardPolicy(a) {
        policy(new ThreadPoolExecutor.DiscardPolicy());
    }

    @Test
    public void discardOldestPolicy(a) {
        policy(new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    @Test
    public void callerRunsPolicy(a) {
        policy(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @SneakyThrows
    @After
    public void after(a){ System.out.println(System.in.read()); }}Copy the code

7. Executor framework

Concept: Try to avoid using the Executor framework to create thread pools, because the memory used to create threads is not JVM heap memory, but system memory, and most Executor frameworks allow integer.max_value to create a large number of threads, resulting in OOM.

7.1 CachedThreadPool

Concept: CachedThreadPool is a cacheable thread pool.

  • The number of core threads is 0, meaning that all threads can be reclaimed.
  • The maximum number of threads is integer.max_value, which is almost unlimited and will result in OOM.
  • The maximum idle time of a thread is 60 seconds.
  • Task force listedSynchronousQueueSynchronous queue is mainly used to transfer data by hand, with almost no queue waiting phenomenon. It is suitable for processing a large number of short time consuming tasks.
  • Structure:Executors.newCachedThreadPool():

Source: / javase – advanced /

  • src: c.y.thread.pool.CachedThreadPoolTest
/ * * *@author yap
 */
public class CachedThreadPoolTest {

    @Test
    public void cachedThreadPool(a) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> {
                System.out.println("hello!"); }); }}@SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

7.2 FixedThreadPool

Concept: A FixedThreadPool is an unordered pool with a fixed number of threads.

  • The number of core threads needs to be specified at construction time and cannot be changed once specified.
  • The maximum number of threads is the same as the number of core threads, that is, all threads are core threads.
  • The maximum idle time of a thread is 0s, and the idle time is ignored for the core thread.
  • Task force listedLinkedBlockingQueueUnbounded blocking queue, easy OOM.
  • Structure:Executors.newFixedThreadPool(2):

Source: / javase – advanced /

  • src: c.y.thread.pool.FixedThreadPoolTest
/ * * *@author yap
 */
public class FixedThreadPoolTest {

    @Test
    public void cachedThreadPool(a) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> {
                System.out.println("hello!"); }); }}@SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

7.3 ScheduledThreadPool

Concept: ScheduledThreadPool is a pool of periodic threads that can execute tasks either delayed or periodically.

  • Structure:Executors.newScheduledThreadPool(1):
  • Methods:
    • schedule(): How long to delay task execution.
      • Param1: indicates the instance of the Callable or Runnable interface that performs the task.
      • Param2: indicates the delay time for executing a task.
      • Param3: indicates the unit of time.
    • ScheduledFuture<? > scheduleAtFixedRate():
      • Param1: indicates the instance of the Callable or Runnable interface that performs the task.
      • Param2: indicates the delay of executing the task for the first time.
      • Param3: Indicates the interval for executing a periodic task. The value starts when the last task is executed.
      • Param4: indicates the unit of time.
    • ScheduledFuture<? > scheduleWithFixedDelay():
      • Param1: indicates the instance of the Callable or Runnable interface that performs the task.
      • Param2: indicates the delay of executing the task for the first time.
      • Param3: Indicates the interval for executing periodic tasks. The value is calculated from the end of the previous task.
      • Param4: indicates the unit of time.

Source: / javase – advanced /

  • src: c.y.thread.pool.ScheduledThreadPoolTest
/ * * *@author yap
 */
public class ScheduledThreadPoolTest {

    @Test
    public void schedule(a) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        executorService.schedule(() -> {
            System.out.println("delay 2s and print");
        }, 2, TimeUnit.SECONDS);
    }

    @Test
    public void scheduleAtFixedRate(a) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            System.out.println("delay 2s and every 1s");
        }, 2.1, TimeUnit.SECONDS);
    }

    @Test
    public void scheduleWithFixedDelay(a) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleWithFixedDelay(() -> {
            System.out.println("delay 2s and every 1s");
        }, 2.1, TimeUnit.SECONDS);
    }

    @SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

7.4 SingleThreadPool

Concept: SingleThreadExecutor is a thread pool with only one thread to execute tasks.

  • The number of core threads is 1, and the maximum number of threads is 1, i.e. there is only one core thread in the thread pool.
  • The maximum idle time of a thread is 0s, and the idle time is ignored for the core thread.
  • Task force listedLinkedBlockingQueueUnbounded blocking queue, easy OOM.
  • Structure:Executors.newSingleThreadExecutor():
  • SingleThreadExecutor Common users ensure that all tasks are executed in the specified order.
  • This unique thread cannot be changed.
  • Structure:Executors.newSingleThreadExecutor():

Source: / javase – advanced /

  • src: c.y.thread.pool.SingleThreadPoolTest
/ * * *@author yap
 */
public class SingleThreadPoolTest {

    @Test
    public void cachedThreadPool(a) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> {
                System.out.println("hello!"); }); }}@SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code

8. Customize thread pools

The thread pool framework provided by Executors has some limitations and limitations. We usually customize the thread pool parameters to control the thread pool characteristics:

  • Custom thread pool factory:
    • Implement the ThreadFactory interface.
    • rewriteThread newThread(Runnable r).
    • Create a custom named thread.
  • Custom rejection policy:
    • Implement the RejectedExecutionHandler interface.
    • rewritevoid rejectedExecution(Runnable r, ThreadPoolExecutor e).
    • Save unprocessed tasks to Redis or a database for processing at another time.

Source: / javase – advanced /

  • src: c.y.thread.pool.CustomThreadPoolTest
/ * * *@author yap
 */
public class CustomThreadPoolTest {

    private static class CustomThreadFactory implements ThreadFactory {

        private final AtomicInteger threadId = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "myThread-"+ threadId.getAndIncrement()); }}private static class CustomRejectedPolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            System.out.println(((CustomTask) r).getTaskName() + " is rejected!"); }}@AllArgsConstructor
    @Data
    private static class CustomTask implements Runnable {
        private String taskName;

        @SneakyThrows
        @Override
        public void run(a) {
            System.out.println(Thread.currentThread().getName() + ":" + taskName + " is running!");
            TimeUnit.SECONDS.sleep(3L); }}@Test
    @SneakyThrows
    public void customThreadPool(a) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2.4.10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new CustomThreadFactory(),
                new CustomRejectedPolicy());

        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new CustomTask("myTask-"+ i)); }}@SneakyThrows
    @After
    public void after(a) { System.out.println(System.in.read()); }}Copy the code