Background of blood lesson: use thread pool to migrate stock data, but there is always a batch of data migration failure, no abnormal log print

Murder is caused

Heard parallelStream parallel flow is a good thing, because the daily development stream serial stream of scenario is more, just need to write the migration program can use this time, it also not hurriedly to pack *, while the going is good. I’m also smart enough to know that in the background of the JVM, this is done using a common fork/ Join pool that is shared by all parallel streams. By default, fork/ Join pools allocate one thread per processor. The workaround is to create your own thread pool such as

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
            list.parallelStream().collect(Collectors.toList());
        });
Copy the code

So this is where the mines were laid.

Submit or execute

  public static void main(String[] args) throws InterruptedException, ExecutionException {
        final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        List<Integer> list = Lists.newArrayList(1.2.3.null);
        / / 1. Using the submit
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        TimeUnit.SECONDS.sleep(3);
        / / 2. Use the execute
        pool.execute(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        //3. Call get() with submit
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        }).get();
        TimeUnit.SECONDS.sleep(3);
    }
Copy the code

If you run the above use case for yourself, you will find that the submit method does not print an error log, but the execute method prints an error log. However, calling the Get () method on the FutureJoinTask returned by Submit raises an exception. When the null value was passed, an exception occurred. However, the exception log was caught in the Submit method and was not printed. The caught exception was wrapped in the returned result class FutureJoinTask. It didn’t sell again.

If you do not need to return results asynchronously, do not use itsubmitmethods

Conclusion first, I made the mistake of assuming that the difference between submit and execute is that one returns asynchronous results and the other does not return a single result, but the truth is cruel. The logic in Submit () must have included catching an exception thrown by an asynchronous task that was not thrown again because of improper use.

A ForkJoinPool#submit() returns a ForkJoinTask that can obtain the result of an asynchronous task. Now that the asynchronous task has thrown an exception, what will happen if we try to obtain the result? ForkJoinTask#get()

public final V get(a) throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    // Async task exceptions are wrapped as ExecutionException and thrown again when get() is called
    if(s == EXCEPTIONAL && (ex = getThrowableException()) ! =null)
        throw new ExecutionException(ex);
    return getRawResult();
}
Copy the code

An exception from an asynchronous task is wrapped as An ExecutionException and thrown again when get() is called, but where is the exception caught? All threads need to override the Thread#run() method. Forkjoinworkerthreads are wrapped as forkjoinworkerthreads, so let’s look at the implementation of ForkJoinWorkerThread#run().

public void run(a) {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            // When an exception occurs, ForkJoinTask#get() is thrown again
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception); }}}}Copy the code

The above analysis is based on ForkJoinPool. If all thread pool submit and execute methods are implemented similarly, how about ThreadPoolThread? We need to find to ThreadPoolThread asynchronous task was eventually packaging for any Thread subclass or Java. Lang. Runnable# run, the answer is Java. Util. Concurrent. FutureTask

 public void run(a) {...try {
            Callable<V> c = callable;
            if(c ! =null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // Catch an exception
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if(ran) set(result); }}... }Copy the code

conclusion

Java. Util. Concurrent. ExecutorService# submit (Java. Lang. Runnable) why the thread pool will have this kind of set, we actually thinking should not be limited to the thread pool, but on getting asynchronous tasks as a result, exception is also belong to the asynchronous results, The implementation of FutureTask as a concurrent utility class provided by the JDK already provides a good answer, which is to get the result of an asynchronous task. Exceptions are also asynchronous results. If an asynchronous task has a runtime exception, the exception will be rewrapped and thrown when the result of the task is fetched

Akik PLZ called me a red scarf

Reference: juejin. Cn/post / 684490…

This blog welcome to reprint, but without the consent of the author must retain this paragraph of statement, and in the article page obvious location to give the original link, otherwise reserve the right to pursue legal responsibility. Code word is not easy, your praise is the biggest power of my writing