The premise

Recently, I have been reading about Netty and writing a lightweight RPC framework for practice. Along the way I found many highlights in Netty’s source code, and some of the implementations are rigorous to the point of being “harsh”. Netty also ships a number of very good utility classes that can be used out of the box. This article analyzes my favorite one, Netty’s concurrency utility module: Promise.


Environment Version:

  • Netty: 4.1.44.Final
  • JDK: 1.8

Introduction to Promise

“Promise” translates into Chinese as a commitment or a pledge: one person gives another an assurance about some expected outcome, and that assurance can usually be fulfilled.

The Javadoc of io.netty.util.concurrent.Promise is a single sentence: a special io.netty.util.concurrent.Future that is writable (the Promise interface is a sub-interface of io.netty.util.concurrent.Future). io.netty.util.concurrent.Future in turn extends java.util.concurrent.Future and represents “the result of an asynchronous operation”. As we know, the Future in the JDK’s java.util.concurrent package is not writable and provides no entry point for listeners (it does not apply the observer pattern); Promise makes up for both shortcomings. From the inheritance perspective, DefaultPromise is the concrete implementation class of these interfaces, so the source code analysis needs to focus on DefaultPromise. In general, the functions a module provides are defined by its interfaces, so here are the two interfaces whose methods are listed below:

  • io.netty.util.concurrent.Promise
  • io.netty.util.concurrent.Future

First, the io.netty.util.concurrent.Future interface:

public interface Future<V> extends java.util.concurrent.Future<V> {

    // Whether the I/O operation completed successfully
    boolean isSuccess();

    // Whether the I/O operation can be cancelled via cancel(boolean mayInterruptIfRunning) below
    boolean isCancellable();

    // Returns the exception of the I/O operation; returns null if the operation itself succeeded
    Throwable cause();

    // Add listeners to the current Future to be told when the operation completes;
    // all listeners are called back once isDone() becomes true
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Remove listeners that were registered for the completion of the current Future
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Wait synchronously for the Future to complete and rethrow the exception if it failed; responds to interrupts
    Future<V> sync() throws InterruptedException;

    // Wait synchronously for the Future to complete and rethrow the exception if it failed; does not respond to interrupts
    Future<V> syncUninterruptibly();

    // Wait for the Future to complete; responds to interrupts
    Future<V> await() throws InterruptedException;

    // Wait for the Future to complete; does not respond to interrupts
    Future<V> awaitUninterruptibly();

    // Wait for the Future to complete, with a timeout; responds to interrupts
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;

    // Wait for the Future to complete, with a timeout; does not respond to interrupts
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // Return the result without blocking. If the Future has not completed, this method returns null.
    // Because a Future may also succeed with a null result, some scenarios need an extra isDone() check
    V getNow();

    // Cancel the operation; if the cancellation succeeds, the Future fails with a CancellationException
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

The sync() and await() methods are similar, except that sync() checks whether the operation failed and rethrows the exception as soon as it finds one, whereas await() is unaware of failures.
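The difference is easiest to see side by side. Below is a minimal sketch that uses only JDK classes; `SyncVsAwaitSketch`, `fail()` and `demo()` are hypothetical names made up for illustration and are not Netty code. It only assumes the behaviour described above: await() just waits, while sync() waits and then rethrows the failure. For simplicity the failure is restricted to a RuntimeException here.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for a future that can fail; not Netty's implementation.
final class SyncVsAwaitSketch {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile RuntimeException cause;

    // Complete the operation with a failure.
    void fail(RuntimeException t) {
        cause = t;
        done.countDown();
    }

    RuntimeException cause() {
        return cause;
    }

    // await(): only waits for completion; the caller has to inspect cause() itself.
    SyncVsAwaitSketch await() throws InterruptedException {
        done.await();
        return this;
    }

    // sync(): waits for completion, then rethrows the failure if there is one.
    SyncVsAwaitSketch sync() throws InterruptedException {
        await();
        if (cause != null) {
            throw cause;
        }
        return this;
    }

    // Runs both calls against the same failed operation and describes what happened.
    static String demo() {
        SyncVsAwaitSketch f = new SyncVsAwaitSketch();
        f.fail(new IllegalArgumentException("boom"));
        StringBuilder out = new StringBuilder();
        try {
            f.await();
            out.append("await returned, cause=").append(f.cause().getMessage());
            f.sync();
            out.append("; sync returned");
        } catch (IllegalArgumentException e) {
            out.append("; sync threw ").append(e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            out.append("; interrupted");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // await returned, cause=boom; sync threw boom
    }
}
```

In practice this means await() suits code that wants to branch on isSuccess() or cause() itself, while sync() suits code that prefers the failure to surface as an exception.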

Next, the io.netty.util.concurrent.Promise interface:

public interface Promise<V> extends Future<V> {

    // Mark the current Future as successful and set the result; notify all listeners if this succeeds,
    // and throw an IllegalStateException if the Future has already succeeded or failed
    Promise<V> setSuccess(V result);

    // Mark the current Future as successful and set the result; notify all listeners and return true
    // if this succeeds, otherwise return false
    boolean trySuccess(V result);

    // Mark the current Future as failed and set the exception instance; notify all listeners if this succeeds,
    // and throw an IllegalStateException if the Future has already succeeded or failed
    Promise<V> setFailure(Throwable cause);

    // Mark the current Future as failed and set the exception instance; notify all listeners and return true
    // if this succeeds, otherwise return false
    boolean tryFailure(Throwable cause);

    // Mark the current Promise as uncancellable; return true on success, false otherwise
    boolean setUncancellable();

    // The methods below are almost identical to those in io.netty.util.concurrent.Future;
    // only the return type is changed to Promise

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

That covers all the functions of the Promise interface. Next, the implementation of Promise is analyzed in detail from the source code.

Promise source code implementation

The implementation class of Promise is io.netty.util.concurrent.DefaultPromise (DefaultPromise actually has many subclasses, some of which extend it for specific scenarios). DefaultPromise itself extends io.netty.util.concurrent.AbstractFuture:

public abstract class AbstractFuture<V> implements Future<V> {

    // Block indefinitely while waiting for the result
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // Block by calling the interruptible, untimed wait method
        await();
        // After waking up, check whether the Future failed
        Throwable cause = cause();
        if (cause == null) {
            // No exception means the execution succeeded; call getNow() to return the result
            return getNow();
        }
        // If the cause is a CancellationException, rethrow it directly
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // Every other exception is wrapped in an ExecutionException and thrown
        throw new ExecutionException(cause);
    }

    // Block with a timeout while waiting for the result
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Block by calling the interruptible wait method with a timeout
        if (await(timeout, unit)) {
            // After waking up within the timeout, check whether the Future failed
            Throwable cause = cause();
            if (cause == null) {
                // No exception means the execution succeeded; call getNow() to return the result
                return getNow();
            }
            // If the cause is a CancellationException, rethrow it directly
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            // Without a timeout, every other exception is wrapped in an ExecutionException and thrown
            throw new ExecutionException(cause);
        }
        // Reaching this point means the wait timed out, so throw a TimeoutException
        throw new TimeoutException();
    }
}

AbstractFuture implements only the two methods get() and get(long timeout, TimeUnit unit). Both implementations are in fact similar to the ones in java.util.concurrent.FutureTask.
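To make that exception contract concrete, here is a small sketch against the JDK’s own FutureTask (not Netty): a cancellation surfaces as a CancellationException thrown directly, while any other failure arrives wrapped in an ExecutionException. The class name `GetContractSketch` and its helper methods are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

final class GetContractSketch {

    // Maps the three possible outcomes of get() to a short description.
    static String describe(Future<String> f) {
        try {
            return "value: " + f.get();
        } catch (CancellationException e) {
            // Cancellation is thrown as-is, not wrapped
            return "cancelled";
        } catch (ExecutionException e) {
            // Any other failure is wrapped; the original exception is the cause
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String[] demo() {
        FutureTask<String> ok = new FutureTask<String>(() -> "done");
        ok.run();

        FutureTask<String> failed = new FutureTask<String>(() -> {
            throw new IllegalStateException("boom");
        });
        failed.run();

        FutureTask<String> cancelled = new FutureTask<String>(() -> "never");
        cancelled.cancel(false);

        return new String[] {describe(ok), describe(failed), describe(cancelled)};
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```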

DefaultPromise has a lot of source code, so it is read here in several parts. First, its fields and constructors:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // The ordinary logger. InternalLogger is Netty's internal logging abstraction
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);

    // Listener notification may be submitted to the executor as a task;
    // if that task is rejected, this logger is used to print the error
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

    // The maximum stack depth for listener notification, 8 by default (and at most 8).
    // It protects against a stack overflow when callbacks are nested
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));

    // Field updater for result, used to update the value of result with CAS
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

    // Placeholder value for result: when the Promise succeeds with a null result,
    // this value represents the successful outcome
    private static final Object SUCCESS = new Object();

    // Placeholder value for result indicating that the Promise cannot be cancelled
    private static final Object UNCANCELLABLE = new Object();

    // Holder of a CancellationException instance, used to detect the cancelled state and to throw CancellationException
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));

    // The stack trace elements of the exception held by CANCELLATION_CAUSE_HOLDER
    private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();

    // The actual result, typed as Object. It may be null, a real result instance,
    // SUCCESS, UNCANCELLABLE, CANCELLATION_CAUSE_HOLDER, and so on
    private volatile Object result;

    // The event executor; it can be thought of as a single scheduling thread
    private final EventExecutor executor;

    // The listeners: either a single GenericFutureListener instance or a DefaultFutureListeners instance
    private Object listeners;

    // Number of threads waiting for the result
    private short waiters;

    // Whether listeners are currently being called back
    private boolean notifyingListeners;

    // This constructor depends on an EventExecutor
    public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }

    protected DefaultPromise() {
        // only for subclasses - this constructor is reserved for subclasses
        executor = null;
    }

    // ... other code omitted ...

    // Private static nested class that holds a Throwable instance, i.e. the exception of a failed Promise
    private static final class CauseHolder {
        final Throwable cause;
        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    // Private static nested class that replaces the stack trace of CancellationException with the
    // CANCELLATION_STACK defined above, and overrides toString() to return the fully qualified
    // class name of CancellationException
    private static final class LeanCancellationException extends CancellationException {
        private static final long serialVersionUID = 2794674970981187807L;

        @Override
        public Throwable fillInStackTrace() {
            setStackTrace(CANCELLATION_STACK);
            return this;
        }

        @Override
        public String toString() {
            return CancellationException.class.getName();
        }
    }

    // ... other code omitted ...
}

Promise currently supports two types of listeners:

  • GenericFutureListener: a listener for a generically typed Future.
  • GenericProgressiveFutureListener: a sub-interface of GenericFutureListener; a listener for a generically typed Future that also reports progress (some operations take several steps to complete, similar to a progress bar).

// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    void operationComplete(F future) throws Exception;
}

// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {

    void operationProgressed(F future, long progress, long total) throws Exception;
}

To let a Promise support multiple listeners, Netty added the package-private (default access) class DefaultFutureListeners, which holds an array of listener instances:

// DefaultFutureListeners
final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners

    // This constructor exists to convert the single GenericFutureListener held by a Promise
    // into a DefaultFutureListeners instance once a second listener is added
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(
            GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // Note that the array doubles in size each time it is full
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // Append the current GenericFutureListener to the array
        listeners[size] = l;
        // The total number of listeners increases by 1
        this.size = size + 1;
        // If it is a GenericProgressiveFutureListener, the number of progressive listeners increases by 1
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // Compute how many listeners after index i need to be shifted
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // Shift every element after index i one position toward the front of the array
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // Decrement size and set the now unused last slot to null
                listeners[-- size] = null;
                this.size = size;
                // If it is a GenericProgressiveFutureListener, the number of progressive listeners decreases by 1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }

    // Returns the array of listener instances
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }

    // Returns the total number of listeners
    public int size() {
        return size;
    }

    // Returns the number of progressive listeners
    public int progressiveSize() {
        return progressiveSize;
    }
}
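The two array operations in DefaultFutureListeners, growing by doubling on add() and closing the gap with System.arraycopy on remove(), can be tried in isolation. The following is a stripped-down sketch, not Netty’s class: `ListenerArraySketch` is a hypothetical name, `Object` stands in for `GenericFutureListener`, and the progressive-listener counter is left out.

```java
import java.util.Arrays;

// Hypothetical, simplified container; Object stands in for GenericFutureListener.
final class ListenerArraySketch {
    private Object[] listeners = new Object[2];
    private int size;

    void add(Object l) {
        // Double the capacity whenever the array is full
        if (size == listeners.length) {
            listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size++] = l;
    }

    void remove(Object l) {
        for (int i = 0; i < size; i++) {
            if (listeners[i] == l) {
                // Number of elements behind index i that must move one slot forward
                int toMove = size - i - 1;
                if (toMove > 0) {
                    System.arraycopy(listeners, i + 1, listeners, i, toMove);
                }
                // Clear the now unused last slot so the removed listener can be collected
                listeners[--size] = null;
                return;
            }
        }
    }

    Object get(int index) {
        return listeners[index];
    }

    int size() {
        return size;
    }

    int capacity() {
        return listeners.length;
    }

    public static void main(String[] args) {
        ListenerArraySketch ls = new ListenerArraySketch();
        ls.add("a");
        ls.add("b");
        ls.add("c"); // triggers the growth from 2 to 4 slots
        ls.remove("b");
        System.out.println(ls.size() + " listeners, capacity " + ls.capacity());
    }
}
```

Removal compares by identity (`==`), as in the original, so only the exact instance that was registered is removed, and the notification order of the remaining listeners is preserved.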

Next come the remaining DefaultPromise methods. I think the order in which these methods appear in the source is something of an art. Let’s start with the methods that determine the execution state of a Promise:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... other code omitted ...

    @Override
    public boolean setUncancellable() {
        // CAS result to UNCANCELLABLE, expecting the old value to be null; return true if the update succeeds
        if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }
        Object result = this.result;
        // The CAS failed, so result is no longer null. Return false only if the Promise is already done
        // and was cancelled; if it is already UNCANCELLABLE or completed without being cancelled, return true
        return !isDone0(result) || !isCancelled0(result);
    }

    @Override
    public boolean isSuccess() {
        Object result = this.result;
        // On success, result is neither null, nor UNCANCELLABLE, nor a CauseHolder
        // (it is either the SUCCESS placeholder or a real result value)
        return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    }

    @Override
    public boolean isCancellable() {
        // If result is null, the Promise is still in its initial state and has not completed
        return result == null;
    }

    @Override
    public Throwable cause() {
        // Extract the Throwable instance from the current result
        return cause0(result);
    }

    private Throwable cause0(Object result) {
        // If result is not a CauseHolder, return null
        if (!(result instanceof CauseHolder)) {
            return null;
        }
        // If result is CANCELLATION_CAUSE_HOLDER (the static holder of a CancellationException)
        if (result == CANCELLATION_CAUSE_HOLDER) {
            // create a new instance of the custom LeanCancellationException
            CancellationException ce = new LeanCancellationException();
            // If the CAS that swaps in a holder of the new LeanCancellationException succeeds, return the new instance
            if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                return ce;
            }
            // Otherwise another thread won the race: result is now a CauseHolder other than CANCELLATION_CAUSE_HOLDER
            result = this.result;
        }
        // Return the cause held by the CauseHolder
        return ((CauseHolder) result).cause;
    }

    // Static method that decides whether the Promise was cancelled: result must be a CauseHolder
    // and its cause must be a CancellationException or a subclass of it
    private static boolean isCancelled0(Object result) {
        return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    }

    // Static method that decides whether the Promise is done: result is neither null nor UNCANCELLABLE
    private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    // Whether the Promise instance was cancelled
    @Override
    public boolean isCancelled() {
        return isCancelled0(result);
    }

    // Whether the Promise instance is done
    @Override
    public boolean isDone() {
        return isDone0(result);
    }

    // ... other code omitted ...
}
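The idea behind these methods is that one reference field encodes the whole state machine: null means "not completed", two placeholder objects mean "succeeded with null" and "uncancellable", and a holder object means "failed or cancelled". Here is a minimal sketch of that encoding using an AtomicReference instead of a field updater. `StateSketch` and `Failure` are hypothetical names, and the completion logic in `trySuccess()` (completing from either the initial or the UNCANCELLABLE state) is my assumption, since the value-setting part of DefaultPromise is not shown in this section.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a single-field state encoding; not Netty's DefaultPromise.
final class StateSketch {
    private static final Object SUCCESS = new Object();       // succeeded with a null value
    private static final Object UNCANCELLABLE = new Object(); // still running, cancel is refused

    // Plays the role of CauseHolder: wraps the exception of a failed or cancelled operation.
    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) {
            this.cause = cause;
        }
    }

    // null = not completed yet; otherwise a placeholder, a Failure, or the real value.
    private final AtomicReference<Object> result = new AtomicReference<>();

    // Assumption: a value may be set from the initial state or from UNCANCELLABLE.
    boolean trySuccess(Object value) {
        Object r = value == null ? SUCCESS : value;
        return result.compareAndSet(null, r) || result.compareAndSet(UNCANCELLABLE, r);
    }

    // Cancelling only works from the initial state.
    boolean cancel() {
        return result.compareAndSet(null, new Failure(new CancellationException()));
    }

    // True unless the operation has already been cancelled.
    boolean setUncancellable() {
        return result.compareAndSet(null, UNCANCELLABLE) || !isCancelled();
    }

    boolean isCancellable() {
        return result.get() == null;
    }

    boolean isDone() {
        Object r = result.get();
        return r != null && r != UNCANCELLABLE;
    }

    boolean isSuccess() {
        Object r = result.get();
        return r != null && r != UNCANCELLABLE && !(r instanceof Failure);
    }

    boolean isCancelled() {
        Object r = result.get();
        return r instanceof Failure && ((Failure) r).cause instanceof CancellationException;
    }

    public static void main(String[] args) {
        StateSketch s = new StateSketch();
        s.setUncancellable();
        System.out.println("cancel after setUncancellable: " + s.cancel());
        s.trySuccess(null);
        System.out.println("done=" + s.isDone() + ", success=" + s.isSuccess());
    }
}
```

Because every transition is a single compare-and-set on one reference, the state predicates need no lock: they read the field once and classify the value.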

Now look at adding and removing listeners (this part also includes the logic for notifying listeners):

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... other code omitted ...

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // The argument must not be null
        checkNotNull(listener, "listener");
        // Lock the Promise instance itself
        synchronized (this) {
            // Add the listener
            addListener0(listener);
        }
        // If the Promise has already completed, notify the listeners right away
        if (isDone()) {
            notifyListeners();
        }
        return this;
    }

    @Override
    public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
        // The argument must not be null
        checkNotNull(listeners, "listeners");
        // Lock the Promise instance itself
        synchronized (this) {
            // Iterate over the argument array and add each listener, stopping at the first null element
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                addListener0(listener);
            }
        }
        // If the Promise has already completed, notify the listeners right away
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
        // The argument must not be null
        checkNotNull(listener, "listener");
        // Lock the Promise instance itself
        synchronized (this) {
            // Remove the listener
            removeListener0(listener);
        }
        return this;
    }

    @Override
    public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
        // The argument must not be null
        checkNotNull(listeners, "listeners");
        // Lock the Promise instance itself
        synchronized (this) {
            // Iterate over the argument array and remove each listener, stopping at the first null element
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                removeListener0(listener);
            }
        }
        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // If the Promise holds no listeners yet, store the listener directly
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            // If the Promise already holds a DefaultFutureListeners instance, call its add() method
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // Otherwise the Promise holds a single GenericFutureListener, which has to be
            // converted into a DefaultFutureListeners instance holding both listeners
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

    private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // If the Promise holds a DefaultFutureListeners instance, call its remove() method
        if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).remove(listener);
        } else if (listeners == listener) {
            // Otherwise the Promise holds a single GenericFutureListener; if it is the very listener
            // passed in, set the listeners field of the Promise to null
            listeners = null;
        }
    }

    private void notifyListeners() {
        EventExecutor executor = executor();
        // If the current thread is the event loop thread, call synchronously: the thread calling
        // notifyListeners() is the same thread as the EventExecutor's
        if (executor.inEventLoop()) {
            // The thread-local futureListenerStackDepth below protects the call stack depth;
            // this topic gets its own section in this post and can be ignored for now
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // If the current thread is not the event loop thread (or the stack depth limit has been reached),
        // wrap notifyListenersNow() in a Runnable and execute it on the EventExecutor
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

    // Execute the task on the EventExecutor; any exception thrown by execute() is logged
    // with rejectedExecutionLogger
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

    // Notify all listeners immediately
    private void notifyListenersNow() {
        Object listeners;
        // Several threads may call notifyListenersNow() on the same Promise instance, so notifyingListeners
        // is set under the lock; a thread that finds notifyingListeners already true returns directly
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // Keep a snapshot of the listeners in a local variable so that the field can be reset to null
            listeners = this.listeners;
            // Reset the listeners field of the Promise instance to null
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                // Notify multiple listeners
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                // Notify a single listener
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing in this method can throw, so resetting notifyingListeners to false
                    // does not need to be in a finally block; reset it and leave the loop
                    notifyingListeners = false;
                    return;
                }
                // Take a new snapshot of the listeners; another thread may have added listeners in the meantime
                listeners = this.listeners;
                // Reset the field to null again, which ensures each listener is called back only once
                this.listeners = null;
            }
        }
    }

    // Iterate over the array inside DefaultFutureListeners and call the static notifyListener0() for each listener
    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    // This static method performs the actual callback: it simply calls GenericFutureListener#operationComplete()
    // with the current Promise instance, catches any exception and logs it at WARN level
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
}
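The stack depth protection in notifyListeners() can be reproduced with a plain ThreadLocal counter. The sketch below is a single-threaded illustration under simplifying assumptions, not Netty’s code: `DepthGuardSketch`, `runChain()` and the `DEFERRED` queue are hypothetical, and the queue merely stands in for submitting the notification to the EventExecutor. Each callback triggers the next one, which is the situation where unbounded nesting would otherwise grow the call stack.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a nesting guard for callbacks; not Netty's implementation.
final class DepthGuardSketch {
    static final int MAX_DEPTH = 8;
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);
    // Stands in for the executor's task queue in this single-threaded sketch.
    private static final Queue<Runnable> DEFERRED = new ArrayDeque<>();

    static void notifyListener(Runnable listener) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            // Shallow enough: call back directly and track the nesting level
            DEPTH.set(depth + 1);
            try {
                listener.run();
            } finally {
                DEPTH.set(depth);
            }
            return;
        }
        // Too deep: defer the callback instead of nesting further
        DEFERRED.add(listener);
    }

    // Runs a chain of `length` callbacks in which each callback triggers the next.
    // Returns {number of callbacks run, deepest nesting level observed}.
    static int[] runChain(int length) {
        int[] stats = {0, 0};
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            stats[0]++;
            stats[1] = Math.max(stats[1], DEPTH.get());
            if (stats[0] < length) {
                notifyListener(self[0]);
            }
        };
        notifyListener(self[0]);
        // Drain deferred callbacks, as the executor's thread would do later
        Runnable task;
        while ((task = DEFERRED.poll()) != null) {
            notifyListener(task);
        }
        return stats;
    }

    public static void main(String[] args) {
        int[] stats = runChain(20);
        System.out.println(stats[0] + " callbacks, max nesting " + stats[1]); // 20 callbacks, max nesting 8
    }
}
```

All 20 callbacks still run, but the nesting never exceeds the limit: once the limit is hit, the remaining work restarts from a shallow stack.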

Then look at the await() and sync() methods:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... other code omitted ...

    @Override
    public Promise<V> await() throws InterruptedException {
        // If the Promise has completed, return directly
        if (isDone()) {
            return this;
        }
        // If the current thread has been interrupted, throw InterruptedException
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // Deadlock check
        checkDeadLock();
        // Lock the current Promise instance
        synchronized (this) {
            // Loop until isDone() is true
            while (!isDone()) {
                // The number of waiting threads increases by 1
                incWaiters();
                try {
                    // Block by calling Object#wait(); if the thread is interrupted, InterruptedException is thrown
                    wait();
                } finally {
                    // After the wait ends, the number of waiting threads decreases by 1
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        // If the Promise has completed, return directly
        if (isDone()) {
            return this;
        }
        // Deadlock check
        checkDeadLock();
        boolean interrupted = false;
        // Lock the current Promise instance
        synchronized (this) {
            // Loop until isDone() is true
            while (!isDone()) {
                // The number of waiting threads increases by 1
                incWaiters();
                try {
                    // Block by calling Object#wait() and catch InterruptedException;
                    // if it is thrown, record that the thread was interrupted
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    // After the wait ends, the number of waiting threads decreases by 1
                    decWaiters();
                }
            }
        }
        // If the thread was interrupted while waiting, restore its interrupt flag
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }

    // The await() variants with a timeout below all delegate to await0()

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
        try {
            return await0(unit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    // Deadlock check: if the waiting thread is the event loop thread, throw BlockingOperationException directly.
    // The thread that completes the Promise and the thread waiting for its result must not be the same thread,
    // otherwise the thread would wait on itself forever
    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        // Block indefinitely until completion
        await();
        // Once the wait is over, rethrow the exception if the execution failed
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        // Block indefinitely until completion, without responding to interrupts
        awaitUninterruptibly();
        // Rethrow the exception if the execution failed
        rethrowIfFailed();
        return this;
    }

    // waiters plus 1; throws IllegalStateException if waiters has already reached Short.MAX_VALUE
    private void incWaiters() {
        if (waiters == Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters: " + this);
        }
        ++waiters;
    }

    // waiters minus 1
    private void decWaiters() {
        --waiters;
    }

    // If cause is not null, rethrow it
    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }
        PlatformDependent.throwException(cause);
    }

    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        // If the Promise has completed, return directly
        if (isDone()) {
            return true;
        }
        // If the timeout is less than or equal to 0, return isDone()
        if (timeoutNanos <= 0) {
            return isDone();
        }
        // If interrupts are allowed and the current thread's interrupt flag is set, throw InterruptedException
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // Deadlock check
        checkDeadLock();
        // Record the current timestamp in nanoseconds
        long startTime = System.nanoTime();
        // The remaining wait time in nanoseconds
        long waitTime = timeoutNanos;
        // Records whether the thread was interrupted
        boolean interrupted = false;
        try {
            // Infinite loop
            for (;;) {
                synchronized (this) {
                    // If the Promise has completed, return true; checking first avoids blocking needlessly
                    if (isDone()) {
                        return true;
                    }
                    // The number of waiting threads increases by 1
                    incWaiters();
                    try {
                        // Block by calling Object#wait() with a timeout
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        // If the thread is interrupted and the caller allows interruption, rethrow InterruptedException
                        if (interruptable) {
                            throw e;
                        } else {
                            // Otherwise only record the interrupted state
                            interrupted = true;
                        }
                    } finally {
                        // After the wait ends, the number of waiting threads decreases by 1
                        decWaiters();
                    }
                }
                // After the wait ends, return true if the Promise has completed
                if (isDone()) {
                    return true;
                } else {
                    // The Promise has not completed yet: recompute the remaining wait time
                    // and enter the next iteration if it is still greater than 0
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            // If the thread was interrupted while waiting, restore its interrupt flag
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // ... other code omitted ...
}
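The core of await0() is a timed wait that recomputes the remaining time after every wake-up, because Object#wait() may return early (through notifyAll() for an unrelated reason, or a spurious wake-up). Here is a reduced sketch of just that loop with the same millisecond/nanosecond split. `TimedWaitSketch`, `complete()` and `demo()` are hypothetical names, and interrupt handling and the waiter counter are deliberately left out.

```java
// Hypothetical sketch of a deadline-corrected timed wait; not Netty's implementation.
final class TimedWaitSketch {
    private boolean done; // guarded by "this"

    synchronized void complete() {
        done = true;
        notifyAll();
    }

    // Returns true if complete() was called within the timeout, false otherwise.
    boolean await(long timeoutNanos) throws InterruptedException {
        final long start = System.nanoTime();
        long remaining = timeoutNanos;
        synchronized (this) {
            while (!done) {
                // Never call wait(0, 0): a zero timeout would mean "wait forever"
                if (remaining <= 0) {
                    return false;
                }
                // Object#wait takes milliseconds plus a nanosecond remainder
                wait(remaining / 1_000_000, (int) (remaining % 1_000_000));
                // Correct the remaining time against the original deadline
                remaining = timeoutNanos - (System.nanoTime() - start);
            }
            return true;
        }
    }

    // Returns {a never-completed wait timed out, a completed wait returned true}.
    static boolean[] demo() {
        try {
            boolean timedOut = !new TimedWaitSketch().await(5_000_000L); // 5 ms
            TimedWaitSketch finished = new TimedWaitSketch();
            new Thread(finished::complete).start();
            boolean completed = finished.await(5_000_000_000L); // up to 5 s
            return new boolean[] {timedOut, completed};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {false, false};
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("timedOut=" + r[0] + ", completed=" + r[1]);
    }
}
```

Measuring against the start time, rather than subtracting a fixed amount per iteration, keeps the total wait bounded by the requested timeout no matter how often the thread wakes up early.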

Finally, there are a few ways to set and get the results:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... other code omitted ...
    @Override
    public Promise<V> setSuccess(V result) {
        // Set the success result; return the current Promise instance if the update succeeded
        if (setSuccess0(result)) {
            return this;
        }
        // A failed update means the Promise has already been completed, so throw an exception
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        // Set the success result; the returned boolean indicates whether the update succeeded
        return setSuccess0(result);
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        // Set the failure result; return the current Promise instance if the update succeeded
        if (setFailure0(cause)) {
            return this;
        }
        // A failed update means the Promise has already been completed, so throw an exception
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        // Set the failure result; the returned boolean indicates whether the update succeeded
        return setFailure0(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V getNow() {
        // Return null if result is a CauseHolder, the SUCCESS constant, or the UNCANCELLABLE constant;
        // otherwise return the result cast to V.
        // Note that even if the CauseHolder wraps an exception, this method still returns null
        Object result = this.result;
        if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // Block indefinitely to get the result
        Object result = this.result;
        // Block indefinitely if the Promise has not completed
        if (!isDone0(result)) {
            await();
            // Refresh the local result variable
            result = this.result;
        }
        // Return null if result is the SUCCESS constant or the UNCANCELLABLE constant
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // If result is a CauseHolder, get the cause it holds, which may also be null
        Throwable cause = cause0(result);
        if (cause == null) {
            // The execution succeeded: return the result cast to V
            return (V) result;
        }
        // A CancellationException is thrown as is
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // All other causes are wrapped in an ExecutionException
        throw new ExecutionException(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Block with a timeout to get the result
        Object result = this.result;
        // Block with a timeout if the Promise has not completed
        if (!isDone0(result)) {
            if (!await(timeout, unit)) {
                // The wait timed out: throw TimeoutException directly
                throw new TimeoutException();
            }
            // Refresh the local result variable
            result = this.result;
        }
        // Return null if result is the SUCCESS constant or the UNCANCELLABLE constant
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // If result is a CauseHolder, get the cause it holds, which may also be null
        Throwable cause = cause0(result);
        if (cause == null) {
            // The execution succeeded: return the result cast to V
            return (V) result;
        }
        // A CancellationException is thrown as is
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // All other causes are wrapped in an ExecutionException
        throw new ExecutionException(cause);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // CAS result to CANCELLATION_CAUSE_HOLDER; the expected value of result must be null
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
            // Wake up waiting threads; true means there are listeners to notify
            if (checkNotifyWaiters()) {
                // Notify the listeners with a callback
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    private boolean setSuccess0(V result) {
        // Set the success result: use SUCCESS if result is null, otherwise use result itself
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setFailure0(Throwable cause) {
        // Set the failure result: the Throwable is wrapped in a CauseHolder and stored in its cause field
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    private boolean setValue0(Object objResult) {
        // CAS result to objResult; the expected value of result must be null or UNCANCELLABLE for the update to succeed
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            // Wake up waiting threads; true means there are listeners to notify
            if (checkNotifyWaiters()) {
                // Notify the listeners with a callback
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    // Wake up waiting threads and report whether the listeners need a callback
    private synchronized boolean checkNotifyWaiters() {
        // If the number of waiting threads is greater than zero, call Object#notifyAll() to wake all of them
        if (waiters > 0) {
            notifyAll();
        }
        // Return true if listeners is not null
        return listeners != null;
    }
    // ... other code omitted ...
}
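The write-once behaviour of setSuccess(), trySuccess() and tryFailure() comes down to a single compare-and-set on the result field. Below is a minimal JDK-only sketch of that idea, not Netty's code: the class names ResultCellSketch and ResultCellDemo are hypothetical, AtomicReference stands in for the field updater, and the UNCANCELLABLE state, waiters and listeners are left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only sketch of the write-once result field; this is not Netty's class.
final class ResultCellSketch<V> {
    // Sentinel that stands in for a successful completion with a null value
    private static final Object SUCCESS = new Object();

    // Wrapper that marks a stored object as a failure cause
    private static final class CauseHolder {
        final Throwable cause;

        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean trySuccess(V value) {
        // Only the first writer wins: the expected value must still be null
        return result.compareAndSet(null, value == null ? SUCCESS : value);
    }

    boolean tryFailure(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        return result.compareAndSet(null, new CauseHolder(cause));
    }

    ResultCellSketch<V> setSuccess(V value) {
        if (trySuccess(value)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    boolean isSuccess() {
        Object r = result.get();
        return r != null && !(r instanceof CauseHolder);
    }

    Throwable cause() {
        Object r = result.get();
        return r instanceof CauseHolder ? ((CauseHolder) r).cause : null;
    }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result.get();
        // Not completed, completed with null, or failed: there is no value to return
        return (r == null || r == SUCCESS || r instanceof CauseHolder) ? null : (V) r;
    }
}

class ResultCellDemo {
    public static void main(String[] args) {
        ResultCellSketch<String> cell = new ResultCellSketch<>();
        System.out.println(cell.trySuccess("first"));  // the first CAS wins
        System.out.println(cell.trySuccess("second")); // already completed
        System.out.println(cell.getNow());
    }
}
```

The SUCCESS sentinel is what lets a null field mean "not completed yet" while still allowing a Promise to complete successfully with a null value.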

Basic use of Promise

To use Netty’s Promise module, you don’t need to import all of Netty’s dependencies, just netty-common:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.44.Final</version>
</dependency>

For the EventExecutor, Netty provides GlobalEventExecutor for global event handling, which can be used directly (of course, you can also implement EventExecutor yourself or use another EventExecutor implementation class):

EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);

Here is a scenario: download a resource from a URL to disk asynchronously, publish the file's disk path asynchronously once the download finishes, and print the download result to the console when the notification is received.

import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import lombok.Data;

public class PromiseMain {

    public static void main(String[] args) throws Exception {
        String url = "http://xxx.yyy.zzz";
        EventExecutor executor = GlobalEventExecutor.INSTANCE;
        Promise<DownloadResult> promise = new DefaultPromise<>(executor);
        promise.addListener(new DownloadResultListener());
        Thread thread = new Thread(() -> {
            try {
                System.out.println("Start downloading resources, URL :" + url);
                long start = System.currentTimeMillis();
                // Simulate the download time
                Thread.sleep(2000);
                String location = "C:\\xxx\\yyy\\z.md";
                long cost = System.currentTimeMillis() - start;
                System.out.println(String.format("Download resource successfully, URL :%s, save to :%s, time :%d ms", url, location, cost));
                DownloadResult result = new DownloadResult();
                result.setUrl(url);
                result.setFileDiskLocation(location);
                result.setCost(cost);
                // Publish the result
                promise.setSuccess(result);
            } catch (Exception ignore) {

            }
        }, "Download-Thread");
        thread.start();
        // Keep the main thread alive so the callback can be observed
        Thread.sleep(Long.MAX_VALUE);
    }

    // Lombok's @Data generates the getters and setters used in this example
    @Data
    private static class DownloadResult {

        private String url;

        private String fileDiskLocation;

        private long cost;
    }

    private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

        @Override
        public void operationComplete(Future<DownloadResult> future) throws Exception {
            if (future.isSuccess()) {
                DownloadResult downloadResult = future.getNow();
                System.out.println(String.format("Notification of download completion, URL :%s, file disk path :%s, Time :%d ms", downloadResult.getUrl(),
                        downloadResult.getFileDiskLocation(), downloadResult.getCost()));
            }
        }
    }
}

Console output after execution:

Start downloading resources, URL :http://xxx.yyy.zzz
Download resource successfully, URL :http://xxx.yyy.zzz, save to :C:\xxx\yyy\z.md, time :2000 ms
Notification of download completion, URL :http://xxx.yyy.zzz, file disk path :C:\xxx\yyy\z.md, Time :2000 ms
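The listener in this example is registered before the download finishes, so it is called back when setSuccess() runs. In Netty, a listener added to a Promise that has already completed is notified as well. The following JDK-only sketch illustrates both cases under simplifying assumptions: the class names ListenableSketch and ListenableDemo are hypothetical, Consumer stands in for GenericFutureListener, and listeners are invoked on the calling thread rather than through an EventExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical JDK-only sketch of completion listeners; this is not Netty's class.
final class ListenableSketch<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private boolean done;
    private V value;

    void addListener(Consumer<V> listener) {
        synchronized (this) {
            if (!done) {
                listeners.add(listener); // will be invoked later by complete()
                return;
            }
        }
        listener.accept(value); // already completed: notify right away
    }

    boolean complete(V v) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) {
                return false; // only the first completion counts
            }
            done = true;
            value = v;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        // Call back outside the lock so that listeners cannot block other threads
        for (Consumer<V> listener : toNotify) {
            listener.accept(v);
        }
        return true;
    }
}

class ListenableDemo {
    public static void main(String[] args) {
        ListenableSketch<String> promise = new ListenableSketch<>();
        promise.addListener(v -> System.out.println("early listener got " + v));
        promise.complete("z.md");
        promise.addListener(v -> System.out.println("late listener got " + v));
    }
}
```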

Promise fits many scenarios: besides asynchronous notification, it can also be used for synchronous calls. Its design is much more flexible than JUC's Future, and it adds many new features on top of Future.

Promise listener stack depth issues

Sometimes, because of encapsulation or coding mistakes, listener callbacks may form a chain across multiple Promises (see issue-5302, a promise listener chain). In that case the recursive call depth may grow large enough to cause a stack overflow, so a threshold is needed to limit the maximum recursion depth. This threshold is referred to here as the stack depth protection threshold. Its default value is 8, and it can be overridden with the system property io.netty.defaultPromise.maxListenerStackDepth. Here is the code block mentioned earlier:

private void notifyListeners() {
    EventExecutor executor = executor();
    // Stack depth protection only applies when the current thread is the executor's event loop thread,
    // i.e. executor.inEventLoop() is true
    if (executor.inEventLoop()) {
        // Get the InternalThreadLocalMap instance bound to the current thread, which is similar to ThreadLocal
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        // Get the listener stack depth of the current thread
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // The listener call stack depth is still below the threshold MAX_LISTENER_STACK_DEPTH
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            // Set the listener stack depth to + 1 before calling notifyListenersNow()
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                // After notifyListenersNow() returns, restore the thread's listener stack depth to its previous value
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // Otherwise (not on the event loop thread, or the stack depth has reached MAX_LISTENER_STACK_DEPTH),
    // notify the listeners in a new asynchronous task
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}
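The guard above can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions, not Netty's code: the class names DepthGuardSketch and DepthGuardDemo are hypothetical, a plain ThreadLocal replaces InternalThreadLocalMap, and an ArrayDeque that is drained by hand stands in for the executor's task queue. A callback that is nested too deeply is deferred and later runs from a shallow stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical JDK-only sketch of a recursion depth guard; this is not Netty's class.
final class DepthGuardSketch {
    static final int MAX_DEPTH = 8; // mirrors the default threshold described above
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

    // Stand-in for the executor's task queue
    private final Deque<Runnable> deferred = new ArrayDeque<>();
    int maxObservedDepth; // recorded for the demo only
    int deferredCount;    // recorded for the demo only

    void notifyGuarded(Runnable callback) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            maxObservedDepth = Math.max(maxObservedDepth, depth + 1);
            try {
                callback.run(); // may call notifyGuarded() again
            } finally {
                DEPTH.set(depth); // restore the depth of the caller
            }
            return;
        }
        // Too deep: hand the callback over instead of recursing further
        deferredCount++;
        deferred.add(callback);
    }

    void drain() {
        Runnable task;
        while ((task = deferred.poll()) != null) {
            notifyGuarded(task); // starts again from depth 0
        }
    }
}

class DepthGuardDemo {
    static int ran;

    // Each callback triggers the next one, like a chain of Promise listeners
    static void chain(DepthGuardSketch guard, int remaining) {
        ran++;
        if (remaining > 1) {
            guard.notifyGuarded(() -> chain(guard, remaining - 1));
        }
    }

    public static void main(String[] args) {
        DepthGuardSketch guard = new DepthGuardSketch();
        guard.notifyGuarded(() -> chain(guard, 10));
        guard.drain();
        System.out.println("callbacks run: " + ran
                + ", max depth: " + guard.maxObservedDepth
                + ", deferred: " + guard.deferredCount);
    }
}
```

Restoring the previous depth in the finally block matters: without it, one deep chain would leave the thread's counter raised and every later notification on that thread would be deferred.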

To simulate a case that triggers the listener stack depth protection, we only need to call notifyListeners() recursively on the same EventLoop thread.

The most typical case is calling setSuccess() on the next Promise inside the listener callback of the previous Promise.


Test code:

import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.atomic.AtomicInteger;

public class PromiseListenerMain {

    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        EventExecutor executor = ImmediateEventExecutor.INSTANCE;
        // root
        Promise<String> root = new DefaultPromise<>(executor);
        Promise<String> p1 = new DefaultPromise<>(executor);
        Promise<String> p2 = new DefaultPromise<>(executor);
        Promise<String> p3 = new DefaultPromise<>(executor);
        Promise<String> p4 = new DefaultPromise<>(executor);
        Promise<String> p5 = new DefaultPromise<>(executor);
        Promise<String> p6 = new DefaultPromise<>(executor);
        Promise<String> p7 = new DefaultPromise<>(executor);
        Promise<String> p8 = new DefaultPromise<>(executor);
        Promise<String> p9 = new DefaultPromise<>(executor);
        Promise<String> p10 = new DefaultPromise<>(executor);
        // Each listener completes the next Promise in the chain
        p1.addListener(new Listener(p2));
        p2.addListener(new Listener(p3));
        p3.addListener(new Listener(p4));
        p4.addListener(new Listener(p5));
        p5.addListener(new Listener(p6));
        p6.addListener(new Listener(p7));
        p7.addListener(new Listener(p8));
        p8.addListener(new Listener(p9));
        p9.addListener(new Listener(p10));
        root.addListener(new Listener(p1));
        root.setSuccess("success");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static class Listener implements GenericFutureListener<Future<String>> {

        private final String name;
        private final Promise<String> promise;

        public Listener(Promise<String> promise) {
            this.name = "listener-" + COUNTER.getAndIncrement();
            this.promise = promise;
        }

        @Override
        public void operationComplete(Future<String> future) throws Exception {
            System.out.println(String.format("Listener [%s] callback successful...", name));
            if (null != promise) {
                promise.setSuccess("success");
            }
        }
    }
}

Thanks to safeExecute(), the listeners of all of the above Promises are called back. Here IDEA's advanced breakpoint feature can be used to log a message whenever a breakpoint is hit; the output is as follows:

MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-9] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-0] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-1] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-2] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-3] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-4] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-5] callback successful...
MAX_LISTENER_STACK_DEPTH (notifyListenersNow) - execution
Listener [listener-6] callback successful...
safeExecute (notifyListenersNow) ----------
Listener [listener-7] callback successful...
safeExecute (notifyListenersNow) ----------
Listener [listener-8] callback successful...

I have one small doubt here: if the call stack depth exceeds 8, the excess part is wrapped in a Runnable instance and submitted to the event executor. Doesn't that turn the risk of a recursive stack overflow into a risk of memory exhaustion? Asynchronous tasks can also pile up unless the executor rejects task submission, so it depends on the specific EventExecutor implementation.

Summary

This article analyzed the source code and usage of the Promise tool provided by Netty. Its design and code are well worth studying, and it works out of the box, so it can be used directly in everyday coding to avoid the effort and risk of reinventing the wheel.

(E-A-20200123 C-3-D)

The technical official account Throwable Digest (ID: Throwable – DOge) publishes the author's original technical articles from time to time (never plagiarized or reprinted).

