(Mobile phone landscape view source more convenient)

Note: The Java source analysis section is based on the Java 8 version unless otherwise noted.

Note: The thread pool source section refers to the ThreadPoolExecutor class unless otherwise specified.

Introduction to the

We have studied the common task execution process in thread pool, but there is also a kind of task called future task in thread pool, you can use it to obtain the result of task execution, how to implement it?

Suggest to learn this chapter before the first to see tong elder brother wrote before the “dead kowt Java thread series of their own hands to write a thread pool (continued)”, help to understand the content of this chapter, and there the code is relatively short, learn relatively easy.

The problem

(1) How will the future tasks in the thread pool be executed?

(2) What good design patterns can we learn?

(3) What is the help to our future learning of other frameworks?

To a chestnut

Let’s start with an example to explain the content of the next chapter.

We define a thread pool and use it to submit five tasks that return 0, 1, 2, 3, and 4. At some point in the future, we take their returns and do a summation.

public class ThreadPoolTest02 { public static void main(String[] args) throws ExecutionException, InterruptedException {/ / create a fixed 5 thread thread pool ExecutorService threadPool = Executors. NewFixedThreadPool (5); List<Future<Integer>> futureList = new ArrayList<>(); Return 0, 1, 2, 3, 4 for (int I = 0; i < 5; i++) { int num = i; Future<Integer> Future = ThreadPool.submit (() -> {try {thread.sleep (1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("return: " + num); // return num; }); // Add the future to the list futurelist.add (future); Int sum = 0; for (Future<Integer> future : futureList) { sum += future.get(); } System.out.println("sum=" + sum); }}Copy the code

Here we consider two questions:

(1) If ordinary task is used here, how to write it, and what is the approximate time?

If you use a normal task, you add the sum to the task, and it’s not that easy to write (final problems). The total time is a little over a second. However, the disadvantage of this is that the accumulation operation is coupled to the content of the task itself, and then if you change to the tiring product, you have to modify the content of the task.

(2) If future.get() is placed inside the for loop, what is the approximate time?

Let’s leave that question unanswered and look at source code analysis.

Submit () method

The submit method, which is a way to submit a task with a return value, wraps it internally with a FutureTask, executes it with execute(), and returns the FutureTask itself.

Public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException(); // Wrap FutureTask RunnableFuture<T> fTask = newTaskFor(task); // Pass the execute() method to execute(ftask); // futureTask return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> Callable) {// Wrap a normal task as FutureTask return new FutureTask<T>(callable); }Copy the code

The design here is clever. In fact, both methods are implemented in the Abstract AbstractExecutorService class, which is an application of the template method.

Let’s look at FutureTask’s inheritance:

FutureTask implements the RunnableFuture interface, which combines the capabilities of the Runnable interface with those of the Future interface, which provides the capability to return values from the GET task.

Question: Why does the submit() method return a Future interface instead of a RunnableFuture or FutureTask class?

Answer: This is because submit() returns a result that only exposes its ability to get() (Future interface) to external callers, but not its ability to run() (Runaable interface).

The Run () method of FutureTask

Execute () calls the task’s run() method, and the task is wrapped as FutureTask. Execute () calls the FutureTask’s run() method. So let’s just look at this method.

Public void run() {// If the state is not NEW or the current thread fails to run the task, return if (state! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; Callable<V> c = Callable; // State must be NEW if (c! = null && state == NEW) {// Run the result V result; boolean ran; Result = c.call(); // result = c.call(); // Completed ran = true; } catch (Throwable ex) { result = null; ran = false; // Handle exception setException(ex); } if (ran) set(result); }} finally {// empty runner runner = null; Int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code

As you can see, the code is relatively simple, doing state detection first, then executing the task, and finally handling the result or exception.

There’s nothing wrong with executing the task, so let’s look at the code that handles the result or exception.

Protected void setException(Throwable t) {// Change the state from NEW to COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING) {// The outcome value is set to the received exception. Outcome = t; PutOrderedInt (this, stateOffset, EXCEPTIONAL); // Final state // Call completion method finishCompletion(); }} protected void set(V V) {// Change the state from NEW to COMPLETING if (UNSAFE.com) pareAndSwapInt(this, stateOffset, NEW, (实 习) {// The outcome of the query must be completed (实 习). PutOrderedInt (this, stateOffset, NORMAL); // Unsafe. putOrderedInt(this, stateOffset, NORMAL); // Final state // Call completion method finishCompletion(); }}Copy the code

At first glance, the two methods seem similar, except that they both end up with different results and different states, and both end up calling the finishCompletion() method.

Private void finishCompletion() {// If the queue is not empty (the queue is actually the caller's thread) for (WaitNode q; (q = waiters) ! = null;) {/ / empty queue if (UNSAFE.com pareAndSwapObject (this, waitersOffset, q, null)) {for (;;) {// The caller Thread Thread t = q.htread; if (t ! = null) { q.thread = null; Locksupport.unpark (t); // If the caller thread is not empty, wake it up. } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; }} // hook method, subclass override done(); Callable = null; // to reduce footprint }Copy the code

The entire run() method is summed up:

(1) FutureTask has a state that controls the running process of the task, and the state is from NEW->COMPLETING->NORMAL, and the state is from NEW-> EXCEPTIONAL;

(2) FutureTask saves the thread runner that runs the task, which is a thread in the thread pool;

The caller thread is saved in the waiters queue, so when was it configured?

(4) After the task is executed, in addition to setting the state change, the caller thread should also be woken up.

When did the caller thread get saved in FutureTask (waiters)? View the constructor:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}Copy the code

If the caller does not call the get() method, does this future task look like a normal task? It is, so it is only necessary to save the caller thread to FutureTask if the get() method is called.

So, let’s see what’s going on in the get() method.

The FutureTask class get() method

The get() method is called and blocks until the task is complete.

public V get() throws InterruptedException, ExecutionException { int s = state; (c) COMPLETING those well, s <= COMPLETING) s = awaitDone(false, 0L); // COMPLETING those well, s <= COMPLETING) S = awaitDone(false, 0L); Return report(s); }Copy the code

C) COMPLETING tasks well, enter the queue and wait.

private int awaitDone(boolean timed, Throws InterruptedException {// Final long deadline = timed? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {// Handle interrupt if (thread.interrupted ()) {removeWaiter(q); throw new InterruptedException(); Int s = state (实 习);} // (实 习) (实 习) if (s > COMPLETING) { if (q ! = null) q.thread = null; return s; } / / if the status is equal to the COMPLETING of the task was finished quickly, just set the state to the NORMAL or EXCEPTIONAL and setting the / / let the CPU at this time, (S == 实 习) // Cannot time out yet thread.yield (); Else if (q == null) // Initialize the queue (WaitNode records the caller thread) q = new WaitNode(); // 2. Queue not entered else if (! Queued) / / try to enqueue queued = UNSAFE.com pareAndSwapObject (this, waitersOffset q.n ext = waiters, q); Else if (timed) {nanos = deadline-system.nanotime (); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 3. Block the current thread (caller thread) else // locksupport. park(this); }}Copy the code

Here we assume that the task has not yet been executed when we call get(), i.e. its state is NEW, and we try to follow the logic indicated above: 1, 2, 3, 4

(1) The first loop, with a state of NEW, goes directly to 1, initializes the queue and wraps the caller thread in WaitNode;

(2) For the second loop, the state is NEW, the queue is not empty, and the WaitNode containing the caller’s thread is enqueued at 2;

(3) The third loop, the state is NEW, the queue is not empty, and the queue has been queued, to 3, blocking the caller thread;

(4) Suppose that after a while the task is finished, according to the analysis of the run() method, the caller thread will be unpark at last, that is, 3 places will be woken up;

(5) The state of the fourth cycle must be greater than that of COMPLETING the cycle and returning;

Question: Why control the entire flow in the for loop? Can you write each step separately?

A: Because every action needs to check whether the state has changed, it is also possible to write it out, but the code will be very lengthy. The state of get() is NEW. Other states can be verified by themselves, and are guaranteed to be correct, even if two threads are running across each other (breakpoint technique).

OK, so after I go back here, let’s see what I did with the final result.

private V report(int s) throws ExecutionException { Object x = outcome; If (s == NORMAL) return (V)x; // CANCELLED if (s >= CANCELLED) throw new CancellationException(); Throw new ExecutionException((Throwable)x); }Copy the code

Remember when we analyzed RUN, we put exception in outcome when we performed an exception. That’s what we used here.

(1) If normal execution ends, the return value of the task is returned;

(2) If the exception ends, wrap it as ExecutionException and throw it;

In this way, exceptions in the thread can also be returned to the caller thread, unlike normal tasks where the caller does not know whether the task was successfully executed or not.

other

In addition to retrieving the return value of the task, FutureTask can also cancel the execution of the task.

public boolean cancel(boolean mayInterruptIfRunning) { if (! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t ! = null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }Copy the code

Here the cancellation task is handled by interrupting the execution thread, interested students can analyze for themselves.

Answer the opening

If we put future.get() inside the for loop here, what is the approximate time?

A: Probably a little more than 5 seconds, because each task submission blocks the caller’s thread until the task is completed, and each task is over 1 second, so the total time is a little more than 5 seconds.

conclusion

(1) Future tasks are realized by packaging common tasks as FutureTasks.

(2) FutureTask can not only obtain the result of task execution, but also perceive the anomaly of task execution, and even cancel the task;

(3) AbstractExecutorService defines many template methods, which is an important design pattern.

(4) FutureTask is actually a typical implementation of the exception call. We will see this design idea later when we learn Netty and Dubbo.

eggs

How are asynchronous calls implemented in RPC framework?

Answer: RPC framework commonly used call methods are synchronous call, asynchronous call, in fact, they are asynchronous call in nature, they are implemented in the way of FutureTask.

Typically, the remote interface is called by a thread (we call it a remote thread). If the call is synchronous, the caller’s thread blocks waiting for the result of the call from the remote thread, and then returns. If the call is asynchronous, FutureXxx returns something that can be retrieved from the remote result in the future. Of course, if this FutureXxx calls get() before the remote result is returned, it will block the caller’s thread.

For those of you interested, take a look at dubbo’s asynchronous call (which throws a Future into the RpcContext).

Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.