This is the 31st day of my participation in the August Text Challenge.More challenges in August

In general, a Future is used to receive the result returned by an asynchronous task.

The method for the Future interface is as follows:

  boolean cancel(boolean mayInterruptIfRunning);
 boolean isCancelled();
     boolean isDone();
     V get() throws InterruptedException, ExecutionException;
   V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException;
Copy the code

If a thread pool is used, the thread submission process is usually through the submit(runnable) method, which is used in the

 AbstractExecutorService
Copy the code

Class, the method is as follows:

public Future<? > submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }Copy the code

Here we can see that the thread actually runs the same as a normal asynchronous task with no results when submitting to the thread pool, so this is the difference

FutureTask.

Let’s look at FutureTask’s source code:

public class FutureTask<V> implements RunnableFuture<V> { //... } // In fact, this interface only specifies one run method: //public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ //void run(); / /}Copy the code

Take a look at the internal variables:

// The future also provides a set of thread state values similar to those of thread. private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; Private Callable<V> Callable; The visibility of this variable is the private Object outcome indirectly guaranteed by volatile state; // non-volatile, protected by state reads/ index // private volatile Thread runner; private volatile WaitNode waiters;Copy the code

Review our previous ways of using Future:

  • The task is submitted to the thread pool and the Future object is retrieved
  • Call the future. The get ()

So let’s take a two-step look at how future correspondence works.

1. Submit

Without going into the thread pool, the constructor is called when we commit:

 public FutureTask(Callable<V> callable) {
     if (callable == null)
         throw new NullPointerException();
     this.callable = callable;
     this.state = NEW;       // ensure visibility of callable
 }
Copy the code

Then, based on the thread pool abstraction class above, we can directly plug this task into the thread pool and run it. Based on our experience with thread pools, we can now look at the run method of this:

public void run() { if (state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c ! = null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code

Here you can see that the task is actually plugged into the thread pool and executed. The process actually looks like this:

  • Tasks are assembled into FutureTasks
  • FutureTask is dispatched to the thread pool for execution
  • After execution, the result is stored in the futureTask object

2. To obtain

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q ! = null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }Copy the code

Part of the essence of futureTask is in awaitDone.

  • A wait queue (waitNode) is maintained and returns the current processing status
  • CAS is used to ensure atomicity of state values.