Hello, everyone, I am the southern orange who has been practicing Java for two and a half years. Partners can exchange experience with each other.

Extend ThreadPoolExecutor

1. Introduction of extension methods

ThreadPoolExecutor is extensible and internally provides several methods that can be overridden in subclasses (in the red box). The JDK notes that these methods can be used to add logs, time, monitor, or collect statistics. Does it feel familiar? Is there a perception of @around @before @after in Spring AOP?

So let’s compare that

ThreadPoolExecutor spring aop
BeforeExecute (),Called before the thread executes) @ Before (Executes before the intercepted method executes
AfterExecute (),Called after the thread executes) @ After (Executes after the execution of the intercepted method)
Terminated (),Called when the thread pool exits)
@ Around (You can execute both before and after the intercepted method)

In fact, they have the same effect, only one in the thread pool and one in the interceptor.

These methods in ThreadPoolExecutor have the following characteristics:

  • AfterExecute is called whether the task returns normally from a run or if an exception is thrown (afterExecute is not called if the task completes with an Error).

  • Also, if beforeExecute throws a RuntimeExecption, the task will not be executed and afterExecute will not be called.

  • Terminated is called when the thread pool is terminated, similar to the finally operation in try-catch. Terminated is used to release resources that Executor allocates during its life cycle. It is also used to perform operations such as sending notifications, logging, or collecting Finalize statistics.

2. Extended method implementation

Let’s start by building a custom thread pool that extends methods to add logging and statistics collection. To measure the running time of a task, beforeExecute must record the start time and store it in a place that afterExecute can access, so use ThreadLocal to store variables and afterExecute to read, And output average task and log messages terminated.

public class WeedThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime =new ThreadLocal<>();  private final Logger log =Logger.getLogger("WeedThreadPool"); Private Final AtomicLong numTasks =new AtomicLong(); Private Final AtomicLong totalTime =new AtomicLong(); /** * here is the constructor to implement the thread pool, I randomly selected one, Public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } // Call protected void beforeExecute(Thread t,Runnable r){super.beforeExecute(t,r); System.out.println(String.format("Thread %s:start %s",t,r)); Starttime.set (system.nanotime ()); } // Call protected void afterExecute(Runnable r,Throwable t){try {Long endTime = system.nanotime (); Long taskTime =endTime-startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread(),r,taskTime)); }finally { super.afterExecute(r,t); // Protected void terminated(){try{system.out.println (string.format (" terminated: avg time =%dns, ",totalTime.get()/numTasks.get())); }finally { super.terminated(); }}}Copy the code

Test cases:

public class WeedThreadTest {
     BlockingQueue<Runnable> taskQueue;
   final static WeedThreadPool weedThreadPool =new WeedThreadPool(3,10,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++) {
            weedThreadPool.execute(WeedThreadTest::run);
        }
        Thread.sleep(2000L);
        weedThreadPool.shutdown();
    }

    private static void run() {
        System.out.println("thread id is: " + Thread.currentThread().getId());
        try {
            Thread.sleep(1024L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Copy the code

3. Application scenarios

The use of these methods is similar to some of the scenarios used in Spring AOP, mainly for record tracking, optimization, such as logging and statistics collection, measuring the running time of tasks, and sending notifications, emails, messages, and so on after some tasks are completed.

CompletionService operates the asynchronous task

1. The principle of asynchronous methods

If we unexpectedly harvest a large number of tasks to be executed (for example, to call the travel ticket information of major travel software), in order to improve the execution efficiency of the task, we can use the thread pool Submit asynchronous calculation task, and obtain the results by calling the GET method of the Future interface implementation class.

Although the use of thread pools can improve execution efficiency, the get method of calling the Future interface implementation class blocks. That is, the get method will return the result only when all tasks associated with the current Future have been completed. If the current task has not been completed, but other tasks associated with the Future have already completed. You’ll waste a lot of time waiting.

So, is there a way to get the first result that completes the loop?

Yes, we can achieve the effect of ExecutorCompletionService, its internal have a blocking fifo queue, used to hold has executed the Future, by calling it take method or poll method can access to an already completes the Future, We then get the final result by calling the Future interface implementation class’s GET method.

The logical diagram is as follows:

ExecutorCompletionService CompletionService interface is achieved, in CompletionService interface defines the following these methods:

  • Future Submit (Callable Task): Submits a Callable task and returns the Future associated with the execution result of the task.

  • Future submit(Runnable task,V result): Submits a Runnable task and returns the Future associated with the execution result of the task;

  • Future take(): Retrieves and removes the first completed task from an internal blocking queue, blocking until a task completes;

  • Future poll(): Retrieves and removes the first completed task from the internal blocking queue, returns null, and does not block;

  • Future poll(long Timeout, TimeUnit Unit): Future poll(long timeout, TimeUnit unit): Retrieves and removes the first completed task from the internal blocking queue.

2. Implementation of asynchronous methods

Public class WeedExecutorServiceDemo {/** * continue to use the thread pool, just change the pool size */ BlockingQueue<Runnable> taskQueue; final static WeedThreadPool weedThreadPool = new WeedThreadPool(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100)); public static Random r = new Random(); public static void main(String[] args) throws InterruptedException, ExecutionException { CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(weedThreadPool); for (int i = 0; i < 3; I++) {cs.submit(() -> {// get the task int init = 0; for (int j = 0; j < 100; j++) { init += r.nextInt(); } Thread.sleep(1000L); return Integer.valueOf(init); }); } weedThreadPool.shutdown(); /** * for (int I = 0; i < 3; i++) { Future<Integer> future = cs.take(); if (future ! = null) { System.out.println(future.get()); }}}}Copy the code

The result of the call is

We can also poll it.

Poll for (int I = 0; i < 3; i++) { System.out.println(cs.poll(1200L,TimeUnit.MILLISECONDS).get()); }Copy the code

The result, of course, is the same


If the blocking times were smaller, the current code would break

Poll for (int I = 0; i < 3; i++) { System.out.println(cs.poll(800L,TimeUnit.MILLISECONDS).get()); }Copy the code

Similarly, the poll method can be used to interrupt timeout operations, for example by calling shutdownNow() of the thread pool directly in the case of a poll timeout, brutally shutting down the entire thread pool.

for (int i = 0; i < 3; i++) { Future<Integer> poll = cs.poll(800L, TimeUnit.MILLISECONDS); If (poll==null){system.out.println (" end of execution "); weedThreadPool.shutdownNow(); }}Copy the code

3. Application scenarios

How to asynchronously execute tasks and how to receive tasks should also be considered according to the actual situation.

  • 1. It is recommended that you use CompletionService when you need to batch submit asynchronous tasks. CompletionService combines the functionality of the thread pool Executor and the BlockingQueue BlockingQueue, making it easier to manage batch asynchronous tasks.

  • 2. Organize the results of asynchronous tasks. With this feature, you can easily order subsequent processing and avoid unnecessary waiting.

  • 3. Thread pool isolation. CompletionService supports the creation of a bosom thread pool, and this isolation avoids the risk of a few particularly time-consuming tasks dragging down the entire application.

If you need it, you can add it to my official account. The latest articles in the future will be in it at the first time. If you need the mind map of previous articles, you can also leave a message to me