This is the 13th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021 “.

A list,

In NetTY, when we need to do asynchronous processing, we often call the following two methods:

Future & Promise

In fact, when we used the JDK, we knew that there was a Future interface for receiving the results of tasks asynchronously.

In Netty, based on the Future interface in the JDK, has been extended; After that, on the basis of Netty’s Future, the Promise interface was added.

See the following class diagram for the relationship between the three:

  • JDK Future: You can only synchronously wait for the end of the task (whether it succeeds or fails) to get the result.
  • Netty Future: The result can be obtained either synchronously or asynchronously, provided that the task must end.
  • Netty Promise: Not only does it have Netty Future functionality, but it also exists independently of the task, serving only as a container for passing results between two threads.

Ii. What are the main capabilities expanded?

In this chapter, the functions of the two new netty interfaces are analyzed.

Function/Name jdk Future netty Future Promise
cancel Cancel the task
isCanceled Whether the task is canceled
isDone Task completion does not distinguish between success and failure
get Get task result, block wait
getNow Gets the result of the task, non-blocking, and returns NULL if the result has not yet been produced
await Wait until the task is complete. If the task fails, isSuccess is used instead of throwing an exception
sync Wait for the task to end and throw an exception if the task fails
isSuccess Determine whether the task succeeded
cause Gets a failure message, non-blocking, and returns NULL if there is no failure
addLinstener Add a callback to receive the result asynchronously
setSuccess Setting Result
setFailure Setting failure Result

Three, use examples

Let’s focus on Promise and see how it can be used in different scenarios.

Example 1 The synchronization task succeeds

public static void main(String[] args) throws ExecutionException, InterruptedException { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +", set success: " + 10); promise.setSuccess(10); }); System.out.println(Thread.currentThread().getName() +", start..." ); Return null system.out.println (thread.currentThread ().getName() +", "+ promise.getNow()); System.out.println(thread.currentThread ().getName() +", "+ promise.get()); }Copy the code

Results:

main, start...
main, null
defaultEventLoop-1-1, set success: 10
main, 10
Copy the code

Example 2 The asynchronous processing task succeeds

public static void main(String[] args) throws ExecutionException, InterruptedException { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +", set success: " + 10); promise.setSuccess(10); }); System.out.println(Thread.currentThread().getName() +", start..." ); // Add asynchronous listener, Promise.addlistener (Future ->{system.out.println (thread.currentThread ().getName() +", " + future.getNow()); }); }Copy the code

Results:

main, start...
defaultEventLoop-1-1, set success: 10
defaultEventLoop-1-1, 10
Copy the code

Example 3. Sync & get fails

Sync or GET, the difference being that GET wraps up the exception information in another layer

public static void main(String[] args) throws ExecutionException, InterruptedException { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +", set failure"); promise.setFailure(new RuntimeException()); }); System.out.println(Thread.currentThread().getName() +", start..." ); Return null system.out.println (thread.currentThread ().getName() +", "+ promise.getNow()); System.out.println(thread.currentThread ().getName() +", "+ promise.get()); //System.out.println(Thread.currentThread().getName() +", " + promise.get()); }Copy the code

Results:

main, start...
main, null
defaultEventLoop-1-1, set failure
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException
	at io.netty.util.concurrent.DefaultPromise.get(DefaultPromise.java:349)
	at com.cloud.bssp.netty.promise.Test3.main(Test3.java:34)
Caused by: java.lang.RuntimeException
	at com.cloud.bssp.netty.promise.Test3.lambda$main$0(Test3.java:27)
	at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Copy the code

Example 4: Failed to await the synchronization task

public static void main(String[] args) throws ExecutionException, InterruptedException { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +", set failure"); promise.setFailure(new RuntimeException()); }); System.out.println(Thread.currentThread().getName() +", start..." ); Return null system.out.println (thread.currentThread ().getName() +", "+ promise.getNow()); // block to await result promise.await(); System.out.println(thread.currentThread ().getName() +", "+ promise.isSuccess()); }Copy the code

Results:

main, start...
main, null
defaultEventLoop-1-1, set failure
main, false
Copy the code

Example 5 Failed to process tasks asynchronously

public static void main(String[] args) { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ", set failure"); promise.setFailure(new RuntimeException()); }); System.out.println(Thread.currentThread().getName() + ", start..." ); AddListener (Future -> {if (! System.out.println(thread.currentThread ().getName() + ", "+ promise.cause()); } System.out.println(Thread.currentThread().getName() + ", " + future.getNow()); }); }Copy the code

Results:

main, start...
defaultEventLoop-1-1, set failure
defaultEventLoop-1-1, java.lang.RuntimeException
defaultEventLoop-1-1, null
Copy the code

Example 6 await deadlock check

public static void main(String[] args) { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.submit(()->{ System.out.println("1"); try { promise.await(); / / pay attention to not only catch InterruptedException / / or deadlock inspection thrown BlockingOperationException will continue to spread up / / and submit task will packaged as PromiseTask, Its run method will catch all exceptions and then set it to a Promise failure without throwing} catch (Exception e) {e.prinintStackTrace (); } System.out.println("2"); }); eventExecutors.submit(()->{ System.out.println("3"); try { promise.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("4"); }); }Copy the code

Results:

1 2 3 4 io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6a77ffc2(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:461) at  io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:246) at com.cloud.bssp.netty.promise.Test6.lambda$main$0(Test6.java:21) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6a77ffc2(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:461) at  io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:246) at com.cloud.bssp.netty.promise.Test6.lambda$main$1(Test6.java:33) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)Copy the code