Asynchronous programming with CompletableFuture, introduced in Java 8

Usage

1. Start an asynchronous task

supplyAsync (returns a value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously");
    	return "Asynchronous run complete";
    });

    future.join();  // Wait for the asynchronous task to complete and get the result
}

runAsync (no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(()-> {
    	System.out.println("Start running asynchronously");
    });

    future.join();  // Wait for the asynchronous task to complete
}

By default, asynchronous tasks run on daemon threads. If the main thread ends without waiting for the result, the JVM exits and the asynchronous task is abandoned before it completes.

When the main thread asks for the result (for example with join()), it blocks until the asynchronous task completes, takes the result, and then continues.
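
To check the daemon behavior on your own machine, a minimal sketch like the following can help. The class and method names are illustrative, not from the original. On most multi-core machines the task runs on a ForkJoinPool.commonPool worker and reports daemon=true; on machines with very few cores the result may differ.

```java
import java.util.concurrent.CompletableFuture;

class DaemonThreadDemo {
    /** Runs a task with the default executor and reports which thread ran it. */
    static String describeWorker() {
        return CompletableFuture.supplyAsync(() -> {
            Thread t = Thread.currentThread();
            return "thread=" + t.getName() + ", daemon=" + t.isDaemon();
        }).join();  // join() blocks the caller until the task has finished
    }

    public static void main(String[] args) {
        System.out.println(describeWorker());
    }
}
```

Removing the join() call and printing inside the task instead is a quick way to see the task being abandoned when main returns first.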

2. Connect two asynchronous tasks

thenCompose

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenCompose(result -> CompletableFuture.supplyAsync(()->{
        System.out.println("Asynchronous task 1 Result:" + result);
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    }));

    future.join();  // get asynchronous result (get result 2)
}

3. Post-process the result of a task

thenApply (returns a value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenApply(result -> {
        System.out.println("Asynchronous task 1 Result:" + result);
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    });

    future.join();  // get asynchronous result (get result 2)
}

thenAccept (no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenAccept(result -> {
        System.out.println("Asynchronous task 1 Result:" + result);
        System.out.println("Start running asynchronously 2");
    });

    future.join();  // Wait for the asynchronous task to complete
}

thenRun (no input, no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenRun(() -> {
        System.out.println("Start running asynchronously 2");
    });

    future.join();  // Wait for the asynchronous task to complete
}

4. Combine two tasks

thenCombine (returns a value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenCombine(CompletableFuture.supplyAsync(()->{
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    }), (x, y) -> {
        System.out.println("Asynchronous task 1 Result:" + x);
        System.out.println("Asynchronous task 2 Result:" + y);
        return "Return final result";
    });

    future.join();  // get the asynchronous result (get the final result)
}

Tasks 1 and 2 run independently and can execute on separate threads; the combining function runs after both have completed.

thenAcceptBoth (no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    }), (x, y) -> {
        System.out.println("Asynchronous task 1 Result:" + x);
        System.out.println("Asynchronous task 2 Result:" + y);
    });

    future.join();  // Wait for the asynchronous task to complete
}

runAfterBoth (no input, no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).runAfterBoth(CompletableFuture.supplyAsync(()->{
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    }), () -> {
        System.out.println("Start running asynchronously 3");
    });

    future.join();  // Wait for the asynchronous task to complete
}

5. Use whichever task finishes first

applyToEither (returns a value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).applyToEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), result -> result);

    future.join();  // Get asynchronous results (first processed results)
}

acceptEither (no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).acceptEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), result -> {
        System.out.println("Processing result:" + result);
    });

    future.join();  // Wait for the asynchronous task to complete
}

runAfterEither (no input, no return value)

public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).runAfterEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), () -> {
        System.out.println("Subsequent Operations");
    });

    future.join();  // Wait for the asynchronous task to complete
}

As soon as the first asynchronous task completes, the wait ends and the main thread continues.

The slower asynchronous task keeps running and completes, provided the main thread has not finished yet.

Because asynchronous tasks use daemon threads by default, a task that is still running when the main thread finishes is abandoned, since the whole program exits.
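
The following sketch makes the "first one wins" behavior observable by simulating the delays with sleep. The helper work(), the class name, and the delay values are assumptions made for illustration. It uses its own two-thread pool (non-daemon threads) so that the slow task is allowed to finish instead of being abandoned.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EitherDemo {
    // Hypothetical helper: pretend to work for the given time, then return a label.
    static String work(String label, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return label;
    }

    static String firstResult() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> slow =
                CompletableFuture.supplyAsync(() -> work("slow", 500), pool);
            CompletableFuture<String> fast =
                CompletableFuture.supplyAsync(() -> work("fast", 50), pool);
            // Completes with the result of whichever task finishes first.
            return slow.applyToEither(fast, result -> result).join();
        } finally {
            pool.shutdown();  // already-submitted tasks still finish, then the threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());  // prints "fast"
    }
}
```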

6. Batch processing

allOf

public static void main(String[] args) throws Exception {
    // executorService is assumed to be an ExecutorService created elsewhere
    List<CompletableFuture<Void>> completableFutures = IntStream.range(0, 10)
        .mapToObj(index -> CompletableFuture.runAsync(() -> {
            System.out.println(index + "do something ...");
        }, executorService))
        .collect(Collectors.toList());

    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); // Wait for all asynchronous tasks to complete
}

This starts 10 asynchronous tasks. CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join() waits for all of them to complete before the program continues.

To get the result of an asynchronous task:

public static void main(String[] args) throws Exception {
    List<CompletableFuture<String>> completableFutures = IntStream.range(0, 10)
        .mapToObj(index -> CompletableFuture.supplyAsync(() -> {
            System.out.println(index + "do something ...");
            return index + "Asynchronous task complete";
        }, executorService))
        .collect(Collectors.toList());

    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        .thenRun(() -> {
             completableFutures.stream().map(CompletableFuture::join).forEach(System.out::println);
        });  // Wait for all the tasks to complete, then iterate to get all the results.
}
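
As a variant of the example above, the results can also be collected into a single typed future. This is only a sketch: allResults is a hypothetical helper, not a JDK method, and the default executor is used here instead of executorService to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AllOfDemo {
    /** Turns a list of futures into one future holding all results, in list order. */
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)  // does not block: every future is already done
                .collect(Collectors.toList()));
    }

    static List<String> run() {
        List<CompletableFuture<String>> futures = IntStream.range(0, 10)
            .mapToObj(index -> CompletableFuture.supplyAsync(() -> index + " done"))
            .collect(Collectors.toList());
        return allResults(futures).join();  // wait here so the daemon threads are not cut off
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The results come back in the order of the input list, regardless of which task finished first.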

anyOf

public static void main(String[] args) throws Exception {
    List<CompletableFuture<Void>> completableFutures = IntStream.range(0, 10)
        .mapToObj(index -> CompletableFuture.runAsync(() -> {
            System.out.println(index + "do something ...");
        }, executorService))
        .collect(Collectors.toList());

    CompletableFuture.anyOf(completableFutures.toArray(new CompletableFuture[0])).join(); // Wait for the first asynchronous task to complete and get the result
}

Multiple asynchronous tasks run concurrently; anyOf waits for the first one to complete and returns its result.

The main thread continues as soon as the first task has a result. The other asynchronous tasks are not stopped and run to completion, provided the program is still running.
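
anyOf returns a CompletableFuture<Object>, so the result has to be cast back to its real type. A small sketch with simulated delays follows; the helper delayed(), the labels, and the timings are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AnyOfDemo {
    // Hypothetical helper: a task that sleeps for the given time, then returns its label.
    static CompletableFuture<String> delayed(String label, long millis, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return label;
        }, pool);
    }

    static String firstFinished() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Object> first = CompletableFuture.anyOf(
                delayed("task-1", 400, pool),
                delayed("task-2", 20, pool),
                delayed("task-3", 200, pool));
            return (String) first.join();  // anyOf is untyped, so a cast is needed
        } finally {
            pool.shutdown();  // the two slower tasks still run to completion
        }
    }

    public static void main(String[] args) {
        System.out.println(firstFinished());  // prints "task-2"
    }
}
```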

7. Exception handling

exceptionally (receives only the exception and returns a fallback value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).applyToEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), result -> result).exceptionally(e -> {
        System.out.println("Exception Handling");
        return "Plan B";
    });

    future.join();  // get asynchronous result (exception, return alternative)
}

handle (receives the result and the exception, returns the final value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).applyToEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), result -> result).handle((s, e) -> {  // handle receives (result, exception), in that order
        if (e != null) {
            System.out.println("Exception Handling :" + e.getMessage());
        } else {
            System.out.println("No abnormality, result :" + s);
        }
        return "Final plan";
    });

    future.join();  // Get asynchronous result (return final scheme)
}

whenComplete (receives the result and the exception, no return value)

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start asynchronously run 1, take 3s");
    	return "Asynchronous run complete 1";
    }).applyToEither(CompletableFuture.supplyAsync(()->{
        System.out.println("Start asynchronous run 2, it takes 2s");
    	return "Asynchronous run complete 2";
    }), result -> result).whenComplete((s, e) -> {
        if (e != null) {
            System.out.println("Exception Handling :" + e.getMessage());
        } else {
            System.out.println("No abnormality, result :" + s);
        }
    });

    future.join();  // Get asynchronous results (whenComplete does not affect asynchronous results)
}
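
None of the examples in this section actually throws, so here is a hedged sketch with a task that always fails. The class, the failing() task, and the unwrap() helper are illustrative assumptions. Note that the callbacks typically receive a CompletionException wrapping the original exception, which is why the sketch unwraps it before reading the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionDemo {
    // A task that always fails, for illustration only.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // Returns the original exception if it arrived wrapped in a CompletionException.
    static Throwable unwrap(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    static String withExceptionally() {
        return failing()
            .exceptionally(e -> "Plan B after: " + unwrap(e).getMessage())
            .join();
    }

    static String withHandle() {
        return failing()
            .handle((result, e) ->
                e == null ? "ok: " + result : "handled: " + unwrap(e).getMessage())
            .join();
    }

    // whenComplete observes the failure but does not replace it.
    static boolean whenCompleteKeepsFailure() {
        CompletableFuture<String> f = failing().whenComplete((result, e) ->
            System.out.println("whenComplete saw: " + unwrap(e).getMessage()));
        try {
            f.join();
            return false;
        } catch (CompletionException expected) {
            return true;  // the original failure still propagates to the caller
        }
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally());         // prints "Plan B after: boom"
        System.out.println(withHandle());                // prints "handled: boom"
        System.out.println(whenCompleteKeepsFailure());  // prints "true"
    }
}
```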

xxx and xxxAsync (e.g. thenApply() and thenApplyAsync())

xxx

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenApply(result -> {
        System.out.println("Asynchronous task 1 Result:" + result);
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    });

    future.join();  // get asynchronous result (get result 2)
}

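Asynchronous task 1 runs on a separate thread, and task 2 (the thenApply callback) normally runs on that same thread after task 1 completes. If task 1 has already completed when thenApply is called, the callback runs on the calling thread instead.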

xxxAsync

public static void main(String[] args) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()-> {
    	System.out.println("Start running asynchronously 1");
    	return "Asynchronous run complete 1";
    }).thenApplyAsync(result -> {
        System.out.println("Asynchronous task 1 Result:" + result);
        System.out.println("Start running asynchronously 2");
    	return "Asynchronous run complete 2";
    });

    future.join();  // get asynchronous result (get result 2)
}

Asynchronous task 1 runs on a separate thread, and task 2 is submitted to the thread pool as a task of its own, so it may run on a different thread.
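
One way to see the difference is to give the threads recognizable names. In this sketch the pool names "pool-A" and "pool-B" and the helper namedPool() are assumptions for illustration. Task 1 always runs on pool-A; the thenApplyAsync callback is handed to pool-B, while the plain thenApply callback runs on pool-A or, if task 1 was already done, on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadDemo {
    // Hypothetical helper: a single-thread pool whose thread carries the given name.
    static ExecutorService namedPool(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread that ran the callback. */
    static String callbackThread(boolean async) {
        ExecutorService poolA = namedPool("pool-A");
        ExecutorService poolB = namedPool("pool-B");
        try {
            CompletableFuture<String> task1 =
                CompletableFuture.supplyAsync(() -> "result 1", poolA);
            CompletableFuture<String> task2;
            if (async) {
                // The callback is submitted to poolB as a separate task.
                task2 = task1.thenApplyAsync(result -> Thread.currentThread().getName(), poolB);
            } else {
                // The callback runs wherever task 1 completes, or on the caller if already done.
                task2 = task1.thenApply(result -> Thread.currentThread().getName());
            }
            return task2.join();
        } finally {
            poolA.shutdown();
            poolB.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply ran on:      " + callbackThread(false));
        System.out.println("thenApplyAsync ran on: " + callbackThread(true));
    }
}
```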

Thread pools

Default thread pool

As mentioned earlier, the default threads are daemon threads. Here is why. (A lot of JDK source code follows.)

asyncPool, the default thread pool

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

The creation of the default thread pool depends on the useCommonPool parameter

private static final Executor asyncPool = useCommonPool ? 
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

useCommonPool in turn depends on the common pool parallelism reported by ForkJoinPool.getCommonPoolParallelism()

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

The static initializer of the java.util.concurrent.ForkJoinPool class

static {
    // initialize field offsets for CAS etc.
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

The value of commonParallelism is related to common.config

See makeCommonPool() for what common is

private static ForkJoinPool makeCommonPool() {

    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
        new CommonPoolForkJoinWorkerThreadFactory();
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = commonPoolForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

The value of parallelism is determined by two things:

The system property java.util.concurrent.ForkJoinPool.common.parallelism

The number of CPU cores

String pp = System.getProperty
    ("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
    parallelism = Integer.parseInt(pp);

parallelism = Runtime.getRuntime().availableProcessors() - 1

Final value of asyncPool

parallelism is generally the number of CPU cores minus 1. On a machine with three or more cores it is therefore greater than 1, useCommonPool is true, and

asyncPool = ForkJoinPool.commonPool();
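
To see which values apply on a given machine, a small sketch such as this one can be run. The class name is illustrative, and the useCommonPool variable only mirrors the private check quoted earlier; it does not read the JDK's internal field.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismInfo {
    static String describe() {
        int cores = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        boolean useCommonPool = parallelism > 1;  // same comparison as in the JDK source above
        return "cores=" + cores
            + ", parallelism=" + parallelism
            + ", useCommonPool=" + useCommonPool;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Starting the JVM with -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 should change the reported parallelism, since that property is read first in makeCommonPool().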

ForkJoinPool.commonPool()

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

The definition of common is shown above

The object that is finally returned is constructed like this

return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");

Here factory is the factory that creates the threads. Its related definitions:

String fp = System.getProperty
	("java.util.concurrent.ForkJoinPool.common.threadFactory");
if (fp != null)
    factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
               getSystemClassLoader().loadClass(fp).newInstance());
//-----------------------------------------------------------------------------------
if (factory == null) {
    if (System.getSecurityManager() == null)
        factory = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        factory = new InnocuousForkJoinWorkerThreadFactory();
}
//-----------------------------------------------------------------------------------
defaultForkJoinWorkerThreadFactory =
    new DefaultForkJoinWorkerThreadFactory();

By default, the factory is DefaultForkJoinWorkerThreadFactory

DefaultForkJoinWorkerThreadFactory

static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}

ForkJoinWorkerThread(ForkJoinPool pool)

protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

registerWorker(this)

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

The newThread method of DefaultForkJoinWorkerThreadFactory above returns a new thread.

When the new thread is created, its constructor calls pool.registerWorker(this), passing in the thread itself.

Inside that method, wt.setDaemon(true) sets the thread to be a daemon thread (wt is the thread itself).

Custom thread pools

Some settings of the default thread pool can be changed through system properties, but this is not recommended: the change affects the entire program and is hard to control.

CompletableFuture's asynchronous methods have overloads that accept an Executor, which lets us use a custom thread pool.

If you look at the API, you will see that the xxxAsync() methods come in overloaded pairs, e.g.:

runAsync(Runnable runnable);

runAsync(Runnable runnable, Executor executor);   

We can pass our custom thread pool as the Executor argument

public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("do something ......");
    }, executorService);

    future.join();
    executorService.shutdown();
}

Shut the thread pool down when you are done with it. Threads created by a thread pool are not daemon threads by default, so if you do not shut the pool down, your program will not exit.

For custom thread pools, creating one with new ThreadPoolExecutor() is recommended over the four ready-made pools from the Executors utility class, each of which has its own drawbacks. See thread pool documentation for the details, and create a pool that fits your own business needs.
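
As a rough sketch of what that can look like, the following creates a bounded pool directly and passes it to supplyAsync. The sizes (2 core threads, 4 maximum, queue of 100), the 60 second keep-alive, and the CallerRunsPolicy are placeholder assumptions, not recommendations; they should be tuned to the workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CustomPoolDemo {
    // Illustrative settings only: adjust sizes and policy to your own workload.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
            2,                                           // core pool size
            4,                                           // maximum pool size
            60L, TimeUnit.SECONDS,                       // idle time before extra threads exit
            new ArrayBlockingQueue<>(100),               // bounded queue, so backlog cannot grow forever
            new ThreadPoolExecutor.CallerRunsPolicy());  // when saturated, the submitting thread runs the task
    }

    static String runOnCustomPool() {
        ThreadPoolExecutor pool = newBoundedPool();
        try {
            return CompletableFuture.supplyAsync(() -> "done", pool).join();
        } finally {
            pool.shutdown();  // non-daemon threads: without this the JVM would not exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());  // prints "done"
    }
}
```

A bounded queue together with an explicit rejection policy makes overload visible instead of letting queued tasks pile up without limit.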