Java 8's parallel streams are developer-friendly and make parallel computation easy to implement. Under the hood, a parallel stream is executed as ForkJoinTasks, which lets it take advantage of the CPU's multiple cores. The Stream API hides the complexity of that implementation entirely.

Start parallel Stream

To run a Stream in parallel, simply call the parallel() method somewhere before the terminal operation.

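    // Needs java.util.stream.IntStream and a JUnit @Test import (JUnit 4 assumed)
    @Test
    public void parallelStream() {
        IntStream.range(0, 100)
                .parallel()
                // Print the worker thread together with the element it handles
                .forEach(e -> System.out.println(Thread.currentThread() + " " + e));
    }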

Running the code above shows that the elements are processed by ForkJoinPool worker threads (the calling thread usually takes part as well).
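To make the participating threads easier to see than in 100 lines of console output, the names can be collected into a set. This is a minimal sketch, not part of the original example: the class name ParallelThreadNames and the helper threadNames are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class; names are illustrative only.
class ParallelThreadNames {

    // Runs a parallel forEach over [0, n) and records the name of
    // every thread that processed at least one element.
    static Set<String> threadNames(int n) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, n)
                .parallel()
                .forEach(e -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The exact names and their number depend on the machine and JVM.
        threadNames(100).forEach(System.out::println);
    }
}
```

A thread-safe set is used because the lambda runs on several threads at once.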

A closer look

Here the sum of the integers from 1 to 100000000000 is computed with a parallel stream and with a sequential stream, respectively.

    @Test
    public void testSum() {
        // Parallel computation
        long time = System.currentTimeMillis();
        long sum1 = LongStream.rangeClosed(1, 100000000000L).parallel().sum();
        System.out.println(System.currentTimeMillis() - time);
        // Serial computation
        time = System.currentTimeMillis();
        long sum2 = LongStream.rangeClosed(1, 100000000000L).sum();
        System.out.println(System.currentTimeMillis() - time);

        // The true sum exceeds Long.MAX_VALUE, so both values wrap around identically
        System.out.println("sum1 = " + sum1 + " sum2 = " + sum2);
        Assert.assertTrue(sum1 == sum2);
    }

The execution result is shown in the figure. My CPU has 6 cores, and the serial computation took almost 5 times as long as the parallel one, because the parallel stream spreads the work across the cores. For a CPU-bound task like this one, the gap tends to widen as the amount of data grows, and the advantage of the parallel Stream becomes more obvious.
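One way to sanity-check that both variants compute the same thing is to compare them with the closed form n(n+1)/2 on a range small enough to fit in a long. The sketch below is an illustration only: the class SumCheck and its methods are hypothetical, and the bound of 1,000,000 is chosen so that nothing overflows.

```java
import java.util.stream.LongStream;

// Hypothetical helper class; names are illustrative only.
class SumCheck {

    // Sum of 1..n computed by a parallel stream.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // Sum of 1..n computed by a sequential stream.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Gauss's closed form; only valid while n * (n + 1) fits in a long.
    static long closedForm(long n) {
        return n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println(parallelSum(n));   // 500000500000
        System.out.println(sequentialSum(n)); // 500000500000
        System.out.println(closedForm(n));    // 500000500000
    }
}
```

With the upper bound of 100000000000 used above, the mathematical sum is larger than Long.MAX_VALUE, so sum() silently wraps around. Both streams wrap to the same value, which is why the equality assertion still holds.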

sequential() and parallel()

parallel() turns a sequential stream into a parallel stream, and sequential() turns a parallel stream into a sequential one. The two modes are mutually exclusive, but that doesn't mean both methods can't be called on the same pipeline.

isParallel() checks whether a stream is parallel.

    @Test
    public void isParallel() {
        // Keep the stream returned by parallel() and query that one
        IntStream stream = IntStream.range(0, 100).parallel();
        Assert.assertTrue(stream.isParallel());
    }

Applying a ‘sequential -> parallel -> sequential -> parallel’ chain of calls, as in the code below, still ends up with a parallel stream.

    @Test
    public void spspIsP() {
        IntStream stream = IntStream.range(0, 100);
        // Query the stream returned by the last call in the chain
        IntStream result = stream.parallel().map(e -> e << 1).sequential().parallel();
        Assert.assertTrue(result.isParallel());
    }

Applying a ‘sequential -> parallel -> sequential’ chain of calls, as in the code below, ends up with a sequential stream.

    @Test
    public void spsIsS() {
        IntStream stream = IntStream.range(0, 100);
        // Query the stream returned by the last call in the chain
        IntStream result = stream.parallel().map(e -> e << 1).sequential();
        Assert.assertFalse(result.isParallel());
    }

The examples above show that whether a stream runs sequentially or in parallel depends on which of the two methods, sequential() or parallel(), was invoked last. The mode applies to the whole pipeline, not just to the stages after the call.
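That the last call decides the mode for the entire pipeline can be observed by recording threads in a stage placed before the final sequential() call. This is a minimal sketch; the class LastCallWins and its method are hypothetical names introduced for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class; names are illustrative only.
class LastCallWins {

    // Records which threads execute a peek() stage that sits between
    // parallel() and a later sequential() call.
    static Set<String> threadsSeenBeforeSequential() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000)
                .parallel()
                .peek(e -> names.add(Thread.currentThread().getName()))
                .sequential() // last call: the whole pipeline runs sequentially
                .sum();       // terminal operation triggers execution
        return names;
    }

    public static void main(String[] args) {
        // Only the calling thread shows up, even for the stage after parallel().
        System.out.println(threadsSeenBeforeSequential());
    }
}
```

Even though peek() comes right after parallel(), it runs on the calling thread only, because the mode is a property of the pipeline as a whole.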

ForkJoinPool

Under the hood, parallel streams rely on the ForkJoinPool.commonPool() thread pool, which is shared by the whole JVM process. When time-consuming operations run in this pool, subsequent tasks pile up behind them, causing performance problems. By default, the size of the pool is the number of logical cores minus 1. You can change it with the JVM system property ‘java.util.concurrent.ForkJoinPool.common.parallelism’, but that approach is not recommended here. A better way is to use a custom thread pool.
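The size of the common pool on a given machine can be checked directly. The sketch below only reads values. The class name CommonPoolInfo is hypothetical, while ForkJoinPool.getCommonPoolParallelism() and Runtime.availableProcessors() are standard JDK methods.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; the name is illustrative only.
class CommonPoolInfo {

    // Target parallelism of the shared common pool.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Number of logical cores visible to the JVM.
        System.out.println("cores = " + Runtime.getRuntime().availableProcessors());
        // Defaults to cores - 1 unless the system property overrides it.
        System.out.println("common pool parallelism = " + commonParallelism());
    }
}
```

The property is read when the common pool is initialized, so it is normally passed on the command line, for example -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 (the value 8 is just an example).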

The code below creates a ForkJoinPool with four threads and runs the parallel stream inside it.

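    @Test
    public void customPool() {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        // Submit the whole stream pipeline as a task of the custom pool
        forkJoinPool.submit(() -> {
            IntStream.range(0, 100).parallel().forEach(e -> System.out.println(Thread.currentThread() + " " + e));
        }).join();
        forkJoinPool.shutdown(); // release the pool's threads when done
    }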

The code execution result is shown below, with parallel streams executing in a custom thread pool.

Why does submitting the stream through forkJoinPool.submit() make it run in the custom thread pool? A ForkJoinWorkerThread holds a reference to the ForkJoinPool it belongs to. When the stream's task is executed, the doInvoke method is called. It checks whether Thread.currentThread() is a ForkJoinWorkerThread and, if so, uses that thread's ForkJoinPool to perform the computation. This is how the JDK implementation behaves rather than something the Stream API documents.

    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }
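The effect of this check can be observed from the outside by recording which threads process the elements when the stream is submitted to a custom pool. This is a minimal sketch that relies on the implementation behavior described above. The class CustomPoolDemo and its method are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class; names are illustrative only.
class CustomPoolDemo {

    // Runs a parallel stream inside a dedicated ForkJoinPool and returns
    // the names of the threads that processed the elements.
    static Set<String> threadNamesInCustomPool(int parallelism, int n) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            pool.submit(() ->
                    IntStream.range(0, n)
                            .parallel()
                            .forEach(e -> names.add(Thread.currentThread().getName()))
            ).join(); // wait for the pipeline to finish
            return names;
        } finally {
            pool.shutdown(); // do not leak the pool's threads
        }
    }

    public static void main(String[] args) {
        // Expect worker names of the custom pool, none from the common pool.
        threadNamesInCustomPool(4, 1_000).forEach(System.out::println);
    }
}
```

Shutting the pool down in a finally block matters in real code, since every ForkJoinPool that is created and forgotten keeps its worker threads around until they time out.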

Conclusion

Parallel streams are a great convenience for developers: a single line of code enables multi-threaded parallel computation. However, they are only appropriate under certain conditions, for example when the tasks do not depend on one another and there are no race conditions on shared state.
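As an illustration of the race-condition caveat, adding elements to a plain ArrayList from a parallel forEach is unsafe, whereas letting the stream build the result with collect() is safe. The sketch below shows only the safe variant. The class SafeCollect and its method are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; names are illustrative only.
class SafeCollect {

    // Unsafe pattern, shown only as a comment:
    //   List<Integer> out = new ArrayList<>();
    //   IntStream.range(0, n).parallel().forEach(e -> out.add(e << 1));
    // ArrayList is not thread-safe, so elements may be lost or an
    // exception may be thrown.

    // Safe pattern: no shared mutable state; the stream merges
    // per-thread partial results and preserves encounter order.
    static List<Integer> doubled(int n) {
        return IntStream.range(0, n)
                .parallel()
                .map(e -> e << 1)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubled(10)); // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```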