What is asynchrony

Asynchronous programming typically involves two threads: the task submission thread and the task execution thread. After submitting a task, the submission thread does not wait for the result but continues with its own work. When the task is complete, the execution thread notifies the submission side through a callback.

Benefits of asynchrony: for time-consuming operations such as I/O and network connections, you do not need to block and wait for the operation to complete. You only need to register a callback function, which is invoked once the operation finishes.
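
To make the callback idea concrete, here is a minimal sketch that uses only the JDK. The class name CallbackSketch and the helper submitWithCallback are made up for this illustration and are not part of any library. The sleep merely stands in for a slow I/O operation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CallbackSketch {
    // Hypothetical helper: runs the task on the executor and hands the result to the callback.
    static <T> void submitWithCallback(Executor executor, Callable<T> task, Consumer<T> callback) {
        executor.execute(() -> {
            try {
                callback.accept(task.call()); // runs on the execution thread
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);

        Callable<Integer> slowTask = () -> {
            Thread.sleep(200); // stands in for slow I/O
            return 42;
        };
        Consumer<Integer> callback = value -> {
            System.out.println("callback received " + value);
            done.countDown();
        };

        submitWithCallback(pool, slowTask, callback);
        System.out.println("submitted, the submission thread is not blocked");

        done.await(2, TimeUnit.SECONDS); // only so the demo does not exit early
        pool.shutdown();
    }
}
```

The submission thread never asks for the result here; the execution thread pushes it to the callback when it is ready.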

Two difficulties with asynchronous programming:

  1. How to determine whether a task is completed
  2. How to get the return value

The asynchronous model provided in the JDK

The Future pattern is used in the JDK for asynchronous programming

The demo:

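import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        // submit() returns immediately with a Future
        Future<Integer> submit = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Call task execution");
                Thread.sleep(500);
                return 99;
            }
        });
        Integer value = null;
        try {
            // get() blocks the main thread until the task has finished
            value = submit.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(value);
        System.out.println("end");
        executorService.shutdown();
    }
}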

Analyzing the run of the program:

  1. A task with a return value is submitted; submit() returns a Future object immediately
  2. The main function calls get(), which blocks synchronously until the task has run and its result is available

Let’s look at how the Future solves two difficult problems (how to determine the completion of the task and how to get the return value).

newFixedThreadPool returns a thread pool of type ThreadPoolExecutor, which extends AbstractExecutorService. submit is a method of AbstractExecutorService. Here is the submit method in AbstractExecutorService:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

In submit, the incoming Callable object is wrapped in a FutureTask object, which is what gets returned; the task itself is run through execute. The execute method is implemented by ThreadPoolExecutor. The result or exception produced by the executing thread is stored in the outcome field of the FutureTask, and the main program obtains the result of the task through the futureTask.get() method.
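
The wrapping step can be tried without a thread pool at all. The following is a small sketch under the assumption that a plain Thread stands in for execute(); the class name FutureTaskSketch and the helper runOnNewThread are invented for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskSketch {
    // Wraps the callable in a FutureTask and runs it on a plain thread,
    // roughly mirroring the newTaskFor() + execute() pair in submit().
    static <T> FutureTask<T> runOnNewThread(Callable<T> callable) {
        FutureTask<T> task = new FutureTask<>(callable); // Runnable and Future in one object
        new Thread(task).start();                        // run() stores the result or the exception
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        FutureTask<Integer> ok = runOnNewThread(() -> 99);
        FutureTask<Integer> failing = runOnNewThread(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            System.out.println(ok.get()); // value stored by the worker thread
            failing.get();                // a stored exception is rethrown, wrapped
        } catch (ExecutionException e) {
            System.out.println("task failed with: " + e.getCause());
        }
    }
}
```

Both the normal result and the exception travel through the same FutureTask object, which is why get() can either return a value or throw ExecutionException.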

Remember that it is the main thread that calls get(). Inside get() there is a loop that checks whether the task has completed. Until the result is available, the main thread stays in that loop, so the code after get() does not run.

What the JDK Future offers is this: the user thread must actively poll the Future (for example with isDone()) to see whether the task has completed. If it does not poll and calls get() instead, it blocks until execution is complete. From this point of view, Future is not really asynchronous: it lacks a callback and is at best a synchronous non-blocking mode. Java 8 added CompletableFuture, which supports callbacks.
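
The two styles can be compared side by side. This is only a sketch: the class PollVersusCallback and the method slowSquare are invented for the example, and the sleep stands in for real work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PollVersusCallback {
    static int slowSquare(int n) {
        try {
            Thread.sleep(200); // stands in for a slow operation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n * n;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Style 1: JDK Future, the caller polls isDone() between other work.
        Future<Integer> future = pool.submit(() -> slowSquare(7));
        while (!future.isDone()) {
            System.out.println("polling, doing other work...");
            Thread.sleep(50);
        }
        System.out.println("polled result: " + future.get()); // already done, returns at once

        // Style 2: CompletableFuture, the callback runs when the value is ready.
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> slowSquare(8), pool)
                .thenAccept(v -> System.out.println("callback result: " + v));
        System.out.println("main thread is free while the task runs");

        done.join(); // only so the demo waits before shutting the pool down
        pool.shutdown();
    }
}
```

With the plain Future the caller decides when to look; with CompletableFuture the completion itself triggers the next step.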

Future is meant to simplify asynchronous programming. An asynchronous call synchronously returns a Future object; the thread that executes the task sets the result into it, and the thread that submitted the task reads the value from it. In essence this is communication between threads, and Future encapsulates the methods for it.

Netty’s Future and Promise mechanism

  • Netty’s Future extends the JDK’s Future. Most importantly, it adds the addListener method, which makes callback-style asynchrony possible
  • Promise extends Netty’s Future and adds methods to actively set the result, as either success or failure

Future

Netty server Demo

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // ProcotolFrameDecoder, MessageCodecSharable and RpcRequestMessageHandler
            // are the project's own handler classes
            ch.pipeline().addLast(new ProcotolFrameDecoder());
            ch.pipeline().addLast(new LoggingHandler());
            ch.pipeline().addLast(new MessageCodecSharable());
            ch.pipeline().addLast(new RpcRequestMessageHandler());
        }
    });
    // First use of a future: block until the bind has completed.
    // Note: bind() without arguments needs a local address configured beforehand via localAddress(...)
    ChannelFuture future = serverBootstrap.bind().sync();
    // Second use of a future: block until the channel is closed
    future.channel().closeFuture().sync();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}

The demo uses a future twice, and both times it calls the sync method, which blocks.

serverBootstrap.bind() returns a DefaultChannelPromise. When sync() is then called on it, the method that actually runs is sync() in DefaultChannelPromise’s parent class, DefaultPromise.

// DefaultPromise class
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    checkDeadLock();
    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

So when sync is executed, the current thread blocks in wait() on the DefaultPromise object. When will it wake up?

bind() ultimately returns a PendingRegistrationPromise object; this class is a subclass of DefaultChannelPromise and therefore a grandchild of DefaultPromise. regFuture is the future for registering the channel, and a listener is registered on it. This listener is called when the operationComplete event is triggered. In the listener the state of the promise is set; the code that changes the state is in doBind0(). Finally, CAS is used to change the state in the promise. If the change succeeds, notifyAll is called to wake up all threads blocked waiting on the object, which lets the main thread leave sync().
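
The block-and-wake part can be reproduced with a few lines of plain Java. This is a simplified stand-in, not Netty's code: the class WaitNotifySketch is invented for the example, and it guards its state with synchronized alone, whereas DefaultPromise (as described above) changes the state with CAS and uses the monitor only for wait and notifyAll.

```java
public class WaitNotifySketch {
    private Object result; // null means "not done yet"; guarded by synchronized (this)

    // Blocks the caller until complete() has been called, similar in spirit to await().
    public synchronized Object await() throws InterruptedException {
        while (result == null) { // the loop guards against spurious wake-ups
            wait();              // releases the monitor and parks the thread
        }
        return result;
    }

    // Called by the thread that finishes the work.
    public synchronized boolean complete(Object value) {
        if (result != null) {
            return false;        // already completed, the first writer wins
        }
        result = value;
        notifyAll();             // wakes every thread parked in await()
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifySketch promise = new WaitNotifySketch();
        new Thread(() -> {
            try {
                Thread.sleep(300); // stands in for the work done on another thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            promise.complete("bound");
        }).start();
        System.out.println("main thread waiting");
        System.out.println("woken with: " + promise.await());
    }
}
```

The main thread sits in wait() until the worker calls notifyAll(), which is the same hand-off that lets the main thread return from sync() in the server demo.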

(The part where the operationComplete event triggers the call to the listener is more complicated; see the second reference article for details.)

Promise

Demo

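import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;

import java.util.concurrent.ExecutionException;

public class PromiseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoop next = new NioEventLoopGroup().next();
        // Pass in a thread pool (here a single EventLoop)
        DefaultPromise<Integer> promise = new DefaultPromise<>(next);
        new Thread(() -> {
            try {
                System.out.println("start counting");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Another thread writes the result into the promise
            promise.setSuccess(1);
        }).start();
        // get() blocks until setSuccess() has been called
        Integer ans = promise.get();
        System.out.println(ans);
    }
}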

A Promise can be understood as a container for a result. Its differences from a Future:

  1. A DefaultPromise can be created directly by the caller, whereas a JDK Future is obtained by submitting a task with submit()
  2. A DefaultPromise can have its result written, through either setSuccess() or setFailure()

Focus on setSuccess() and setFailure(). (The blocking get() method relies on the await() method of DefaultPromise.) Both methods assign the given value to the result field of the DefaultPromise object via CAS; if the CAS operation succeeds, all blocked threads are woken up and the listeners are called.

private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
        
     
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}
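
The same write-once idea can be sketched with JDK classes only. MiniPromise below is an invented teaching class, not Netty's implementation: it uses AtomicReference instead of a field updater, does not support null values or failures, and runs listeners on the completing thread rather than on an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class MiniPromise<V> {
    private final AtomicReference<V> result = new AtomicReference<>(); // null = not completed
    private final List<Consumer<V>> listeners = new ArrayList<>();     // guarded by "this"

    // Succeeds only for the first caller, because the CAS expects null.
    public boolean trySuccess(V value) {
        if (!result.compareAndSet(null, value)) {
            return false;
        }
        List<Consumer<V>> toNotify;
        synchronized (this) {
            notifyAll();                        // wake threads blocked in get()
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        toNotify.forEach(l -> l.accept(value)); // run callbacks outside the lock
        return true;
    }

    // Runs the listener right away if the value is already there.
    public void addListener(Consumer<V> listener) {
        synchronized (this) {
            if (result.get() == null) {
                listeners.add(listener);
                return;
            }
        }
        listener.accept(result.get());
    }

    // Blocks until some thread has called trySuccess().
    public V get() throws InterruptedException {
        synchronized (this) {
            while (result.get() == null) {
                wait();
            }
        }
        return result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPromise<Integer> promise = new MiniPromise<>();
        promise.addListener(v -> System.out.println("listener got " + v));
        new Thread(() -> promise.trySuccess(1)).start();
        System.out.println("get() returned " + promise.get());
        System.out.println("second write accepted? " + promise.trySuccess(2));
    }
}
```

Because the CAS only succeeds while the value is still unset, a second write is rejected and the listeners fire at most once.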

Conclusion

  • The Future in the JDK works together with a submitted Callable task. Because it has no listener mechanism, the main thread must either poll to see whether the task is finished or block waiting for the value, so it does not feel like asynchronous programming
  • Netty’s Future extends the JDK’s Future, most notably by adding a mechanism for registering listeners
  • Promise is intended to solve two limitations of Future: it cannot be created directly and it cannot be written to
  • DefaultPromise underlies both the Future and the Promise usage shown here. There are two main things to understand: 1. blocking and waking up in await(); 2. registering and executing listeners (I don’t fully understand the second one yet).

References

www.cnblogs.com/rickiyang/p…

Gorden5566.com/post/1066.h…

Cloud.tencent.com/developer/a…