Introduction to the

With the rapid development of mobile Internet, the business architecture has also become complex and more and more business systems. Typically, we call these interfaces asynchronously. As more and more high-concurrency systems become available, the asynchronous callback pattern becomes increasingly important.

The question is, how do you get the result of processing an asynchronous call? Let’s talk about it

Asynchronous callback for Java Futures

Callable interface

Before we talk about the Callable interface, let’s mention the Runnable interface. The Runnable interface is an abstract interface that represents the business code of a thread in Java multithreading. But Runnable does not return a value. To solve this problem, Java defines an interface similar to Runnable called the Callable interface. And call the business processing method call

@FunctionalInterface
public interface Callable<V> {
    V call(a) throws Exception;
}
Copy the code

The Callable interface is a generic interface, also declared as a functional interface. The only abstract method, call, has a return value of the actual type of the generic parameter

A preliminary FutureTask class

As the name implies, the FutureTask class represents a task to be executed in the future, representing operations to be performed by a new thread. It is also in the java.util.concurrent package. The source code is as follows:

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable}}Copy the code

The FutureTask class acts as a bridge between Callable instances and Thread instances. FutureTask encapsulates a Callable instance internally and then acts itself as a target for the Thread Thread.

The Future interface

The Future interface is not complex, and is mainly the execution of concurrent tasks and operations to obtain their results. It has three main functions.

  • Check whether the concurrent task is complete.
  • Gets the result of the completion of concurrent tasks
  • Cancels concurrent tasks
package java.util.concurrent;
public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled(a);
  boolean isDone(a);
  V get(a) throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code

Details are as follows:

  • V get() : obtains the execution results of concurrent tasks. This method blocks, and the thread that called it will block until the concurrent task completes
  • V GET (Long timeout, TimeUtil Unit) : obtains the execution result of concurrent tasks. Also blocks, but there is a time limit on the block, and if the block time exceeds the set time, the method will throw an exception
  • Boolean isDone() : Gets whether the status of concurrent tasks has ended
  • Boolean isCancelled() : Gets the cancelled status of concurrent tasks. Returns true if the task was canceled
  • Boolean Cancel (Boolean mayInterruptIfRunning) : Cancels the execution of concurrent tasks

Revisited FutureTask class

In the FutureTask class, there is a private member of a Callable, and FutureTask has a run method inside. The Run method is an abstract method of the Runable interface that provides its own implementation inside the FutureTask class. When the Thread instance is executed, the run method is executed asynchronously as a target. The run method inside FutureTask is actually the call method that executes the Callable.

After execution, the results are saved in the private member — outcome property

private Object outcome; // non-volatile, protected by state reads/writes
Copy the code

Outcome is responsible for preserving results. FutureTask then retrieves the object value using the GET method, and the FutureTask completes successfully.

public void run(a) {
  if(state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if(c ! =null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
      }
      if (ran)
        // Save result to outcomeset(result); }}finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code

The instance

package com.zou;

import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;


public class futureTask {
    public static void main(String[] args) throws Exception {
        FutureTask<Boolean> TaskA = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Mission A is ready 👌");
            } catch (Exception e) {
                System.out.println("Mission A went wrong.");
                return false;
            }
            System.out.println("Task A completed");
            return true;
        });

        FutureTask<Boolean> TaskB = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
                System.out.println("Mission B is ready 👌");
            } catch (Exception e) {
                System.out.println("There's a problem with mission B.");
                return false;
            }
            System.out.println("Task B completed");
            return true;
        });

        Thread threadA = new Thread(TaskA);
        Thread threadB = new Thread(TaskB);

        threadA.start();
        threadB.start();

        Thread.currentThread().setName("Main thread");
        try {
            boolean a = TaskA.get();
            boolean b = TaskB.get();
            isReady(a, b);
        } catch (Exception e) {
            System.out.println("There has been an interruption.");
        }
        System.out.println("End of run");
    }
    public static void isReady(boolean a, boolean b) {
        if (a && b) {
            System.out.println("It's all set.");
        } else if(! a) { System.out.println("A is not ready.");
        } else {
            System.out.println("B is not ready."); }}}Copy the code

The running results are as follows:

A Task is ready 👌 A task is finished B Task is ready 👌 B Task is finished All tasks are readyCopy the code

If you run through the code above, you’ll see that the FutureTask get method blocks while it asynchronously retrieves the result. So it can be classified as an asynchronous blocking pattern.

Asynchronous blocking is often inefficient; the blocked main thread cannot do anything. There is no non-blocking asynchronous result retrieval method implemented. If you need to get asynchronous results, you need to introduce some frameworks, starting with Google’s Guava framework

Guava’s asynchronous callback

Guava is a Google Java extension that provides a solution for asynchronous callbacks. Related to the source at com.google.com mon. Util. Concurrent package. Many of the classes in the package are extensions and enhancements to the capabilities of java.util.Concurrent. Guava’s asynchronous task interface ListenableFuture, for example, enables non-blocking access to asynchronous results.

For asynchronous callbacks, Guava mainly makes the following enhancements:

  • Introduce a new interfaceListenableFuture, inheriting Java’s Future interface, enables Java’s Future asynchronous tasks to be monitored in Guava and obtain the results of non-blocking asynchronous execution.
  • Introduce a new interfaceFutureCallbackThis is aSeparate new interface. The purpose of this interface is that after an asynchronous task is executed,Depending on the asynchronous result, different callback processing is done, and the callback result can be processed.

Rounding FutureCallback

FutureCallback is a new interface used to fill in the listening logic after an asynchronous task is executed. There are two callback methods:

  • OnSuccess method, which is called back after the asynchronous task is successfully executed; When called, the execution result of the asynchronous task is passed in as an argument to the onSuccess method
  • OnFailure method, which is called back when an exception is thrown during the execution of an asynchronous task An exception thrown by the asynchronous task when called, passed in as an argument to the onFailure method.

FutureCallback source code is as follows:

public interface FutureCallback<V> {
  /** Invoked with the result of the {@code Future} computation when it is successful. */
  void onSuccess(@Nullable V result);

  /**
   * Invoked when a {@code Future} computation fails or is canceled.
   *
   * <p>If the future's {@link Future#get() get} method throws an {@link ExecutionException}, then
   * the cause is passed to this method. Any other thrown object is passed unaltered.
   */
  void onFailure(Throwable t);
}
Copy the code

Note that Guava’s FutureCallable and Java’s Callable are similar in name but different in substance.

  1. Java’s Callable interface represents a piece of execution logic
  2. Guava’s FutureCallback interface represents the cleanup after the Callable asynchronous logic is executed, depending on whether it succeeds or fails

So how does Guava implement the monitoring relationship between asynchronous task Callable and FutureCallable result callback? Guava introduced a new interface, ListenableFuture, which inherits Java’s Future interface to enhance monitoring capabilities.

Rounding ListenableFuture

Guava’s ListenableFuture interface is an extension of Java’s Future interface and can be understood as an instance of an asynchronous task. Source code is as follows:

public interface ListenableFuture<V> extends Future<V> {
  void addListener(Runnable listener, Executor executor);
}
Copy the code

ListenableFuture adds only one method, the addListener method. Its role is to encapsulate the FutureCallback cleanup callback from the previous section into an internal Runnable asynchronous callback task that will be called back to FutureCallback for cleanup after the Callable asynchronous task completes

In real programming, how do you bind the FutureCallback callback logic to an asynchronous ListenableFuture task? You can use Guaba’s Futures utility class, which has an addCallback static method that binds the callback instance of FutureCallback to the ListenableFuture asynchronous task. Bindings like this:

Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {

  @Override
  public void onSuccess(@Nullable Object result) {}@Override
  public void onFailure(Throwable t) {

  }
}, executors);
Copy the code

Meanwhile, the question is, Guava is an extension of Future asynchronous tasks, but where did Guava’s asynchronous tasks come from?

Get the ListenableFuture asynchronous task

To obtain an instance of Guava’s ListenableFuture asynchronous task, you do so by submitting a Callable task to the thread pool. However, the thread pool in question is not Java’s thread pool, but Guava’s own thread pool.

A Guava thread pool is a decoration for a Java thread pool. It is created as follows:

ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);
Copy the code

Create a Java thread pool and pass it in as a parameter to the Guava thread pool to get the Guava thread pool. Then submit the task via subimT to get the ListenableFuture asynchronous task instance

ListenableFuture<Boolean> task = Pool.submit(() -> {
    return true;
});

Futures.addCallback(task, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(@Nullable Boolean result) {}@Override
    public void onFailure(Throwable t) {

    }
}, jPool);
Copy the code

The instance

package com.zou;

import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class guava {
    public static void main(String[] args) throws Exception {
      	// Create Guava thread pool
        ExecutorService jPool = Executors.newFixedThreadPool(10);
        ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);

      	// Get the ListenableFuture asynchronous task
        ListenableFuture<Boolean> task = Pool.submit(() -> {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("Sub-thread starts execution");
            return true;
        });
				
      	/ / callback
        Futures.addCallback(task, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(@Nullable Boolean result) {
                System.out.println("Executed successfully");
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("Execution failed");
                t.printStackTrace();
            }
        }, jPool);

        TimeUnit.SECONDS.sleep(1);
        System.out.println("Main thread execution completed"); }}Copy the code

The result is:

The main thread has completed. The secondary thread has started executing successfullyCopy the code

You can see that the program is already asynchronous and non-blocking.

The essential difference between Guava asynchronous callbacks and Java’s FutureTask asynchronous callbacks is

  • Guava is a non-blocking asynchronous callback, and the calling thread is non-blocking and can continue to execute its business logic
  • FutureTask is a blocking asynchronous callback, and the calling thread is blocked, waiting for the asynchronous thread to return the result while it gets the asynchronous result

Netty’s asynchronous callback

The Netty official documentation indicates that all Netty network operations are asynchronous. In Netty source code, the asynchronous callback processing mode is widely used. At the service development level of Netty, the service processing codes in the Handler processor of Netty applications are also executed asynchronously. Therefore, it is necessary and important to understand Netty’s asynchronous callbacks.

Similarly, Netty inherits and extends the API of JDK Future series asynchronous callback, defines its own Future series interface and class, realizes the monitoring of asynchronous tasks, and obtains the results of asynchronous execution.

In general, Netty extends JavaFuture asynchronous tasks as follows:

  • Inherit Java’s Future interface and get a new Netty asynchronous task interface. This interface enhances the original interface so that Netty asynchronous tasks can handle callback results in a non-blocking manner. Netty did not change the name of the Future, just the name of the package in which it resides.
  • A new interface, GenericFutureListener, was introduced to represent asynchronously executed listeners. Netty uses the Listener mode, where the callback logic after the execution of an asynchronous task is abstracted into a Listener interface. Netty’s GenericFutureListener interface can be added to Netty’s asynchronous task Future to monitor the execution status of asynchronous tasks.

In general, the design idea is similar to Guava. The corresponding relationship is as follows:

  • Netty’s Future interface can correspond to Guava’s ListenableFuture interface.
  • Netty’s GenericFutureListener interface can correspond to Guava’s FutureCallback interface.

GenericFutureListener interface

As mentioned earlier, like Guava’s FutureCallback, Netty has added an interface to encapsulate the logic of asynchronous non-blocking callbacks —– : The GenericFutureListener interface.

GenericFutureListener is stored in io.netty.util.concurrent.

public interface GenericFutureListener<F extends Future<? >>extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}
Copy the code

GenericFutureListener has a callback method: operationComplete, indicating that the asynchronous task operation is complete. This method is called back after the Future asynchronous task completes execution.

The EventListener here is an empty interface, without any abstract methods, and is just a presentation interface.

Netty’s Future interface

Netty’s Future interface extends a series of methods to monitor execution and listen for asynchronous callback completion events. Netty Future interface source code is as follows:

public interface Future<V> extends java.util.concurrent.Future<V> {
  // Add a listener for asynchronous task completion
	Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  // Remove the listener for asynchronous task completion
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); . }Copy the code

The Future interface of Netty is not used directly. Instead, subinterfaces are used. Netty has a series of subinterfaces that represent different types of asynchronous tasks, such as the ChannelFuture interface.

The ChannelFuture subinterface is an asynchronous task for channel I/O operations. Use ChannelFuture if a callback operation needs to be performed after the asynchronous I/O operation on the channel has completed.

The use of ChannelFuture

In Netty network programming, the input and output of a network connection channel are handled asynchronously, returning an instance of the ChannelFuture interface. An asynchronous callback listener can be added to the returned asynchronous task instance. The callback is executed after the asynchronous task actually ends.

Netty async callback for a Netty connection

Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.connect("localhost".6666);

connect.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      / / success
      System.out.println("yes");
    } else {
      / / fail
      System.out.println("exception"); future.cause().printStackTrace(); }}});Copy the code

The GenericFutureListener interface is a basic type interface in Netty. In asynchronous callbacks for network programming, one of the subinterfaces provided in Netty, such as the ChannelFutureListener interface, is typically used.

conclusion

Well, that’s pretty much the end of the asynchronous callback. As more and more high-concurrency systems become available, the asynchronous callback pattern becomes increasingly important. Let’s recall the asynchronous callbacks we talked about

Java’s native asynchronous callback:

  • The Future is the interface, and the corresponding GET method in FutureTask is the callback to the result. But this is blocked asynchronously

Guava asynchronous callback:

  • ListenableFuture acts as the interface and the corresponding FutureCallback does the asynchronous callback to the result. Asynchronous nonblocking

Netty asynchronous callback:

  • The Future acts as an interface (and a different package with the same name that comes with Java), and the corresponding GenericFutureListener does the asynchronous callback to the result. Asynchronous nonblocking