This is the 22nd day of my participation in the August Wen Challenge.More challenges in August

preface

Gossip: recently the seven dragon ball, dragon ball super look again, or the taste of childhood, really good-looking ~~ really sweet

Last time we looked at Netty’s Future, here’s the link:

Netty source code analysis series (16) Future source analysis

In this article I’m going to take a look at writable futures, or promises, which in Netty extend from Netty’s Future.

Promise interface

In Netty, the Promise interface is a special kind of writable Future. The core source code for Promise is as follows:

public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V var1);

    boolean trySuccess(V var1);

    Promise<V> setFailure(Throwable var1);

    boolean tryFailure(Throwable var1);

    boolean setUncancellable(a);

    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);

    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);

    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);

    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);

    Promise<V> await(a) throws InterruptedException;

    Promise<V> awaitUninterruptibly(a);

    Promise<V> sync(a) throws InterruptedException;

    Promise<V> syncUninterruptibly(a);
}
Copy the code

A Promise is a writable Future. In the Future mechanism, the state of execution (success or failure) of the task in which the business logic resides is implemented in the Future; In promises, the execution results of tasks can be controlled in the business logic, which is more flexible than a Future.

Here is an example (pseudocode) of a Promise.

// Asynchronous time-consuming tasks receive a Promise
public Promise asynchronousFunction(a) {

    Promise promise = new PromiseImpl();

    Object result = null;

    return =search()  // Business logic

        if (sucess) {
            promise.setSuccess(result); // Inform PROMISE that the current asynchronous task succeeded and pass in the result
        } else if (failed) {
            promise.setFailure(reason);// Inform the promise that the current asynchronous task failed
        } else if (error) {
            promise.setFailure(error);// Inform the Promise that the current asynchronous task is abnormal}}// Invoke asynchronous time-consuming operations
Promise promise = asynchronousFunction(promise);// Will immediately return promise

// Add events such as success processing, failure processing, and asynchronous processing
promise.addListener();For example, you can add a successful execution event

// Continue to do other events, regardless of when asynchronousFunction ends
doOtherThings();
Copy the code

In Netty, promises inherit from the Future and therefore have all the functionality of the Future. In the Promise mechanism, the success and failure of the business logic can be manually set in the business logic.

Common Promise classes in Netty include the DefaultPromise class, which is the basis for Promise implementation. DefaultChannelPromise is a subclass of DefaultPromise, adding the channel property.

The Netty DefaultPromise

Promises are used wherever asynchronous operations are involved in Netty. For example, here is the registration task for server/client startup, which ends with a call to the Unsafe Register, passing in a Promise. Unsafe calls Promise when registering an event to set success or failure.

//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if(! AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    public void run(a) {
                        AbstractUnsafe.this.register0(promise); }}); }catch (Throwable var4) {
                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var4); }}}}Copy the code

The functionality provided by DefaultPromise can be broken down into two parts; One is to provide callers with get() and addListen() to get Future task execution results and add listening events; The other part is to provide methods such as setSucess() for business processing tasks to set the success or failure of the task.

1. Set the success or failure of the task

DefaultPromise core source code is as follows:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    public Promise<V> setSuccess(V result) {
        if (this.setSuccess0(result)) {
            return this;
        } else {
            throw new IllegalStateException("complete already: " + this); }}public boolean trySuccess(V result) {
        return this.setSuccess0(result);
    }

    public Promise<V> setFailure(Throwable cause) {
        if (this.setFailure0(cause)) {
            return this;
        } else {
            throw new IllegalStateException("complete already: " + this, cause); }}public boolean tryFailure(Throwable cause) {
        return this.setFailure0(cause);
    }

    public boolean setUncancellable(a) {
        if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {
            return true;
        } else {
            Object result = this.result;
            return! isDone0(result) || ! isCancelled0(result); }}public boolean isSuccess(a) {
        Object result = this.result;
        returnresult ! =null&& result ! = UNCANCELLABLE && ! (resultinstanceof DefaultPromise.CauseHolder);
    }

    public boolean isCancellable(a) {
        return this.result == null;
    }

    / /...

}
Copy the code

2. Obtain the execution result of the Future task and add the listening event

DefaultPromise has three get methods.

  • No parametersgetBlocks waiting;
  • A parametergetWaits for the specified event and throws a timeout exception if it does not endgetIt’s in its parent classAbstractFutureIn the implementation of.getNow()Methods return results immediately.

The source code is as follows:

public V getNow(a) {
    Object result = this.result;
    return! (resultinstanceofDefaultPromise.CauseHolder) && result ! = SUCCESS && result ! = UNCANCELLABLE ? result :null;
}

public V get(a) throws InterruptedException, ExecutionException {
    Object result = this.result;
    if(! isDone0(result)) {this.await();
        result = this.result;
    }

    if(result ! = SUCCESS && result ! = UNCANCELLABLE) { Throwable cause =this.cause0(result);
        if (cause == null) {
            return result;
        } else if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        } else {
            throw newExecutionException(cause); }}else {
        return null; }}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    Object result = this.result;
    if(! isDone0(result)) {if (!this.await(timeout, unit)) {
            throw new TimeoutException();
        }

        result = this.result;
    }

    if(result ! = SUCCESS && result ! = UNCANCELLABLE) { Throwable cause =this.cause0(result);
        if (cause == null) {
            return result;
        } else if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        } else {
            throw newExecutionException(cause); }}else {
        return null; }}Copy the code

The await() method determines whether the Future task is finished, then obtains this lock, and calls Object’s wait() if the task is not completed. The source code is as follows:

public Promise<V> await(a) throws InterruptedException { 
    if (this.isDone()) {
        return this;
    } else if (Thread.interrupted()) {
        throw new InterruptedException(this.toString());
    } else {
        this.checkDeadLock();
        synchronized(this) {
            while(!this.isDone()) {
                this.incWaiters();

                try {
                    this.wait();
                } finally {
                    this.decWaiters(); }}return this; }}/ /...
}
Copy the code

The addListener method is called, passing the incoming callback into the Listeners object. If more than one listener is listened on, DeflaultFutureListeners are created to store the callback methods in an array.

RemoveListener sets the Listeners to null(if there is only one) or removes them from the array (if there are multiple callbacks). The source code is as follows.

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    ObjectUtil.checkNotNull(listener, "listener");
    synchronized(this) {
        this.addListener0(listener);
    }

    if (this.isDone()) {
        this.notifyListeners();
    }

    return this;
}   

public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    ObjectUtil.checkNotNull(listeners, "listeners");
    synchronized(this) {
        GenericFutureListener[] var3 = listeners;
        int var4 = listeners.length;
        int var5 = 0;

        while(var5 < var4) {
            GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
            if(listener ! =null) {
                this.addListener0(listener);
                ++var5;
                continue; }}}if (this.isDone()) {
        this.notifyListeners();
    }

    return this;
}

public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
    ObjectUtil.checkNotNull(listener, "listener");
    synchronized(this) {
        this.removeListener0(listener);
        return this; }}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    ObjectUtil.checkNotNull(listeners, "listeners");
    synchronized(this) {
        GenericFutureListener[] var3 = listeners;
        int var4 = listeners.length;

        for(int var5 = 0; var5 < var4; ++var5) {
            GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
            if (listener == null) {
                break;
            }

            this.removeListener0(listener);
        }

        return this; }}Copy the code

A listener event is emitted immediately if done() completes the task while adding a listener. Trigger listeners are implemented through notifyListeners(). The main logic is as follows:

If the current addListener thread (or rather, the thread that called the notifyListeners) Because addListener and setSuccess call notifyListeners and Promise pools of the same thread, they are executed on the same thread, otherwise they are submitted to the thread pool for execution.

NotifyListeners () are called for setSuccess in the thread pool where the Future task is executed and are placed on the current thread. NotifyListeners are maintained internally to record whether listening events have been triggered. Only notifyListeners that have not been triggered and the listener list is not empty are the listeners iterated and operationComplete called.

The Netty DefaultChannelPromise

DefaultChannelPromise is a subclass of DefaultPromise that internally maintains a channel variable, channel.

The methods associated with the Promise mechanism all call superclass methods.

In addition, DefaultChannelPromise also implements FlushCheckpoint interface, for the use of ChannelFlushPromiseNotifier, ChannelFuture can be registered to ChannelFlushPromiseNotifier class, used when data is written to or reach the checkpoint.

The core source code is as follows:

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise.FlushCheckpoint {
    private final Channel channel;
    private long checkpoint;

	/ /...

    public Channel channel(a) {
        return this.channel;
    }

    public ChannelPromise setSuccess(a) {
        return this.setSuccess((Void)null);
    }

    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }

    public boolean trySuccess(a) {
        return this.trySuccess((Object)null);
    }

    public ChannelPromise setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

    / /...

    public ChannelPromise promise(a) {
        return this;
    }

    protected void checkDeadLock(a) {
        if (this.channel().isRegistered()) {
            super.checkDeadLock(); }}public ChannelPromise unvoid(a) {
        return this;
    }

    public boolean isVoid(a) {
        return false; }}Copy the code

conclusion

We have analyzed the Promise in Netty above and know that it is a writable Future extended from Netty.