Asynchronous programming introduced in java8
use
1. Enable asynchronous
SupplyAsync (return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously");
return "Asynchronous run complete";
});
future.join(); // Wait for the asynchronous task to complete and get the result
}
Copy the code
RunAsync (no return value)
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(()-> {
System.out.println("Start running asynchronously");
});
future.join(); // Wait for the asynchronous task to complete
}
Copy the code
By default, the asynchronous daemon thread is enabled. If the main thread ends first without calling the asynchronous result, the asynchronous task will end without completing it.
When an asynchronous result is called, the main thread waits for the asynchronous task to complete and then gets the result before continuing.
Connect two asynchronous tasks
thenCompose
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenCompose(result -> CompletableFuture.supplyAsync(()->{
System.out.println("Asynchronous task 1 Result:" + result);
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
}));
future.join(); // get asynchronous result (get result 2)
}
Copy the code
Third, do the post-processing of tasks
ThenApply (return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenApply(result -> {
System.out.println("Asynchronous task 1 Result:" + result);
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
));
future.join(); // get asynchronous result (get result 2)
}
Copy the code
ThenAccept (no return value)
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenAccept(result -> {
System.out.println("Asynchronous task 1 Result:" + result);
System.out.println("Start running asynchronously 2"); ); future.join();// Wait for the asynchronous task to complete
}
Copy the code
ThenRun (no entry, no return value)
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenRun(() -> {
System.out.println("Start running asynchronously 2"); ); future.join();// Wait for the asynchronous task to complete
}
Copy the code
Four, combination processing
ThenCombine (has a return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenCombine(CompletableFuture.supplyAsync(()->{
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
}), (x, y) -> {
System.out.println("Asynchronous task 1 Result:" + x);
System.out.println("Asynchronous task 2 Result:" + y);
return "Return final result";
});
future.join(); // get the asynchronous result (get the final result)
}
Copy the code
Tasks 1 and 2 were run on separate threads.
ThenAcceptBoth (no return value)
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
}), (x, y) -> {
System.out.println("Asynchronous task 1 Result:" + x);
System.out.println("Asynchronous task 2 Result:" + y);
});
future.join(); // Wait for the asynchronous task to complete
}
Copy the code
RunAfterBoth (no input, no return value)
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).runAfterBoth(CompletableFuture.supplyAsync(()->{
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
}), () -> {
System.out.println("Start running asynchronously 3");
});
future.join(); // Wait for the asynchronous task to complete
}
Copy the code
Five, priority processing
ApplyToEither (has a return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).applyToEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), result -> result);
future.join(); // Get asynchronous results (first processed results)
}
Copy the code
AcceptEither (no return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).acceptEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), result -> {
System.out.println("Processing result:" + result);
});
future.join(); // Wait for the asynchronous task to complete
}
Copy the code
RunAfterEither (no entry, no return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).runAfterEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), () -> {
System.out.println("Subsequent Operations");
});
future.join(); // Wait for the asynchronous task to complete
}
Copy the code
After the first asynchronous task completes, the wait stops and the main thread continues.
Slow asynchronous tasks continue to complete if the main thread has not completed.
Because the asynchronous task uses a daemon thread, the main thread has finished execution, and the asynchronous task has not finished execution, because the entire program is finished.
Six, batch processing
allOf
public static void main(String[] args) throws Exception {
List<CompletableFuture<Void>> completableFutures = IntStream.range(0.10)
.mapToObj(index -> CompletableFuture.runAsync(() -> {
System.out.println(index + "do something ...");
}, executorService))
.collect(Collectors.toList());
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); // Wait for all asynchronous tasks to complete
}
Copy the code
Launched 10 asynchronous thread to handle tasks, through CompletableFuture. AllOf (completableFutures. ToArray (new CompletableFuture [0])). The join (); Wait for all asynchronous tasks to complete before continuing.
To get the result of an asynchronous task:
public static void main(String[] args) throws Exception {
List<CompletableFuture<String>> completableFutures = IntStream.range(0.10)
.mapToObj(index -> CompletableFuture.supplyAsync(() -> {
System.out.println(index + "do something ...");
return index + "Asynchronous task complete";
}, executorService))
.collect(Collectors.toList());
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
completableFutures.stream().map(CompletableFuture::join).forEach(System.out::println);
}); // Wait for all the tasks to complete, then iterate to get all the results.
}
Copy the code
anyOf
public static void main(String[] args) throws Exception {
List<CompletableFuture<Void>> completableFutures = IntStream.range(0.10)
.mapToObj(index -> CompletableFuture.runAsync(() -> {
System.out.println(index + "do something ...");
}, executorService))
.collect(Collectors.toList());
CompletableFuture.anyOf(completableFutures.toArray(new CompletableFuture[0])).join(); // Wait for the first asynchronous task to complete and get the result
}
Copy the code
Multiple asynchronous tasks are executed together, waiting for the first task to complete and retrieving the results.
The main program will execute immediately after the result of the first task, and other asynchronous tasks will not stop and continue to finish. (If the main thread has not completed execution)
7. Exception handling
Exceptionally (only exceptions are accepted and backup is returned)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).applyToEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), result -> result).exceptionally(e -> {
System.out.println("Exception Handling");
return "Plan B";
});
future.join(); // get asynchronous result (exception, return alternative)
}
Copy the code
Handle (Receives exceptions and results and returns the final solution)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).applyToEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), result -> result).handle((e, s) -> {
if(e ! =null) {
System.out.println("Exception Handling :" + e.getMessage());
}
System.out.println("No abnormality, result :" + s);
return "Final plan";
});
future.join(); // Get asynchronous result (return final scheme)
}
Copy the code
WhenComplete (Receive exception and result, no return value)
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start asynchronously run 1, take 3s");
return "Asynchronous run complete 1";
}).applyToEither(CompletableFuture.supplyAsync(()->{
System.out.println("Start asynchronous run 2, it takes 2s");
return "Asynchronous run complete 2";
}), result -> result).whenComplete((s, e) -> {
if(e ! =null) {
System.out.println("Exception Handling :" + e.getMessage());
}
System.out.println("No abnormality, result :" + s);
});
future.join(); // Get asynchronous results (whenComplete does not affect asynchronous results)
}
Copy the code
XXX and xxxAsync (e: thenApply() and thenApplyAsync())
xxx
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenApply(result -> {
System.out.println("Asynchronous task 1 Result:" + result);
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
));
future.join(); // get asynchronous result (get result 2)
}
Copy the code
Start another thread to complete asynchronous task 1 and then asynchronous task 2.
xxxAsync
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
System.out.println("Start running asynchronously 1");
return "Asynchronous run complete 1";
}).thenApplyAsync(result -> {
System.out.println("Asynchronous task 1 Result:" + result);
System.out.println("Start running asynchronously 2");
return "Asynchronous run complete 2";
));
future.join(); // get asynchronous result (get result 2)
}
Copy the code
Start another thread to complete asynchronous task 1, and start another thread to complete asynchronous task 2
Thread pools
Default thread pool
As I said earlier, threads are daemons, and here’s why. (Plenty of source code appears below)
AsyncPool Default thread pool
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
Copy the code
The creation of the default thread pool depends on the useCommonPool parameter
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Copy the code
The useCommonPool parameter in turn depends on the ForkJoinPool commonPoolParallelism value
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
Copy the code
Java. Util. Concurrent. ForkJoinPool class for a static method
static {
// initialize field offsets for CAS etc. common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run(a) { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
Copy the code
The value of commonParallelism is related to common.config
See makeCommonPool() for what common is
private static ForkJoinPool makeCommonPool(a) {
final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
new CommonPoolForkJoinWorkerThreadFactory();
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if(pp ! =null)
parallelism = Integer.parseInt(pp);
if(fp ! =null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if(hp ! =null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = commonPoolForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) < =0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
Copy the code
The values of Parallelism have two effects
System configuration java.util.concurrent.ForkJoinPool.com mon. Parallelism
Number of CPU cores
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if(pp ! =null)
parallelism = Integer.parseInt(pp);
parallelism = Runtime.getRuntime().availableProcessors() - 1
Copy the code
AsyncPool final value
Parallelism is generally equal to the number of CPU cores on a computer -1
AsyncPool = ForkJoinPool.com monPool ();
ForkJoinPool.commonPool()
public static ForkJoinPool commonPool(a) {
// assert common ! = null : "static init error";
return common;
}
Copy the code
The definition of common is shown above
The last object returned is an object like this
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
Copy the code
Where factory is to create the thread factory, his related definition
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
if(fp ! =null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
//-----------------------------------------------------------------------------------
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
//-----------------------------------------------------------------------------------
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
Copy the code
By default, the factory is DefaultForkJoinWorkerThreadFactory
DefaultForkJoinWorkerThreadFactory
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return newForkJoinWorkerThread(pool); }}Copy the code
ForkJoinWorkerThread(ForkJoinPool pool)
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
Copy the code
registerWorker(this)
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if((handler = ueh) ! =null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if((ws = workQueues) ! =null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if(ws[i] ! =null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4)?2 : ((n >>> 1) & EVENMASK) + 2;
while(ws[i = (i + step) & m] ! =null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fencews[i] = w; }}finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
Copy the code
The above DefaultForkJoinWorkerThreadFactory newThread method returns a new thread
When a new thread is created, its constructor is pool.registerworker (this); The method is passed in to the thread itself
See wt.setdaemon (true) in this method; Set this thread to a daemon thread (wt is the thread itself)
Custom thread pools
Some configuration of the default thread pool can be changed by setting system parameters, but it is not recommended. Because such modification affects the entire program, it is not easy to control.
The CompletableFuture method has an argument that lets us use a custom thread pool.
If you look at it, you will see that xxxAsync() methods occur in pairs and are overloaded, e:
runAsync(Runnable runnable);
runAsync(Runnable runnable, Executor executor);
Copy the code
The Executor argument can be passed to our custom thread pool
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("do something ......");
}, executorService);
future.join();
executorService.shutdown();
}
Copy the code
When using the thread pool, close it. The default thread created in the thread pool is not a daemon thread. If you do not close the thread pool, your program will not end.
New ThreadPoolExecutor() is recommended for custom thread pools; Instead of directly using the tools provided by the four thread pools, they have their own defects, detailed information can be viewed thread pool related information, according to their own business needs, to create a suitable thread pool.