Prior to Java 7, processing a collection in parallel required the following steps:
1. Manually divide the data into several parts
2. Create a thread for each part
3. Merge the partial results at the appropriate time
You also had to take care whenever several threads modified shared variables. Java 8 gives us parallel streams, which can be enabled with a single method call. Isn't that cool? Let's take a look.
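The three manual steps above can be sketched roughly as follows. This is only an illustration: the class name ManualParallelSum and the method parallelSum are hypothetical, and the lambda syntax is Java 8 shorthand used for brevity (pre-Java 7 code would have used anonymous Runnable classes).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the manual approach: divide, one thread per part, merge.
public class ManualParallelSum {

    public static long parallelSum(List<Integer> numbers, int parts) {
        long[] partialSums = new long[parts];              // each thread writes only its own slot
        List<Thread> threads = new ArrayList<>();
        int chunk = (numbers.size() + parts - 1) / parts;  // step 1: divide (ceiling division)

        for (int p = 0; p < parts; p++) {
            final int index = p;
            final int from = Math.min(p * chunk, numbers.size());
            final int to = Math.min(from + chunk, numbers.size());
            Thread worker = new Thread(() -> {             // step 2: one thread per part
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += numbers.get(i);
                }
                partialSums[index] = sum;
            });
            threads.add(worker);
            worker.start();
        }

        long total = 0;
        try {
            for (int p = 0; p < parts; p++) {
                threads.get(p).join();                     // wait for the part to finish
                total += partialSums[p];                   // step 3: merge
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();            // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return total;
    }
}
```

Even for a simple sum, the bookkeeping (index ranges, thread lifecycle, merging) dominates the code, which is the burden parallel streams are meant to remove.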
Parallel streams
Understanding and enabling parallel streams
What is a parallel stream? A parallel stream splits its contents into chunks of data and processes each chunk on a different thread. For example, suppose we have the following requirement:
We have a List of apples, and each apple in the List has only a weight. We also know that apples cost 5 yuan per kilogram. Now we need to work out the price of each apple.
List<Apple> appleList = new ArrayList<>();
for (Apple apple : appleList) {
    // 5 yuan per kilogram; dividing by 1000 implies getWeight() returns grams
    apple.setPrice(5.0 * apple.getWeight() / 1000);
}
We calculate the price of each apple by iterating over the Apple objects in the list. The time complexity of this algorithm is O(n), where n is list.size(), so the running time grows linearly with the size of the list. Parallel streams
can greatly reduce this time. A parallel stream processes the collection as follows:
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
The only difference from a normal stream is the call to parallelStream(). An ordinary stream can also be converted into a parallel one with stream.parallel(), and a parallel stream can be converted back into a sequential one with sequential().
But note: switching a stream between parallel and sequential does not change the stream itself; it only sets a flag. If a pipeline contains several parallel()/sequential() calls, only the last one takes effect, and it applies to the whole pipeline.
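The "last call wins" rule can be checked with isParallel(), which reports the mode the pipeline will run in. A minimal sketch (the class and method names are made up for illustration):

```java
import java.util.stream.IntStream;

// Hypothetical demo: only the last parallel()/sequential() call in a pipeline counts.
public class ParallelFlagDemo {

    // parallel() followed by sequential(): the pipeline ends up sequential.
    public static boolean endsWithSequential() {
        return IntStream.range(0, 10)
                .parallel()
                .map(i -> i * 2)
                .sequential()
                .isParallel();
    }

    // sequential() followed by parallel(): the pipeline ends up parallel.
    public static boolean endsWithParallel() {
        return IntStream.range(0, 10)
                .sequential()
                .map(i -> i * 2)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(endsWithSequential()); // prints false
        System.out.println(endsWithParallel());   // prints true
    }
}
```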
Parallel streams are convenient, but where do their threads come from? How many are there, and how can that be configured?
By default, parallel streams run on the common ForkJoinPool. Its default parallelism is derived from the number of processor cores reported by Runtime.getRuntime().availableProcessors(): one fewer than that count, because the thread that invokes the stream also takes part in the work.
The pool size can be changed through the system property java.util.concurrent.ForkJoinPool.common.parallelism. However, this is a global setting, so changing it affects every parallel stream in the application. The stream API currently offers no way to set a thread count for an individual stream. Generally speaking, a size that matches the number of processor cores is a good choice.
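To see which values apply on a particular machine, the common pool can be queried directly. The sketch below only reads the settings; the class name CommonPoolInfo is hypothetical, and the value 8 in the comment is just an example.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that reports the settings parallel streams rely on by default.
public class CommonPoolInfo {

    // Number of processors visible to the JVM.
    public static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common ForkJoinPool.
    public static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // To change the size, pass the property at JVM startup, before the
        // common pool is first used, for example:
        //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 CommonPoolInfo
        System.out.println("Available cores: " + availableCores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Setting the property on the command line is the safer route, because the common pool reads it only once, when the pool is initialized.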
Testing the performance of parallel streams
To make the difference easy to measure, we let the thread sleep for 1 second after each price calculation, simulating other IO-related work done during that time, and then log the total execution time. The sequential version:
public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();
    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000); // simulate 1 second of IO per apple
    }
    Date end = new Date();
    log.info("Number of apples: {}, time: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
}
The parallel version
List<Apple> appleList = initAppleList();
Date begin = new Date();
appleList.parallelStream().forEach(apple -> {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
    try {
        Thread.sleep(1000); // simulate 1 second of IO per apple
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
Date end = new Date();
log.info("Number of apples: {}, time: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
Timing results
The result matched our prediction. My computer has a quad-core i5 processor; with the parallel stream enabled, each of the four cores ran one thread, and the whole task finished in about 1s!
Can parallel streams be used freely?
Decomposability affects the speed of a stream
From the test above, it would be easy to conclude that parallel streams are so fast that we can abandon external iteration (foreach/fori/iter) entirely and rely on the internal iteration provided by Stream.
Is that really the case? Are parallel streams really that perfect? The answer, of course, is no. You can copy the code below and run it on your own computer. The results show that a parallel stream is not always the fastest way to process data.
1. Summing the first n numbers generated by iterate() is always slower than the plain loop, whether or not the stream is parallel. For the sequential version, the explanation is simply that stream operations carry more overhead than a loop. But why is the parallel version slow? There are two things to note here:
- iterate() generates boxed objects, which must be unboxed into numbers before they can be summed
- It is difficult to split iterate() into independent chunks that can be executed in parallel
The second point is interesting, because it is important to realize that some stream operations are easier to parallelize than others. With iterate(), each application of the function depends on the result of the previous application.
So in this case we not only fail to divide the stream into small chunks effectively, we also pay the extra overhead of parallelization.
2. LongStream.rangeClosed() does not suffer from the second problem of iterate(). It produces primitive values, so no unboxing is needed, and it can split the range 1 to n directly into four parts: 1 to n/4, n/4 to 2n/4, 2n/4 to 3n/4, and 3n/4 to n. As a result, the parallel version of rangeClosed() is faster than external iteration with a for loop:
// ParallelStreams.java
package lambdasinaction.chap7;

import java.util.stream.*;

public class ParallelStreams {

    // External iteration with a plain for loop
    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }

    // Sequential stream built with iterate (boxed Long values)
    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }

    // Parallel stream built with iterate (hard to split)
    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

    // Sequential primitive stream over a range
    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }

    // Parallel primitive stream over a range (easy to split)
    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }
}

// ParallelStreamsHarness.java
package lambdasinaction.chap7;

import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs");
        System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs");
    }

    // Runs f ten times and returns the fastest run in milliseconds
    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}
Modifying shared variables
Parallel streams make multithreading easy, but they do not solve the problem of several threads modifying a shared variable. The following code contains the shared variable total and computes the sum of the first n natural numbers with a sequential stream and with a parallel stream:
public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public static class Accumulator {
    private long total = 0;

    // Not thread-safe: total += value is a read-modify-write
    public void add(long value) {
        total += value;
    }
}
With n = 10,000,000, sequential execution yields 50000005000000 every time, while parallel execution yields a different, incorrect result on almost every run.
This is because every access to total is a data race: total += value is not atomic, so two threads can read the same old value and one of the updates is lost. For more on the causes of data races, see the separate post on volatile. Parallel streams are therefore not recommended when the operations in your code modify shared variables.
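For comparison, here is a sketch of two ways to get a correct parallel sum. The first avoids shared state altogether by letting the stream do the reduction. The second keeps a shared accumulator but swaps in the thread-safe java.util.concurrent.atomic.LongAdder. Neither comes from the original post; the class name SafeParallelSum is hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.LongStream;

// Hypothetical alternatives to the racy Accumulator shown above.
public class SafeParallelSum {

    // Preferred: no shared mutable state; the stream merges partial sums itself.
    public static long reductionSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // If a shared accumulator cannot be avoided, use a thread-safe one.
    public static long adderSum(long n) {
        LongAdder adder = new LongAdder();
        LongStream.rangeClosed(1, n).parallel().forEach(adder::add);
        return adder.sum(); // forEach has completed, so every add is included
    }

    public static void main(String[] args) {
        System.out.println(reductionSum(10_000_000L)); // prints 50000005000000
        System.out.println(adderSum(10_000_000L));     // prints 50000005000000
    }
}
```

The reduction version is usually the better choice: it needs no synchronization at all, whereas the accumulator version still pays for contention on the shared counter.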
Notes on using parallel streams
Keep the following points in mind when using parallel streams:
Prefer the primitive streams LongStream/IntStream/DoubleStream over Stream when processing numbers, to avoid the extra overhead of frequent boxing and unboxing.
Consider the total computational cost of the stream's pipeline. Let N be the number of elements to process and Q the time needed to process one element. N * Q is then the total cost of the operation, and the higher Q is, the more likely the pipeline is to benefit from a parallel stream.
For example, suppose the front end sends several types of resources that need to be stored in the database, and each resource type corresponds to a different table. We can treat the number of types as N, and the network time to reach the database plus the insertion time as Q.
The network time is usually relatively large, so this operation is well suited to parallel processing. Of course, when the number of types exceeds the number of cores, the performance gain is somewhat reduced. A better optimization will be covered in a future post.
- Parallel streams are not recommended for small amounts of data
- Parallel streams are recommended when the stream's data is easy to split into chunks
- The following table lists the decomposability of some common collection-framework stream sources:
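Returning to the first point in this list, the difference between a boxed Stream<Long> and a primitive LongStream can be sketched as follows. The class and method names are hypothetical; both methods return the same result, and the comments mark where boxing happens.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of boxed and primitive summation.
public class PrimitiveStreamDemo {

    // Boxed: reduce works on Long objects, so each step unboxes two values
    // and boxes the result again.
    public static long boxedSum(List<Long> values) {
        return values.stream().reduce(0L, Long::sum);
    }

    // Primitive: mapToLong unboxes each element once, then sum() operates
    // on plain long values.
    public static long primitiveSum(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L);
        System.out.println(boxedSum(values));     // prints 10
        System.out.println(primitiveSum(values)); // prints 10
    }
}
```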