When a task is time-consuming, you often want to split it across several threads, get each result back asynchronously, and then combine the results. Since Java 5, this can be done with a thread pool and the Future interface.
Introduction to the Future API documentation
public interface Future<V>
A Future represents the result of an asynchronous computation. It provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result can only be retrieved with get once the computation has completed; get blocks if necessary until it is ready. Cancellation is performed by the cancel method, and additional methods report whether the task completed normally or was cancelled. Once a computation has completed, it cannot be cancelled. If you want to use a Future only for its cancellability and do not need a usable result, you can declare the type as Future<?> and return null from the underlying task.
There are only a few simple methods:
Return type | Method and description |
---|---|
boolean | cancel(boolean mayInterruptIfRunning): attempts to cancel execution of this task. |
V | get(): waits, if necessary, for the computation to complete, and then retrieves its result. |
V | get(long timeout, TimeUnit unit): waits, if necessary, for at most the given time for the computation to complete, and then retrieves its result, if available. |
boolean | isCancelled(): returns true if this task was cancelled before it completed normally. |
boolean | isDone(): returns true if this task has completed. |
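The methods in the table can be tried out with a short sketch like the one below. The class name FutureMethodsDemo, the describe helper, and the chosen delays are illustrative assumptions and not part of the original example; the sketch only shows how get with a timeout, cancel, isDone, and isCancelled behave together.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not taken from the article.
public class FutureMethodsDemo {

    /**
     * Submits a task that sleeps for taskMillis, waits for at most waitMillis,
     * and cancels the task if the wait times out.
     */
    public static String describe(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis);
                return "done";
            });
            try {
                // Blocks for at most waitMillis
                String value = future.get(waitMillis, TimeUnit.MILLISECONDS);
                return "value=" + value
                        + ", isDone=" + future.isDone()
                        + ", isCancelled=" + future.isCancelled();
            } catch (TimeoutException e) {
                // The task is still running, so try to cancel it by interrupting its thread
                boolean cancelled = future.cancel(true);
                return "timed out, cancel=" + cancelled
                        + ", isDone=" + future.isDone()
                        + ", isCancelled=" + future.isCancelled();
            } catch (Exception e) {
                return "failed: " + e;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(100, 2000)); // task finishes within the wait
        System.out.println(describe(2000, 100)); // wait times out, task is cancelled
    }
}
```

Note that isDone also returns true for a cancelled task, so isCancelled is the method to call when the two cases need to be told apart.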
Let me give you an example
Create two tasks
public class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // Returns a string after a calculation that takes 3 seconds
        Thread.sleep(3000);
        return "world";
    }
}

public class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // Returns a string after a calculation that takes 1 second
        Thread.sleep(1000);
        return "hello";
    }
}
Create a method to perform the above two tasks and merge the results
/**
 * Runs both tasks and merges their results.
 *
 * @return the merged string
 * @throws ExecutionException if either task throws an exception
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static String test() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    // Callable is a functional interface, so the argument to submit can also be written as a lambda:
    //   Future<String> task1 = executorService.submit(() -> {
    //       Thread.sleep(1000);
    //       return "hello";
    //   });
    Future<String> task1 = executorService.submit(new Task1());
    Future<String> task2 = executorService.submit(new Task2());
    // Do other work on the calling thread while the tasks run
    Thread.sleep(3000);
    String s3 = ", I'm coming";
    String s = task1.get();
    String s1 = task2.get();
    long end = System.currentTimeMillis();
    System.out.println("Total time:" + (end - start) / 1000);
    executorService.shutdown();
    // Join with a space so that the result reads "hello world, I'm coming"
    return s + " " + s1 + s3;
}
Running the method shows that the whole thing takes about three seconds, which is the duration of the slowest step rather than the sum of all the steps. Submitting the two time-consuming operations returns a Future for each immediately, which frees the calling thread to do other useful work instead of waiting for them to complete.
public static void main(String[] args) throws ExecutionException, InterruptedException {
    String result = test();
    System.out.println(result);
    // Total time:3
    // hello world, I'm coming
}
Limitations of the Future interface
The Future interface lets the calling thread carry on with other work and collect the result once a task completes, but this is not enough to write clean code for everyday development needs, such as:
- Merging several asynchronous computations into one, especially when one computation depends on the result of another
- Waiting for all asynchronous computations to complete
- Handling only the task that completes first (for example, calling two equivalent third-party APIs and using whichever responds sooner, which reduces the overall response time)
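To make the first limitation concrete, here is a minimal sketch of chaining two dependent steps with plain Future objects. The class and method names are hypothetical. The point it illustrates is that the only way to express "run the second step when the first finishes" is to block a pool thread on get inside the second task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not taken from the article.
public class PlainFutureChaining {

    public static String greetSequentially() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> first = pool.submit(() -> "hello");
            // The second task depends on the first. With a plain Future there is
            // no callback to register, so a worker thread blocks on first.get().
            Future<String> second = pool.submit(() -> first.get() + " world");
            return second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(greetSequentially()); // hello world
    }
}
```

Each extra dependent step ties up another thread while it waits, and error handling has to be repeated around every get call.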
Using CompletableFuture
CompletableFuture, added in Java 8, addresses these limitations of the Future interface. The following examples show how.
1. Sequencing
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
    // The calculation returns a string after 1 second
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "world";
}), (x, y) -> {
    return x + y;
});
System.out.println("s:" + stringCompletableFuture.join());
// s:helloworld
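thenCombine merges two computations that run independently. When the second computation needs the result of the first, thenCompose and thenApply can express the ordering without blocking a thread. The sketch below assumes two hypothetical steps, findUserName and buildGreeting, that are not part of the original example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not taken from the article.
public class SequencingDemo {

    // Hypothetical first step: look something up asynchronously
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Hypothetical second step: needs the result of the first step
    static CompletableFuture<String> buildGreeting(String name) {
        return CompletableFuture.supplyAsync(() -> "hello, " + name);
    }

    public static String greet(int id) {
        return findUserName(id)
                // thenCompose chains a step that itself returns a CompletableFuture
                .thenCompose(SequencingDemo::buildGreeting)
                // thenApply chains a plain synchronous transformation
                .thenApply(String::toUpperCase)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greet(7)); // HELLO, USER-7
    }
}
```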
2. Handling only the task that finishes first
CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
    // The calculation returns a string after 1 second
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "a";
}).applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "b";
}), (x) -> {
    return x;
});
System.out.println("s:" + sCf.join());
// s:a
Or, using anyOf:
CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
    // The calculation returns a string after 1 second
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "a";
});
CompletableFuture<String> sCf2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "b";
});
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(sCf, sCf2);
String join = (String) objectCompletableFuture.join();
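anyOf returns a CompletableFuture<Object>, which is why the result has to be cast. If the cast is unwelcome, one possible alternative is a small typed helper. The firstOf and delayed methods below are hypothetical helpers written for this sketch, not JDK methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not taken from the article.
public class FirstOfDemo {

    /** Completes with the outcome of whichever input future finishes first. */
    public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> winner = new CompletableFuture<>();
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                // complete and completeExceptionally only take effect once,
                // so later finishers are ignored
                if (error != null) {
                    winner.completeExceptionally(error);
                } else {
                    winner.complete(value);
                }
            });
        }
        return winner;
    }

    // Hypothetical task that returns value after sleeping for millis
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static String race() {
        return firstOf(Arrays.asList(delayed("slow", 1500), delayed("fast", 100))).join();
    }

    public static void main(String[] args) {
        System.out.println(race()); // fast
    }
}
```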
3. Wait for all asynchronous computing tasks to complete
long start = System.currentTimeMillis();
CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
    // The calculation returns a string after 1 second
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "a";
});
CompletableFuture<String> sCf2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "b";
});
CompletableFuture<String> sCf3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "c";
});
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(sCf, sCf2, sCf3);
voidCompletableFuture.join();
long end = System.currentTimeMillis();
System.out.println("Total time:" + (end - start) / 1000);
System.out.println();
ArrayList<CompletableFuture<String>> cfList = new ArrayList<>();
cfList.add(sCf);
cfList.add(sCf2);
cfList.add(sCf3);
cfList.stream().map(CompletableFuture::join).forEach(System.out::println);
// Total time:5
//
// a
// b
// c
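Because allOf returns a CompletableFuture<Void>, the individual results still have to be read from the original futures, as the example above does with a list. A common way to package this is a helper that turns a list of futures into a future of a list. The allAsList name below is a hypothetical helper for this sketch, not a JDK method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the article.
public class AllOfDemo {

    /** Completes with all results, in input order, once every future has completed. */
    public static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // By the time this stage runs every future is done, so join does not block
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static List<String> collectLetters() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(collectLetters()); // [a, b, c]
    }
}
```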
4. Using a custom thread pool
By default, CompletableFuture runs asynchronous tasks on the common ForkJoinPool, whose parallelism defaults to the number of CPU cores minus one. In a production environment with few cores and heavy or blocking processing logic, tasks queue up waiting for a free thread and slow the server down. A custom thread pool can be passed in instead, for example:
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<String> sCf3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "c";
}, executorService);
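One way to confirm which pool actually runs a stage is to name the threads of the custom pool and read the thread name inside each stage. The sketch below makes that assumption: the io-worker prefix and the class name are illustrative choices, not part of the original example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not taken from the article.
public class CustomPoolDemo {

    /** Returns the names of the threads that ran the two stages. */
    public static String runOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        // Named daemon threads make it easy to see where each stage runs
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-worker-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    // First stage runs on the custom pool
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    // Async variant with an executor argument also runs on the custom pool
                    .thenApplyAsync(first -> first + " -> " + Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // A custom pool is not shut down automatically
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());
    }
}
```

Both names printed start with io-worker-, which shows that neither stage ran on the common ForkJoinPool.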
5. Exception handling
CompletableFuture provides many methods to cover the usual needs of asynchronous computation. Methods with the Async suffix hand their stage to a thread pool (the default one, or the executor passed as an argument). Methods without the suffix run their stage on whichever thread completes the previous stage, or on the calling thread if the previous stage has already completed. CompletableFuture also provides exception handling, so the exception thrown inside a task can be inspected and replaced with a fallback value:
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<String> sCf3 = CompletableFuture.supplyAsync(() -> {
    int i = 1 / 0;
    return "c";
}, executorService).exceptionally(e -> {
    e.printStackTrace();
    return "1";
});
String str = sCf3.join();
System.out.println("str:" + str);
executorService.shutdown();
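exceptionally only runs on failure. When success and failure should be handled in one place, handle is an alternative, and when no handler is attached, join rethrows the failure wrapped in a CompletionException. The sketch below illustrates both; the class, the method names, and the unwrap helper are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not taken from the article.
public class ErrorHandlingDemo {

    /** handle receives either the value or the error, never both. */
    public static String divideWithHandle(int a, int b) {
        return CompletableFuture
                .supplyAsync(() -> a / b)
                .handle((value, error) -> error == null
                        ? "result=" + value
                        : "failed: " + unwrap(error).getClass().getSimpleName())
                .join();
    }

    // The error seen by a dependent stage may be wrapped in a CompletionException
    static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
    }

    /** Without a handler, join throws CompletionException with the original cause. */
    public static String divideWithJoin(int a, int b) {
        try {
            return "result=" + CompletableFuture.supplyAsync(() -> a / b).join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(divideWithHandle(6, 3)); // result=2
        System.out.println(divideWithHandle(1, 0)); // failed: ArithmeticException
        System.out.println(divideWithJoin(1, 0));   // failed: ArithmeticException
    }
}
```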