“This is my fourth day of participating in the First Challenge 2022. For more details: First Challenge 2022.”

Preface

There are two basic ways to create a thread: extend the Thread class or implement the Runnable interface. Both share one defect: the task cannot return a value.

Since Java 1.5, you can submit a Callable to a thread pool and get back a Future that holds the return value.

Limitations of the Future interface

Future.get() blocks the calling thread until the task finishes, so a time-consuming task stalls the main thread.

For simple cases, the overloaded get(long, TimeUnit) lets us set a timeout so that the main thread is not blocked indefinitely.
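
As a minimal sketch of the two points above (submitting a Callable to get a Future, then bounding the wait with a timeout), consider the following. The class name FutureTimeoutSketch, the fetch helper and the sleep durations are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Runs a task that takes taskMillis, but waits at most waitMillis for its result.
    static String fetch(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> {
                Thread.sleep(taskMillis);   // stands in for slow work
                return "done";
            };
            Future<String> future = pool.submit(task);
            // Blocks the caller, but only up to the given timeout.
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();             // interrupts the task if it is still running
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(50, 2000));   // the task finishes within the limit
        System.out.println(fetch(2000, 50));   // the limit expires first
    }
}
```

The first call returns "done" and the second returns "timed out". Either way the caller has to block or poll, which is the limitation the rest of this article addresses.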

The Future interface and the FutureTask class alone cannot cleanly express the following:

  • Combine two asynchronous computations into one, whether they are independent of each other or the second depends on the result of the first
  • Wait for all tasks in a collection of Futures to complete
  • Wait only for the fastest task in a collection of Futures and return its result
  • Complete a Future programmatically (that is, set its result manually)
  • Be notified when a Future completes and use its result in further steps, rather than simply blocking until the result is available

What is a CompletableFuture

Java 8 added a new class, CompletableFuture. It keeps the advantages of Future and extends it substantially: it simplifies asynchronous programming and supports a functional style in which results are processed through callbacks.

CompletableFuture is designed for asynchronous programming in Java. Instead of blocking while a task completes, the main thread can carry on with other work in parallel, which can improve throughput when the program would otherwise sit waiting.

  • CompletableFuture implements the Future interface, so it can return a result asynchronously.
  • CompletableFuture implements the CompletionStage interface, added in Java 8 to model one stage of an asynchronous computation. Its methods take functional interfaces, so they are usually called with lambda expressions. In the JDK, CompletableFuture is currently the only public implementation of CompletionStage.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
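
To illustrate the two roles in that declaration, here is a small sketch in which a CompletableFuture is created empty, given a callback through its CompletionStage side, completed by hand from another thread, and then read through its Future side. The class and method names are hypothetical and chosen only for this example.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch {

    static String demo() {
        // An empty future: nothing is running yet, and no result is set.
        CompletableFuture<String> promise = new CompletableFuture<>();

        // CompletionStage side: register a callback that runs once a value arrives.
        CompletableFuture<String> shouted = promise.thenApply(String::toUpperCase);

        // Complete the future by hand from another thread.
        new Thread(() -> promise.complete("ready")).start();

        // Future side: block until the callback's result is available.
        return shouted.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here demo() returns "READY". Setting the result with complete() is one way to read "completing a Future programmatically" from the list of limitations above.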

Method naming rules

  • Methods with the Async suffix submit the step to an executor (the default pool, or the one you pass in) rather than running it on the thread that completed the previous task

  • Methods with Apply in the name receive the previous result and return a new value

  • Methods with Accept in the name receive the previous result but return nothing

  • Methods with Run in the name neither receive the previous result nor return a value; they simply run
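
The naming rules above can be sketched side by side as follows. This is an illustration under simple assumptions: the source future is already completed and the non-Async variants are used, so every callback runs immediately on the calling thread. NamingRulesSketch and its log list are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class NamingRulesSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Integer> source = CompletableFuture.completedFuture(5);

        // Apply: takes a Function, so it sees the value and returns a new one.
        CompletableFuture<String> applied = source.thenApply(n -> "value=" + n);

        // Accept: takes a Consumer, so it sees the value but returns nothing.
        CompletableFuture<Void> accepted = source.thenAccept(n -> log.add("accept saw " + n));

        // Run: takes a Runnable, so it neither sees the value nor returns one.
        CompletableFuture<Void> ran = source.thenRun(() -> log.add("run saw nothing"));

        log.add("apply returned " + applied.join());
        accepted.join();
        ran.join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Only the Apply variant produces a future with a useful value; the Accept and Run variants produce a CompletableFuture<Void> that signals completion only.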

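The get and join methods

  • join: blocks until the result is available; on failure it throws an unchecked exception (CompletionException).
  • get: blocks until the result is available; on failure it throws checked exceptions (ExecutionException, InterruptedException), which must be handled explicitly with try...catch or declared with throws.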
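
The difference shows up most clearly when the task fails. The following sketch, with the hypothetical class JoinVsGetSketch and a task that always throws, reports which exception type each method surfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetSketch {

    // A task that always fails, used only for this illustration.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.<Integer>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    static String viaJoin() {
        try {
            failing().join();               // no checked exception to declare
            return "no exception";
        } catch (CompletionException e) {   // unchecked wrapper
            return e.getClass().getSimpleName() + " caused by "
                    + e.getCause().getClass().getSimpleName();
        }
    }

    static String viaGet() {
        try {
            failing().get();                // the compiler forces the catch blocks below
            return "no exception";
        } catch (ExecutionException e) {    // checked wrapper
            return e.getClass().getSimpleName() + " caused by "
                    + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin());
        System.out.println(viaGet());
    }
}
```

In both cases the original IllegalStateException is available through getCause(). join is convenient inside lambdas and streams, where checked exceptions are awkward.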

Using different thread pools

Default thread pool execution

/**
 * Default thread pool.
 * Sample output:
 * main................. start.....
 * main................. end......
 * Current thread:ForkJoinPool.commonPool-worker-9
 */
@Test
public void defaultThread() {
    System.out.println("main................. start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operating Results:" + i);
    });
    System.out.println("main................. end......");
}

By default, ForkJoinPool.commonPool() is used. The common pool is a thread pool shared by many tasks and is designed for non-blocking, CPU-intensive work. Its default parallelism is the number of CPU cores minus one.

  • Where commonPool is used
    • CompletableFuture
    • Parallel Streams
  • Why commonPool
    • It avoids creating a separate thread pool for every parallel operation; in the worst case, that would create so many pool threads in a single JVM that efficiency drops.
  • How the commonPool is created and used
    • A ForkJoinTask must run in a ForkJoinPool. If it is not explicitly submitted to one, it runs in the common pool, which is shared by the whole process.
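
To check these defaults on your own machine, a small sketch like the following can help. It prints the processor count, the common pool's parallelism, and the name of the thread that actually ran an async task. CommonPoolSketch and workerName are hypothetical names; the printed values depend on the machine and on the java.util.concurrent.ForkJoinPool.common.parallelism system property if it is set.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CommonPoolSketch {

    // Returns the name of the thread that ran the async task.
    static String workerName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("processors  = " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism = " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("worker      = " + workerName());
    }
}
```

On a multi-core machine the worker name typically starts with ForkJoinPool.commonPool-worker. Because the pool is shared across the process and sized for CPU-bound work, blocking calls such as I/O are usually better run on a dedicated executor, as the next section shows.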

Custom thread pool execution

Define a custom thread pool

private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());

Use the custom thread pool

/**
 * Custom thread pool.
 * Sample output:
 * main................. start.....
 * main................. end......
 * Current thread:pool-1-thread-1
 * Operating Results:5
 */
@Test
public void myThread() {
    System.out.println("main................. start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operating Results:" + i);
    },executor);
    System.out.println("main................. end......");
}

Starting an asynchronous task

runAsync – no return value

runAsync starts an asynchronous task that returns no result. It suits asynchronous tasks whose result is not needed.

/**
 * runAsync: no return value.
 * Sample output:
 * main................. start.....
 * main................. end......
 * Current thread:33
 * Operating Results:5
 */
@Test
public void runAsync() {
    System.out.println("main................. start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operating Results:" + i);
    }, executor);
    System.out.println("main................. end......");
}

supplyAsync – has a return value

supplyAsync starts an asynchronous task that returns a result. Call CompletableFuture.get() to retrieve it; the calling thread blocks until the result is available.

/**
 * supplyAsync: returns a value.
 * Sample output:
 * main................. start.....
 * Current thread:33
 * Operating Results:5
 * main................. end.....5
 */
@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
    System.out.println("main................. start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operating Results:" + i);
        return i;
    }, executor);
    System.out.println("main................. end....." + completableFuture.get());
}

If you need to stop waiting after a timeout and carry on, use CompletableFuture.get(long timeout, TimeUnit unit).

Chaining tasks sequentially

Methods with the Async suffix submit the step to an executor rather than running it on the thread that completed the previous task.

thenApply – runs after the previous task, receives its result, and returns a value

/**
 * thenApplyAsync: runs after the previous task, receives its result, and returns a value.
 * Sample output:
 * ThenApplyAsync Current thread:33
 * ThenApplyAsync result:5
 * ThenApplyAsync Task 2 started... Previous result:5
 * main................. end.....hello10
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("ThenApplyAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("ThenApplyAsync result:" + i);
        return i;
    }, executor).thenApplyAsync(result -> {
        System.out.println("ThenApplyAsync Task 2 started... Previous result:" + result);
        return "hello" + result * 2;
    }, executor);
    System.out.println("main................. end....." + thenApplyAsync.get());
}

thenAccept – runs after the previous task and receives its result, but returns nothing

/**
 * thenAcceptAsync: runs after the previous task and receives its result; no return value.
 * Sample output:
 * ThenAcceptAsync Current thread:33
 * ThenAcceptAsync result:5
 * ThenAcceptAsync task 2 started... Previous result:5
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> thenAcceptAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("ThenAcceptAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("ThenAcceptAsync result:" + i);
        return i;
    }, executor).thenAcceptAsync(result -> {
        System.out.println("ThenAcceptAsync task 2 started... Previous result:" + result);
    }, executor);
}

thenRun – runs after the previous task completes, without receiving its result

/**
 * thenRunAsync: runs once the preceding task has completed.
 * Sample output:
 * main................. start.....
 * Current thread:33
 * Operating Results:5
 * Mission 2 activated...
 */
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
    System.out.println("main................. start.....");
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operating Results:" + i);
        return i;
    }, executor).thenRunAsync(() -> {
        System.out.println("Mission 2 activated...");
    }, executor);
}

thenCompose – receives the previous result and returns a new task

When the original task completes, its result is passed to a function that returns a new task.

  • thenApply() transforms the value inside the future, which is equivalent to turning a CompletableFuture<T> into a CompletableFuture<U>.
  • thenCompose() chains two CompletableFutures: the function itself returns a CompletableFuture, and thenCompose() yields a single new CompletableFuture for the combined result.

/**
 * thenCompose: when the original task finishes, receive its result and return a new task.
 * Sample output:
 * hello: thenCompose
 */
@Test
public void thenCompose() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("hello")
            .thenCompose(str -> CompletableFuture.supplyAsync(() -> {
                return str + ": thenCompose";
            },executor));
    System.out.println(cf.join());
}
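
The difference between thenApply() and thenCompose() is easiest to see in the types when the callback itself returns a future. The sketch below assumes a hypothetical async helper named greet; with thenApply the futures nest, and with thenCompose they are flattened.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplySketch {

    // Hypothetical async step used only for this illustration.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    // thenApply wraps whatever the function returns, so a future ends up inside a future.
    static String nested() {
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.completedFuture("world").thenApply(ComposeVsApplySketch::greet);
        return nested.join().join();   // two joins are needed to reach the value
    }

    // thenCompose flattens the inner future into the chain.
    static String flat() {
        CompletableFuture<String> flat =
                CompletableFuture.completedFuture("world").thenCompose(ComposeVsApplySketch::greet);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(nested());
        System.out.println(flat());
    }
}
```

Both methods produce "hello world". As a rule of thumb, use thenApply for a plain synchronous transformation and thenCompose when the next step is itself asynchronous.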

Combining two tasks

thenCombine – consumes both results and returns a value

/**
 * Both tasks must complete.
 * thenCombine() receives the results of both futures and returns a value.
 * Sample output:
 * Task 1 thread:33
 * Result of Task 1:5
 * Task 2 thread:34
 * Result of Task 2:
 * Task 5 activated... Result 1:5... Result 2:hello
 * Task 5 Resultshello-->5
 */
@Test
public void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result of Task 1:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);
    CompletableFuture<String> thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {
        System.out.println("Task 5 activated... Result 1:" + result1 + "... Result 2:" + result2);
        return result2 + "-->" + result1;
    }, executor);
    System.out.println("Task 5 Results" + thenCombineAsync.get());
}

thenAcceptBoth – consumes both results, no return value

/**
 * Both tasks must complete.
 * thenAcceptBoth() receives the results of both futures; it has no return value.
 * Sample output:
 * Task 1 thread:33
 * Result of Task 1:5
 * Task 2 thread:34
 * Result of Task 2:
 * Task 4 activated... Result 1:5... Result 2:hello
 */
@Test
public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result of Task 1:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);

    CompletableFuture<Void> thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {
        System.out.println("Task 4 activated... Result 1:" + result1 + "... Result 2:" + result2);
    }, executor);

}

runAfterBoth – runs after both tasks complete

/**
 * Both tasks must complete.
 * runAfterBoth() combines two futures and runs an action once both are done.
 * Sample output:
 * Task 1 thread:33
 * Result of Task 1:5
 * Task 2 thread:34
 * Result of Task 2:
 * Task 3 activated...
 */
@Test
public void runAfterBothAsync() {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result of Task 1:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {
        System.out.println("Task 3 activated...");
    }, executor);

}

Either of two tasks completes

applyToEither – runs when either task completes, receives its result, and returns a value

/**
 * Of two tasks, only one needs to complete.
 * applyToEither() runs when either future completes, receives its result, and returns a value.
 * Sample output:
 * Task 1 thread:33
 * Task 2 thread:34
 * Result of Task 2:
 * Task 5 commenced... Result:hello
 * Task 5 Result:hello world
 */
@Test
public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Result of Task 1:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);

    CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, result -> {
        System.out.println("Task 5 commenced... Result:" + result);
        return result.toString() + " world";
    }, executor);
    System.out.println("Task 5 Result:" + applyToEitherAsync.get());
}

acceptEither – runs when either task completes and receives its result, no return value

/**
 * Of two tasks, only one needs to complete.
 * acceptEither() runs when either future completes and receives its result; no return value.
 * Sample output:
 * Task 1 thread:33
 * Task 2 thread:34
 * Result of Task 2:
 * Task 4 commenced... Result:hello
 */
@Test
public void acceptEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Result of Task 1:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);

    CompletableFuture<Void> acceptEitherAsync = future1.acceptEitherAsync(future2, result -> {
        System.out.println("Task 4 commenced... Result:" + result);
    }, executor);

}

runAfterEither – runs as soon as either task completes

/**
 * Of two tasks, only one needs to complete.
 * runAfterEither() runs an action when either future completes.
 * Sample output:
 * Task 1 thread:33
 * Task 2 thread:34
 * Result of Task 2:
 * Task 3 commenced...
 */
@Test
public void runAfterEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Result of Task 1:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Result of Task 2:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterEitherAsync = future1.runAfterEitherAsync(future2, () -> {
        System.out.println("Task 3 commenced...");
    }, executor);
}

Multi-task combination

allOf – waits until all tasks complete

/**
 * Multi-task combination.
 * allOf waits until all tasks complete.
 * Sample output:
 * Task 1
 * Task 3
 * Task 2
 * allOfTask 1-------Task 2-------Task 3
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void allOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);

    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    // Wait for all tasks to complete
    //allOf.get();
    allOf.join();
    System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());

}
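
Because allOf returns a CompletableFuture<Void>, the results still have to be read from the individual futures, as the test above does. One common pattern, sketched here under the assumption that all futures share a result type, is a small helper that gathers them into a list. AllOfResultsSketch and allAsList are hypothetical names, not JDK API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsSketch {

    // Completes when every input future has completed; results keep the input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // By the time this callback runs every future is done, so join() does not block.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> demo() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Task 1");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);   // the slowest task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 2";
        });
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Task 3");
        return allAsList(Arrays.asList(f1, f2, f3)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The list follows the order of the input futures, not the order in which they finished, so demo() returns Task 1, Task 2, Task 3 even though Task 2 completes last.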

anyOf – completes as soon as any one task completes

/**
 * Multi-task combination.
 * anyOf completes as soon as any one task completes.
 * Sample output:
 * Task 1
 * AnyOf -- the first thing to be doneTask 1
 * Task 3
 * Wait task 2.
 * Task 2
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    System.out.println("AnyOf -- the first thing to be done" + anyOf.get());
    // Wait for Future2 to print
    System.out.println("Wait task 2.");
    Thread.sleep(3000);
}

Exception handling

handle – receives the result or the exception and returns a new result

The callback's input is the result or the exception, and it returns a new result.

/**
 * handle: the input is the result or the exception; returns a new result.
 * Sample output:
 * main................. start.....
 * Current thread:33
 * main................. end.....Error return
 */
@Test
public void handle() throws ExecutionException, InterruptedException {
    System.out.println("main................. start.....");
    final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operating Results:" + i);
        return i;
    }, executor).handleAsync((in, throwable) -> {
        if (throwable != null) {
            return "Error return";
        }
        return "Right.";
    });
    System.out.println("main................. end....." + completableFuture.get());

}

whenComplete – observes the result or the exception without changing it

whenComplete receives the result or the exception, but it cannot modify what the future returns.

/**
 * whenComplete: observes the result or the exception.
 * Sample output:
 * main................. start.....
 * Current thread:33
 * Asynchronous completion... The result:null... The exception is:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 * Error 2
 */
@Test
public void whenComplete() {
    System.out.println("main................. start.....");
    final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operating Results:" + i);
        return i;
    }, executor).whenComplete((result, throwable) -> {
        //whenComplete gets the exception message, but cannot modify the return message
        System.out.println("Asynchronous completion... The result:" + result + "... The exception is:" + throwable);
    });

    try {
        System.out.println("main................. end....." + completableFuture.get());
    } catch (InterruptedException e) {
        System.out.println("Error 1");
    } catch (ExecutionException e) {
        System.out.println("Error 2");
    }
}
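
To make the contrast with handle concrete, the following sketch runs the same failing task through both methods. The class and method names are hypothetical; the division uses Integer.parseInt("0") only so that the failure happens at run time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteVsHandleSketch {

    // A task that fails with ArithmeticException, used only for this illustration.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> 10 / Integer.parseInt("0"));
    }

    // whenComplete only observes: the returned future still carries the failure.
    static String withWhenComplete() {
        CompletableFuture<Integer> f = failing().whenComplete(
                (result, error) -> System.out.println("whenComplete saw: " + error));
        try {
            return "value " + f.join();
        } catch (CompletionException e) {
            return "still failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    // handle replaces the outcome: the returned future completes normally.
    static String withHandle() {
        CompletableFuture<Integer> f =
                failing().handle((result, error) -> error != null ? -1 : result);
        return "value " + f.join();
    }

    public static void main(String[] args) {
        System.out.println(withWhenComplete());
        System.out.println(withHandle());
    }
}
```

withWhenComplete() returns "still failed: ArithmeticException", while withHandle() returns "value -1". whenComplete therefore suits logging or cleanup, and handle or exceptionally suits recovery.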

exceptionally – catches the exception and returns the specified value

/**
 * exceptionally: detects the error after the task completes and returns the specified value.
 * Sample output:
 * main................. start.....
 * Current thread:33
 * Exceptionally
 * main................. end.....0
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void exceptionally() throws ExecutionException, InterruptedException {
    System.out.println("main................. start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operating Results:" + i);
        return i;
    }, executor).exceptionally(throwable -> {
        //R apply(T t);
        //exceptionally detects errors and returns the specified value
        System.out.println("Exceptionally");
        return 0;
    });
    System.out.println("main................. end....." + completableFuture.get());
}

Final words

  • 👍🏻 : If you found this useful, a like is appreciated!
  • ❤️ : Bookmark the article for easy reference later!
  • 💬 : Leave a comment so we can learn from each other!