Preface

The previous article covered the implementation principle and source code of the Java thread pool. It was meant to give you a thorough understanding of the thread pool in a single article, but it always felt like something was missing.

What was missing is submit(): it returns a Future, through which the result of a task's execution can be retrieved. In this article, we will look at how submit() and FutureTask are implemented.

Usage scenarios & Examples

Usage scenarios

One scenario I can think of is parallel computation: for example, a method calls both methodA() and methodB(), and the two calls do not depend on each other. We can submit methodA() and methodB() to the thread pool asynchronously and then assemble their results in the main thread. The two calls then run concurrently, which can greatly shorten the response time of the calling method.
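The scenario above can be sketched roughly as follows. This is a minimal sketch, not production code: the class name ParallelAssembly and the bodies of methodA() and methodB() are hypothetical stand-ins for two independent, slow calls.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelAssembly {
    // Hypothetical stand-ins for two independent calls.
    static String methodA() { return "A"; }
    static String methodB() { return "B"; }

    static String assemble() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> taskA = ParallelAssembly::methodA;
            Callable<String> taskB = ParallelAssembly::methodB;
            // Both tasks are submitted before either result is requested,
            // so they can run concurrently.
            Future<String> a = pool.submit(taskA);
            Future<String> b = pool.submit(taskB);
            // get() blocks until the corresponding task has finished.
            return a.get() + "+" + b.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(assemble());
    }
}
```

The total wait is then roughly the duration of the slower call rather than the sum of both.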

Usage example

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("==== executing the FutureTask thread task ====");
        Future<String> futureTask = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("FutureTask executes business logic");
                Thread.sleep(2000);
                System.out.println("FutureTask business logic completed!");
                return "Welcome attention: a flower is not romantic!";
            }
        });

        System.out.println("==== executing the main thread task ====");
        Thread.sleep(1000);
        // Poll until the task is done; calling futureTask.get() directly would block instead.
        boolean flag = true;
        while (flag) {
            if (futureTask.isDone() && !futureTask.isCancelled()) {
                System.out.println("futureTask execution result: " + futureTask.get());
                flag = false;
            }
        }
        threadPool.shutdown();
    }
}

The usage is simple: submit() accepts a Callable, we implement its call() method, and the returned Future (futureTask here) gives us the return value.

submit() implementation principle

submit() also submits a task to the thread pool, but unlike execute() it lets the caller retrieve the task's result. This is implemented with FutureTask.

public abstract class AbstractExecutorService implements ExecutorService {
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}

// ThreadPoolExecutor inherits submit() from AbstractExecutorService
// and provides the execute() implementation that submit() calls.

The task is still run through execute(), but it is first wrapped in a FutureTask. The FutureTask's run() method is what the worker thread started by execute() invokes.
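To make the wrapping visible, here is a minimal sketch that does by hand what submit() does internally: wrap the Callable in a FutureTask and pass it to execute(). The class name ManualSubmit and the trivial task are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class ManualSubmit {
    static int runViaExecute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> work = () -> 6 * 7;
            // The same wrapping that newTaskFor() performs inside submit().
            FutureTask<Integer> task = new FutureTask<>(work);
            pool.execute(task);   // FutureTask is a Runnable, so execute() accepts it
            return task.get();    // and it is also a Future, so the result can be read back
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runViaExecute());
    }
}
```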

Let’s take a look at the complete link-diagram it executes:

As can be seen from the figure above, FutureTask holds the core logic for executing the task and returning its result. We take FutureTask.run() and FutureTask.get() as entry points and analyze the implementation of FutureTask step by step.

FutureTask source exploration

Let’s take a look at some of the attributes in FutureTask:

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;
}
  1. state

    The current task status. There are seven values:

    NEW: the task has been created or is still running; no result has been set yet
    COMPLETING: the task is finishing but its result is not fully set yet, a transient critical state
    NORMAL: the task completed normally
    EXCEPTIONAL: an exception occurred during the execution of the task
    CANCELLED: the task was cancelled
    INTERRUPTING: the task is being interrupted
    INTERRUPTED: the task has been interrupted

  2. callable

    The Callable submitted by the user. Its call() method carries the user-defined business logic.

  3. outcome

    When the task ends, outcome holds either the execution result or the exception that was thrown.

  4. runner

    While the task is being executed, runner holds a reference to the thread executing it.

  5. waiters

    Several threads may call get() on the same task, so the waiting threads are stored in a stack data structure.
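The waiters stack is a lock-free linked stack whose top is swapped with CAS. The following is a simplified sketch of that idea, not the JDK code: it uses AtomicReference instead of Unsafe, stores a name instead of a Thread, and the class name WaiterStack is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class WaiterStack {
    static final class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Top of the stack, playing the role of the waiters field.
    private final AtomicReference<Node> head = new AtomicReference<>();

    // Push: point the new node at the current top, then CAS the top to the new node.
    void push(String name) {
        Node n = new Node(name);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    // Detach the whole stack atomically and walk it from the top down.
    List<String> drain() {
        List<String> names = new ArrayList<>();
        for (Node n = head.getAndSet(null); n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        WaiterStack stack = new WaiterStack();
        stack.push("t1");
        stack.push("t2");
        stack.push("t3");
        System.out.println(stack.drain());
    }
}
```

Because new nodes are always pushed at the top, the most recent waiter is visited first when the stack is drained.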

FutureTask.run() implementation principle

We already know that runWorker() in the thread pool eventually calls FutureTask.run(), so let's look at how it works:

The specific code is as follows:

public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

First, run() checks the state of the FutureTask: it must be NEW to continue.

The runner reference is then set to the current thread via CAS. If either check fails, run() returns without doing anything.

The user-defined call() method is then executed. A normal return value is stored through set(result), and a thrown exception is stored through setException(ex).
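One visible consequence of the state check is that a FutureTask executes its Callable at most once. A minimal sketch (the class name RunOnce is an assumption for illustration):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnce {
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> counting = () -> calls.incrementAndGet();
        FutureTask<Integer> task = new FutureTask<>(counting);
        task.run(); // state is NEW, so call() executes and the result is stored
        task.run(); // state is no longer NEW, so run() returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterTwoRuns());
    }
}
```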

FutureTask.set() implementation principle

The set() method is simple. Here is the code:

public class FutureTask<V> implements RunnableFuture<V> {
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }
}

set() first moves the state from NEW to COMPLETING via CAS, assigns the value returned by call() to the outcome field, changes the state to NORMAL, and finally calls finishCompletion() to wake up the suspended threads. finishCompletion() is covered after get().

FutureTask.get() implementation principle

Now look at the code:

public class FutureTask<V> implements RunnableFuture<V> {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

If the state is NEW or COMPLETING (state <= COMPLETING), the task has not finished yet, and the call to get() blocks in awaitDone():

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

This is arguably the core method of FutureTask. Let's go through it step by step:

If timed is true and no result arrives within the specified nanos, the waiter is removed and the method returns the current state.

q is a WaitNode object that wraps the calling thread as a node of the waiters stack. WaitNode is defined as follows:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

At the start of each loop iteration, the method checks whether the current thread has been interrupted. If so, it removes the waiter and throws an InterruptedException.

Next comes a chain of if ... else if ... checks, which we analyze branch by branch.

Branch 1: if (s > COMPLETING)

At this point the task already has a final state, whether a normal result, an exception, an interruption, or a cancellation. awaitDone() returns the state directly, and get() goes on to call report().

Branch 2: else if (s == COMPLETING)

The task is about to complete, so the current thread gives up the CPU with Thread.yield() and checks again on the next iteration.

Branch 3: else if (q == null)

On the first pass through the loop the WaitNode has not been created yet, so it is initialized: q = new WaitNode();

Branch 4: else if (!queued)

queued records whether the current thread's node has been pushed onto the stack. If not, it is pushed via CAS: q.next is pointed at the current top (waiters), and waiters is updated to point at q.

Branch 5/6: LockSupport.parkNanos() / LockSupport.park()

If a timeout is set, the current thread is suspended with parkNanos(); otherwise it is suspended with park().

After this spin loop, if call() has not yet returned a result, every thread that called get() ends up suspended.

Once run() has produced a result, the suspended threads are woken up one by one. The details are in finishCompletion().
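The behaviour described above can be observed with a small sketch: several threads call get() on the same task, and all of them receive the result once run() completes. The class name ManyWaiters and the thread count are assumptions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class ManyWaiters {
    static int wokenWaiters(int n) {
        Callable<String> work = () -> "done";
        FutureTask<String> task = new FutureTask<>(work);
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // Blocks in awaitDone() if the task has not finished yet.
                    if ("done".equals(task.get())) {
                        woken.incrementAndGet();
                    }
                } catch (InterruptedException | ExecutionException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        // Completing the task unparks the threads that are already waiting;
        // threads that call get() afterwards see the final state and return directly.
        task.run();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenWaiters(4));
    }
}
```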

The data in the final stack structure is as follows:

FutureTask.finishCompletion() implementation principle

The specific implementation code is as follows:

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;
}

The code is simple. From get(), we know that every thread calling get() before the task finishes is stored as a WaitNode in a stack, and each of those threads stays suspended until run() produces a result.

finishCompletion() detaches the waiters stack with a CAS, then walks it from the top node down, unparking each node's thread in turn. Each woken thread then returns from awaitDone() and calls report().
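finishCompletion() also calls done(), a protected hook that is empty by default and that subclasses may override. A minimal sketch of using it as a completion callback follows; the class name NotifyingTask and its onDone parameter are hypothetical, not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class NotifyingTask<V> extends FutureTask<V> {
    // Hypothetical callback, invoked once the task reaches a final state.
    private final Runnable onDone;

    NotifyingTask(Callable<V> callable, Runnable onDone) {
        super(callable);
        this.onDone = onDone;
    }

    // Called by finishCompletion() after the waiting threads have been unparked.
    @Override
    protected void done() {
        onDone.run();
    }

    public static void main(String[] args) {
        Callable<String> work = () -> "result";
        NotifyingTask<String> task =
                new NotifyingTask<>(work, () -> System.out.println("task finished"));
        task.run();
    }
}
```

Since cancel() also goes through finishCompletion(), the hook fires for cancelled tasks as well as for completed ones.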

FutureTask.report() implementation principle

The specific code is as follows:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

If the state is NORMAL, the task completed normally and outcome is returned as the result.

If the state is CANCELLED or higher (CANCELLED, INTERRUPTING, INTERRUPTED), a CancellationException is thrown.

Otherwise the state is EXCEPTIONAL: the task threw an exception, which is wrapped in an ExecutionException and thrown.
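The exceptional path can be sketched as follows: the exception thrown inside call() does not escape run(); it is stored in outcome and later surfaces from get() as the cause of an ExecutionException. The class name FailureReport and the "boom" message are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FailureReport {
    static Throwable causeOfFailure() {
        Callable<String> failing = () -> {
            throw new IllegalStateException("boom");
        };
        FutureTask<String> task = new FutureTask<>(failing);
        task.run(); // the exception is captured by setException(), not thrown here
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            // report() wrapped the stored exception.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```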

FutureTask.cancel() implementation principle

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

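The logic of cancel() is simple, and it only succeeds while the state is still NEW. It changes the state to CANCELLED (or to INTERRUPTING when mayInterruptIfRunning is true), and then calls finishCompletion() to wake up the waiting threads.

If mayInterruptIfRunning is true, the thread running the task (runner) is interrupted and the state is set to INTERRUPTED before the waiting threads are woken up.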
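A minimal sketch of these rules, cancelling a task before it ever runs (the class name CancelDemo and the summary string format are assumptions for illustration):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CancelDemo {
    static String describe() {
        Callable<String> work = () -> "never";
        FutureTask<String> task = new FutureTask<>(work);
        boolean first = task.cancel(false);  // state NEW -> CANCELLED, returns true
        boolean second = task.cancel(false); // state is no longer NEW, returns false
        task.run();                          // no-op, because the state is not NEW
        String outcome;
        try {
            outcome = task.get();
        } catch (CancellationException e) {
            outcome = "cancelled";           // report() sees state >= CANCELLED
        } catch (InterruptedException | ExecutionException e) {
            outcome = "failed";
        }
        return first + "," + second + "," + task.isCancelled() + "," + outcome;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "true,false,true,cancelled"
    }
}
```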

Conclusion

The implementation principle of FutureTask is actually quite simple. Drawing a simple flow diagram for each method makes it easier to follow.

Next, I plan to share a reading of the BlockingQueue source code, which will round off the thread pool topic.

Before that, I may share a handbook covering common Spring Cloud configuration, code analysis, and best practices. It should be handy at work, and it also serves as a summary of the source code I have read so far. Stay tuned!