While studying Netty recently, I found that futures are everywhere in the code. I was curious about how some of the features they provide are implemented, for example the various callbacks. I guessed it had something to do with a design pattern such as the Observer pattern, but I wanted to know how it actually works.

Besides ChannelFuture in Netty, we often run into other kinds of Future elsewhere, such as FutureTask in the JDK, ListenableFuture and FutureCallback in Guava, and ListenableFutureTask in Spring. They all provide rich APIs that cover the various scenarios we encounter when programming.

In this article, I will mainly introduce Future from the following two aspects: 1. How can a Future's get method obtain the result of an asynchronous task? 2. How does Future handle callbacks?

Getting asynchronous results

Before we get to these two questions, let's discuss a more basic problem: how do you get the result of an asynchronous thread?

Without those APIs, how would you do it? The approach differs from one situation to another.

Global variable

private volatile static Integer result;

public static void main(String[] args) throws InterruptedException {
    new Thread(() -> {
        System.out.println("Processing business logic");
        result = 1000;
    }).start();
    Thread.sleep(1000);
    System.out.println(result);
}

// Alternative to the fixed sleep above: poll until the result has been set
while (result == null) {
    Thread.sleep(100);
}
  1. Assign the execution result of the asynchronous thread to a global variable.
  2. The current thread chooses how to fetch it depending on the scenario. It can poll and block until the value appears, or it can skip waiting altogether: if it does not care when the task completes and only needs a cached value, it reads the latest value once the task has run and the old value before that.

Simple encapsulation

Maybe we can encapsulate it? Before encapsulation, consider a few questions:

  1. Where is the task logic defined? A Runnable can't return a value, so we can define a @FunctionalInterface called Task that does.
  2. Where is the return value stored, and how is it handed back? Thread has no method for this.
public static void main(String[] args) throws InterruptedException {
    CallableThread callableThread = new CallableThread(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ccccc";
    });

    callableThread.start();
    System.out.println("Start time" + LocalDateTime.now());
    System.out.println(callableThread.get());
    System.out.println("End time" + LocalDateTime.now());
}


class CallableThread<T> extends Thread {
    private Task<T> task;
    private T result;
    private volatile boolean finished = false;

    public CallableThread(Task<T> task) {
        this.task = task;
    }

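    @Override
    public void run() {
        T value = task.call();
        // wait/notifyAll may only be called while holding the object's monitor
        synchronized (this) {
            result = value;
            finished = true;
            notifyAll();
        }
    }

    public synchronized T get() throws InterruptedException {
        // Loop to guard against spurious wakeups
        while (!finished) {
            wait();
        }
        return result;
    }
}

@FunctionalInterface
interface Task<T> {
    T call();
}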

This seems to work, but not so well:

  1. A Thread should only deal with thread-related concerns, but here it is bound to the business logic (the Task). If several tasks want to share the same thread, what happens to the return value?
  2. Can you pull that logic out and put it in a new class?

Improving the encapsulation

public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable<>(() -> {
        // Simulate time-consuming business operations
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "I am the result";
    });
    
    System.out.println("Start time" + LocalDateTime.now());
    new Thread(myRunnable).start();
    System.out.println("result: " + myRunnable.get());
    System.out.println("End time" + LocalDateTime.now());
}


class MyRunnable<T> implements Runnable {
    private Task<T> task;

    private T result;

    private volatile boolean finished = false;

    public MyRunnable(Task<T> task) {
        this.task = task;
    }

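    @Override
    public void run() {
        T value = task.call();
        // wait/notifyAll may only be called while holding the object's monitor
        synchronized (this) {
            result = value;
            finished = true;
            notifyAll();
        }
    }

    public synchronized T get() throws InterruptedException {
        // Loop to guard against spurious wakeups
        while (!finished) {
            wait();
        }
        return result;
    }
}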

Isn’t this a bit like the Future in Java?

Future

Let’s take a look at Future in Java

Basic usage

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> future = new FutureTask<>(() -> {
        Thread.sleep(3000);
        System.out.println(System.currentTimeMillis());
        return "hehehh";
    });
    new Thread(future).start();
    System.out.println("Start Get Result : " + System.currentTimeMillis());
    System.out.println("Get Result : " + future.get() + System.currentTimeMillis());
}

How it works

There are several core concepts in the Future pattern:

  1. Future: an interface that abstracts the common operations, such as getting the task's return value and getting the task's execution status
  2. Callable: the carrier of the task logic, similar to Task above
  3. FutureTask: similar to MyRunnable above; it implements both the Runnable and Future interfaces
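The three roles above can be seen together in a few lines. This is only a minimal sketch: the class name FutureRolesDemo and the helper runOnNewThread are made up for illustration, while Callable, FutureTask, Future and Thread are the JDK types discussed here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class FutureRolesDemo {
    // Hypothetical helper: wraps a Callable in a FutureTask and runs it on a new thread.
    static <T> Future<T> runOnNewThread(Callable<T> callable) {
        FutureTask<T> task = new FutureTask<>(callable); // FutureTask is a Runnable...
        new Thread(task).start();                        // ...so a Thread can run it,
        return task;                                     // and a Future, so callers can query it
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> callable = () -> 6 * 7;        // Callable: the task body, returns a value
        Future<Integer> future = runOnNewThread(callable);
        System.out.println("result = " + future.get());  // blocks until the task has finished
        System.out.println("done = " + future.isDone());
    }
}
```

The caller only ever sees the Future side of the object, which is what lets the thread-related code and the task logic stay separate.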

The Future interface

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
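To make these five methods concrete, here is a small sketch that times out on a slow task and then cancels it. The class FutureApiDemo and the helper observe are hypothetical names; the behaviour shown relies only on the standard FutureTask implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureApiDemo {
    // Hypothetical helper: records what each Future method reports for a slow task.
    static List<String> observe() throws InterruptedException, ExecutionException {
        List<String> events = new ArrayList<>();
        FutureTask<String> slow = new FutureTask<>(() -> {
            Thread.sleep(5000); // simulate a slow task
            return "late";
        });
        new Thread(slow).start();

        try {
            slow.get(100, TimeUnit.MILLISECONDS); // wait at most 100 ms
        } catch (TimeoutException e) {
            events.add("timeout, isDone=" + slow.isDone());
        }

        // mayInterruptIfRunning = true interrupts the thread that is running the task
        boolean cancelled = slow.cancel(true);
        events.add("cancel=" + cancelled
                + ", isCancelled=" + slow.isCancelled()
                + ", isDone=" + slow.isDone());

        try {
            slow.get();
        } catch (CancellationException e) {
            events.add("get() threw CancellationException");
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        observe().forEach(System.out::println);
    }
}
```

Note that isDone reports true for a cancelled task as well: "done" means the task has reached a final state, not that it produced a value.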

FutureTask

FutureTask implements both the Runnable and Future interfaces, so besides being the carrier of a task it also provides an API for operating on it: getting the execution result, cancelling the task, checking whether the task has completed, and so on.

Task status

The task status describes where a task is in its life cycle, and it changes as the task executes. FutureTask represents it with the state field, which can take the following values:

/*
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */

private volatile int state;

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

Task execution

Since FutureTask implements the Runnable interface, the place to look is its run method. It calls Callable#call and then handles one of two outcomes:

  1. Execution succeeded: state is set to COMPLETING, the return value is saved to the outcome field, state is set to NORMAL, and finally finishCompletion wakes the blocked threads via LockSupport.unpark(t)
  2. Execution failed: state is set to COMPLETING, the exception is saved to the outcome field, state is set to EXCEPTIONAL, and finally finishCompletion wakes the blocked threads via LockSupport.unpark(t)
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled, to prevent concurrent calls to run()
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
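The state transitions in set and setException can be imitated without UNSAFE. The following is a simplified sketch, not the JDK code: MiniTask is a hypothetical class, AtomicInteger stands in for the CAS on the state field, and the runner check, cancellation and waking of waiters are all left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class MiniTask<V> implements Runnable {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final Callable<V> callable;
    private Object outcome; // result or Throwable, published by the final write to state

    MiniTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        if (state.get() != NEW) {
            return; // already completed
        }
        try {
            set(callable.call());
        } catch (Throwable ex) {
            setException(ex);
        }
    }

    private void set(V v) {
        // Only the caller that wins the NEW -> COMPLETING transition may store the outcome
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = v;
            state.set(NORMAL); // final state
        }
    }

    private void setException(Throwable t) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = t;
            state.set(EXCEPTIONAL); // final state
        }
    }

    int state() {
        return state.get();
    }

    // Returns the stored result or exception once a final state has been reached, else null
    Object outcome() {
        return state.get() > COMPLETING ? outcome : null;
    }

    public static void main(String[] args) {
        MiniTask<String> ok = new MiniTask<>(() -> "value");
        ok.run();
        System.out.println(ok.state() + " " + ok.outcome());

        MiniTask<String> bad = new MiniTask<>(() -> { throw new IllegalStateException("boom"); });
        bad.run();
        System.out.println(bad.state() + " " + bad.outcome());
    }
}
```

The short-lived COMPLETING state is what lets a reader tell "outcome is being written" apart from "outcome is ready": the outcome is read only after state has moved past COMPLETING.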

Getting the result

We can get the result of an asynchronous task with FutureTask#get, but how does it work? When FutureTask#get is called, there are two cases:

  1. If the asynchronous task has already been executed, we can return the result directly
  2. If the asynchronous task is not completed, the current thread is blocked and will not wake up until the task is completed
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // state > COMPLETING: the task has reached a final state (completed normally, failed, or cancelled)
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;   // help GC
            return s;
        }
        // The task is in its closing phase: yield the CPU and check again on the next loop
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        // CAS via UNSAFE: push the new WaitNode onto the head of the waiters list
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        // A maximum waiting time was given
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);   // block the current thread
    }
}

Imagine a scenario where the FutureTask object has been exposed, which means it can be used by multiple threads. If several threads call FutureTask#get before the asynchronous task completes, how are all of those threads blocked and later woken up?

  1. Each calling thread is wrapped in its own WaitNode object, and the WaitNode objects are linked into a singly linked list. New nodes are added at the head
  2. The current thread is blocked with LockSupport.park
  3. When the task completes, finishCompletion runs: it walks the list from the head node, reads each node's thread field, and calls LockSupport.unpark(thread) to wake the blocked thread
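The three steps above can be sketched in isolation. This is a simplified illustration rather than the FutureTask source: WaiterStack, await, complete and wakeAll are hypothetical names, AtomicReference replaces the UNSAFE CAS on waiters, and interrupt and timeout handling are omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class WaiterStack {
    static final class WaitNode {
        volatile Thread thread = Thread.currentThread(); // the thread that is waiting
        volatile WaitNode next;
    }

    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
    private volatile boolean done;

    // Step 1 and 2: push a node for the current thread at the head, then park until done
    void await() {
        WaitNode q = new WaitNode();
        WaitNode head;
        do {
            head = waiters.get();
            q.next = head;
        } while (!waiters.compareAndSet(head, q));
        while (!done) {              // loop: park may return spuriously
            LockSupport.park(this);
        }
    }

    // Step 3: detach the whole list and unpark every waiter, starting from the head
    void complete() {
        done = true;
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    // Hypothetical demo: returns how many waiting threads were released
    static int wakeAll(int threads) throws InterruptedException {
        WaiterStack stack = new WaiterStack();
        AtomicInteger woken = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                stack.await();
                woken.incrementAndGet();
            });
            ts[i].start();
        }
        Thread.sleep(100); // let the waiters park first (not needed for correctness)
        stack.complete();
        for (Thread t : ts) {
            t.join();
        }
        return woken.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("woken threads: " + wakeAll(3));
    }
}
```

Setting done before unparking matters: a thread that pushes its node after the list was detached sees done as true and never parks, and an unpark that arrives before park simply makes the next park return at once.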

ListenableFuture

Callbacks

FutureTask#get returns the result of an asynchronous task, but what if I want to run some other logic once the result is available? The most direct idea is a callback. For example, we could extend the MyRunnable code above:

public MyRunnable<T> addListener(Consumer<T> c) {
    // A separate thread polls until the task has finished, then runs the callback
    new Thread(() -> {
        while (!finished) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        c.accept(result);
    }).start();
    return this;
}

We add an addListener method to MyRunnable that accepts a Consumer as an input and executes this logic when the task is complete, as follows:

public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable<>(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "I am the result";
    });

    System.out.println("Start time " + LocalDateTime.now());
    new Thread(myRunnable).start();
    myRunnable.addListener(result -> {
        System.out.println("When the task completes, thread " + Thread.currentThread().getName() + " performs some other task");
        result = result + " ggggg";
        System.out.println(result);
    });
}

ListenableFuture#addListener

ListenableFuture is Guava's enhanced Future. It extends Future and adds a method for registering a callback:

/**
 * @param listener the listener to run when the computation is complete (the callback logic)
 * @param executor the executor to run the listener in (where the callback is executed)
 */
void addListener(Runnable listener, Executor executor);

ListenableFutureTask extends FutureTask and implements the ListenableFuture interface. Here is a simple example:

public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask<String> futureTask = ListenableFutureTask.create(() -> {
        System.out.println("Task started " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("Task completed " + LocalDateTime.now());
        return "C";
    });

    futureTask.addListener(() -> System.out.println("After the result is available, print a log"), MoreExecutors.directExecutor());
    new Thread(futureTask).start();
}

Source code analysis

The idea is to maintain all callbacks in a singly linked list, ExecutionList, and to override FutureTask#done so that the callback logic runs after the task is complete.

// Each callback is a RunnableExecutorPair node, and all RunnableExecutorPair nodes form a linked list
private final ExecutionList executionList = new ExecutionList();

// ListenableFutureTask#addListener
public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
}

// ExecutionList#add
public void add(Runnable runnable, Executor executor) {
    // Lock here because the executed flag may be updated concurrently by the thread running the task
    // (ListenableFutureTask overrides FutureTask#done, which sets executed to true),
    // and because several threads may add callbacks at the same time
    synchronized (this) {
        // Task not finished yet: add the current node at the head of the list
        if (!executed) {
            runnables = new RunnableExecutorPair(runnable, executor, runnables);
            return;
        }
    }
    // Task already finished: run the callback right away
    executeListener(runnable, executor);
}

// ExecutionList#executeListener
private static void executeListener(Runnable runnable, Executor executor) {
    try {
        // Hand the callback to the executor
        executor.execute(runnable);
    } catch (RuntimeException e) {
        log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
    }
}

// ExecutionList.RunnableExecutorPair
private static final class RunnableExecutorPair {
    final Runnable runnable;
    final Executor executor;
    @Nullable RunnableExecutorPair next;

    RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
        this.runnable = runnable;
        this.executor = executor;
        this.next = next;
    }
}

How does ListenableFutureTask know if the task is completed? In the FutureTask#finishCompletion method, after unblocking the thread, a done method is executed, but that method has no logic in FutureTask and can be treated as a template method, which ListenableFutureTask implements. As follows:

// ListenableFutureTask#done
protected void done() {
    executionList.execute();
}

// ExecutionList#execute
public void execute() {
    RunnableExecutorPair list;
    synchronized (this) {
        if (executed) {
            return;
        }
        // Mark the list as executed and take ownership of the registered callbacks
        executed = true;
        list = runnables;
        runnables = null; // allow GC to free listeners even if this stays around for a while.
    }

    // New nodes were added at the head, so reverse the list to restore registration order
    RunnableExecutorPair reversedList = null;
    while (list != null) {
        RunnableExecutorPair tmp = list;
        list = list.next;
        tmp.next = reversedList;
        reversedList = tmp;
    }

    // Iterate through the list, executing the callback logic in turn
    while (reversedList != null) {
        executeListener(reversedList.runnable, reversedList.executor);
        reversedList = reversedList.next;
    }
}
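The effect of the head insertion plus reversal is easier to see in a runnable form. The sketch below mirrors the structure described above in plain Java, without Guava; MiniExecutionList and its demo method are hypothetical names, and logging of executor failures is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class MiniExecutionList {
    private static final class Node {
        final Runnable runnable;
        final Executor executor;
        Node next;

        Node(Runnable runnable, Executor executor, Node next) {
            this.runnable = runnable;
            this.executor = executor;
            this.next = next;
        }
    }

    private Node head;        // guarded by this
    private boolean executed; // guarded by this

    void add(Runnable runnable, Executor executor) {
        synchronized (this) {
            if (!executed) {
                head = new Node(runnable, executor, head); // push at the head
                return;
            }
        }
        executor.execute(runnable); // already executed: run the late callback right away
    }

    void execute() {
        Node list;
        synchronized (this) {
            if (executed) {
                return;
            }
            executed = true;
            list = head;
            head = null;
        }
        // Reverse the list so callbacks run in the order they were registered
        Node reversed = null;
        while (list != null) {
            Node tmp = list;
            list = list.next;
            tmp.next = reversed;
            reversed = tmp;
        }
        for (Node n = reversed; n != null; n = n.next) {
            n.executor.execute(n.runnable);
        }
    }

    // Hypothetical demo: two callbacks registered before completion, one after
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Executor direct = Runnable::run; // runs the callback on the calling thread
        MiniExecutionList list = new MiniExecutionList();
        list.add(() -> log.add("first"), direct);
        list.add(() -> log.add("second"), direct);
        list.execute();
        list.add(() -> log.add("late"), direct);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second, late]
    }
}
```

Callbacks are invoked outside the synchronized block, so a slow or re-entrant listener cannot hold the lock while other threads try to register listeners.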

FutureCallback

With ListenableFutureTask we can run callback logic after the task completes. If you look carefully, though, you will notice that the callback does not receive the task's return value. What if I want to take the return value and use it in the next step? Is blocking the current thread with get the only option? In fact, Guava provides an interface for this as well. Let's start with an example:

public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask<String> futureTask = ListenableFutureTask.create(() -> {
        System.out.println("Task started " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("Task completed " + LocalDateTime.now());
        return "C";
    });

    Futures.addCallback(futureTask, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("Execution succeeded: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println("Execution failed");
        }
    });

    new Thread(futureTask).start();
}

Source code analysis

There are two methods in the FutureCallback interface, which correspond to the task execution success logic and task failure logic respectively

void onSuccess(@Nullable V result);

void onFailure(Throwable t);

Futures can be described as a facade class that encapsulates some operations

// Futures#addCallback
public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback) {
    // DirectExecutor is used here, that is, the callback is executed directly on the current thread
    addCallback(future, callback, directExecutor());
}

// Futures#addCallback
public static <V> void addCallback(final ListenableFuture<V> future,
        final FutureCallback<? super V> callback, Executor executor) {
    Runnable callbackListener = new Runnable() {
        @Override
        public void run() {
            final V value;
            try {
                value = getDone(future);
            } catch (ExecutionException e) {
                callback.onFailure(e.getCause());
                return;
            } catch (RuntimeException e) {
                callback.onFailure(e);
                return;
            } catch (Error e) {
                callback.onFailure(e);
                return;
            }
            callback.onSuccess(value);
        }
    };
    // Wrap this logic in a listener: the listener retrieves the return value
    // and calls the matching FutureCallback method depending on the outcome
    future.addListener(callbackListener, executor);
}

// Futures#getDone
public static <V> V getDone(Future<V> future) throws ExecutionException {
    checkState(future.isDone(), "Future was expected to be done: %s", future);
    return getUninterruptibly(future);
}

public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                return future.get();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

It is essentially a callback that encapsulates the logic that retrieves the return value and executes the FutureCallback method based on the result of the return value, but it is much easier to use.

This differs from calling Future#get directly and then running the rest of the logic: calling Future#get blocks the current thread, whereas Guava runs the logic in a callback, which is a kind of notification mechanism, so the current thread is not blocked.
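The same notification idea can be tried with the JDK alone by overriding FutureTask#done. This is a minimal sketch under that assumption, not Guava's implementation: CallbackTask and demo are hypothetical names, and the CountDownLatch exists only so the demo can report the outcome.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class CallbackTask<V> extends FutureTask<V> {
    private final Consumer<? super V> onSuccess;
    private final Consumer<Throwable> onFailure;

    CallbackTask(Callable<V> callable, Consumer<? super V> onSuccess, Consumer<Throwable> onFailure) {
        super(callable);
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    // Called by FutureTask once the task has reached a final state,
    // on the thread that completed (or cancelled) it
    @Override
    protected void done() {
        try {
            onSuccess.accept(get()); // the task is already done, so get() does not wait
        } catch (ExecutionException e) {
            onFailure.accept(e.getCause());
        } catch (CancellationException e) {
            onFailure.accept(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical demo: the caller never calls get() on the task itself
    static String demo() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch finished = new CountDownLatch(1);
        CallbackTask<String> task = new CallbackTask<String>(
                () -> "hello",
                value -> { seen.set("success: " + value); finished.countDown(); },
                error -> { seen.set("failure: " + error); finished.countDown(); });
        new Thread(task).start();
        // The calling thread is free to do other work here
        finished.await(); // only so that the demo can return what the callback saw
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints success: hello
    }
}
```

The callback runs on the worker thread, which is the same behaviour a direct executor gives in the Guava example.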

ListenableFutureTask

Spring also has a ListenableFutureTask, implemented much like Guava's. It extends FutureTask and implements Spring's own ListenableFuture interface. It overrides FutureTask#done, and in that method it obtains the return value and then runs the callback logic.

public static void main(String[] args) {
    ListenableFutureTask<String> future = new ListenableFutureTask<>(() -> "result");
    future.addCallback(new ListenableFutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("callback " + result);
        }

        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Execution failed");
        }
    });
    new Thread(future).start();
}

Source code analysis

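Its callbacks are stored in two queues, successCallbacks and failureCallbacks, and the data structure is LinkedList. In the done method further down they are reached through the callbacks field, a ListenableFutureCallbackRegistry.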

private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();

private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();

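The overridden done method is shown below. The logic is simple: it gets the result and notifies the success callbacks, or works out the cause of the failure and notifies the failure callbacks.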

protected void done() {
    Throwable cause;
    try {
        T result = get();
        this.callbacks.success(result);
        return;
    }catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return;
    }catch (ExecutionException ex) {
        cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
    }
    catch (Throwable ex) {
        cause = ex;
    }
    this.callbacks.failure(cause);
}
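How two callback queues and the success/failure calls in done could fit together is sketched below. This is an illustration in plain Java, not Spring's code: MiniCallbackRegistry is a hypothetical class, Consumer stands in for Spring's callback interfaces, and the assumption that late callbacks are invoked immediately is part of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class MiniCallbackRegistry<T> {
    private enum State { NEW, SUCCESS, FAILURE }

    private final Queue<Consumer<? super T>> successCallbacks = new LinkedList<>();
    private final Queue<Consumer<Throwable>> failureCallbacks = new LinkedList<>();
    private State state = State.NEW;
    private T result;
    private Throwable cause;

    // Queue the callbacks while the task is running; invoke one at once if it has finished
    synchronized void addCallback(Consumer<? super T> onSuccess, Consumer<Throwable> onFailure) {
        switch (state) {
            case NEW:
                successCallbacks.add(onSuccess);
                failureCallbacks.add(onFailure);
                break;
            case SUCCESS:
                onSuccess.accept(result);
                break;
            case FAILURE:
                onFailure.accept(cause);
                break;
        }
    }

    // Would be called from done() when get() returned a value
    synchronized void success(T value) {
        state = State.SUCCESS;
        result = value;
        Consumer<? super T> callback;
        while ((callback = successCallbacks.poll()) != null) {
            callback.accept(value);
        }
    }

    // Would be called from done() when the task failed
    synchronized void failure(Throwable ex) {
        state = State.FAILURE;
        cause = ex;
        Consumer<Throwable> callback;
        while ((callback = failureCallbacks.poll()) != null) {
            callback.accept(ex);
        }
    }

    // Hypothetical demo: one callback registered before completion, one after
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniCallbackRegistry<String> registry = new MiniCallbackRegistry<>();
        registry.addCallback(v -> log.add("early: " + v), t -> log.add("early failed"));
        registry.success("ok");
        registry.addCallback(v -> log.add("late: " + v), t -> log.add("late failed"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early: ok, late: ok]
    }
}
```

Keeping the result and the state next to the queues is what allows a callback registered after completion to be served without another round trip through the task.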

CompletableFuture

ForkJoinPool

The Future of Netty

My current understanding of Future and Promise in Netty is not deep enough to explain them clearly. I will add this section once I understand Netty better.