“This is the second day of my participation in the November Gwen Challenge. See details of the event: The last Gwen Challenge 2021”.
In the previous section, we briefly mentioned that the Stream interface allows you to work with its elements very easily: collections can be converted to parallel streams by calling the parallelStream method on collection sources. A parallel flow is a flow that splits content into chunks of data and uses different threads to process each chunk separately. This way, you can automatically distribute the workload for a given operation to all the cores of a multi-core processor.
In reality, calling the Parallel method on a sequential stream does not mean any real change to the stream itself. Internally, it essentially sets a Boolean flag indicating that you want all operations performed after a call to parallel to be executed in parallel. Similarly, you can turn a parallel stream into a sequential stream simply by calling the sequential method on it.
First, performance test
Before we get into the details, let’s take a quiz. We write a summation operation on a stream of natural numbers with a specified value. For a summation operation, perform 10 times and extract the operation with the shortest use time. Do a traditional for loop, a Stream loop, and a parallel Stream loop as follows:
import java.util.function.Function;
import java.util.stream.Stream;
/ * * *@description: Tests the performance of parallelism@author: weirx *@date: 2021/10/22 15:20 *@version: 3.0 * /
public class TestStreamParallel {
/** * description: Passes in a function and a number. This method executes the passed method 10 times, fetching the shortest execution time **@param adder
* @param n
* @return: long
* @author: weirx
* @time: 2021/10/22 15:29
*/
public static long measureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
adder.apply(n);
long duration = (System.nanoTime() - start) / 1 _000_000;
if(duration < fastest) { fastest = duration; }}return fastest;
}
/** * description: sums the input values **@param aLong
* @return: java.lang.Long
* @author: weirx
* @time: 2021/10/25 10:02 * /
private static Long testFor(Long aLong) {
/ / jdk1.7 summation
long result = 0;
for (long i = 1L; i <= aLong; i++) {
result += i;
}
return result;
}
/** * description: sums the input values **@param aLong
* @return: java.lang.Long
* @author: weirx
* @time: 2021/10/25 10:02 * /
private static Long testStreamParallel(Long aLong) {
// jdk1.8 sum - parallel
return Stream.iterate(0L, i -> i + 1).limit(aLong).parallel().reduce(0L, Long::sum);
}
/** * description: sums the input values **@param aLong
* @return: java.lang.Long
* @author: weirx
* @time: 2021/10/25 10:02 * /
private static Long testStream(Long aLong) {
// jdk1.8 sum - non-parallel
return Stream.iterate(0L, i -> i + 1).limit(aLong).reduce(0L, Long::sum); }}Copy the code
The traditional for loop results in the following:
public static void main(String[] args) {
System.out.println("Shortest time:" + measureSumPerf(TestStreamParallel::testFor, 10000000) + "ms"); } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the shortest time: 3 msCopy the code
The Stream result is as follows:
public static void main(String[] args) {
System.out.println("Shortest time:" + measureSumPerf(TestStreamParallel::testStream, 10000000) + "ms"); } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the shortest time: 106 msCopy the code
StreamParallel results as follows:
public static void main(String[] args) {
System.out.println("Shortest time:" + measureSumPerf(TestStreamParallel::testStreamParallel, 10000000) + "ms"); } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the shortest time: 131 msCopy the code
Results: 1) The iterated version of the traditional for loop should be much faster because it is more low-level and, more importantly, does not require any boxing or unboxing of the primitive type. 2) Using Stream is much slower than traditional for. 3) Using parallel Stream is the least efficient.
So what are the causes of these problems? 1) Iterate is the boxed object, which must be unboxed into numbers before summation; 2) It is difficult to break up the iterate into separate blocks for parallel execution because each application of this function depends on the results of the previous application, as shown below:
The entire list of numbers is not ready for induction and therefore cannot be effectively divided into small chunks for parallel processing. When the stream marks bits in parallel, it actually adds overhead to allocate each summation operation to a different thread.
To sum up: Iterate is an operation that is not easily parallelized. It can even make the overall flow operation less efficient.
How to solve the above problems reasonably and efficiently? We can use longstream. rangeClosed, which has two advantages over iterate: 1) Produce the raw data type directly without the overhead of unboxing and packing. 2) Generate data range, easy to split into small pieces, facilitate parallel.
Let’s get straight to the results:
/** * description: use longstream. rangeClosed to sum *@param aLong
* @return: java.lang.Long
* @author: weirx
* @time: 2021/10/25 10:53 * /
private static Long testRangeClosed(Long aLong) {
return LongStream.rangeClosed(0, aLong).reduce(0L, Long::sum);
}
public static void main(String[] args) {
System.out.println("Shortest time:" + measureSumPerf(TestStreamParallel::testRangeClosed, 10000000) + "ms"); } --------------------------- minimum time: 4msCopy the code
What if you run it in parallel? The results are as follows:
/** * description: use longstream. rangeClosed to sum *@param aLong
* @return: java.lang.Long
* @author: weirx
* @time: 2021/10/25 10:53 * /
private static Long testRangeClosedParallel(Long aLong) {
return LongStream.rangeClosed(0, aLong).parallel().reduce(0L, Long::sum);
}
public static void main(String[] args) {
System.out.println("Shortest time:" + measureSumPerf(TestStreamParallel::testRangeClosedParallel, 10000000) + "ms"); } -------------------------------------- Minimum time: 1msCopy the code
Use parallel streams correctly and efficiently
First look at the following error, which has the following summation code:
import java.util.stream.LongStream; /** * @description: concurrent stream * @author: weirx * @date: 2021/10/25 11:02 * @version: 3.0 * / public class ConcurrentForStreamParallel {/ * * * description: calls the add method of the Accumulator are summed up * @ param n * @ return: long * @author: weirx * @time: 2021/10/25 11:03 */ public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public static class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static void main(String[] args) { for (int i = 0; i< 10 ; i++){ System.out.println(sideEffectSum(10000000)); }} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 50000005000000, 50000005000000, 50000005000000, 50000005000000, 50000005000000 50000005000000 50000005000000 50000005000000 50000005000000 50000005000000Copy the code
If we use parallel operations on this code, we get the following result:
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
returnaccumulator.total; } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -11448626323551
10712400489958
4825469864081
5760309604570
8135917300720
13477296726050
10068084182814
7911075623394
11626955826086
8579850729746
Copy the code
As shown above, there is a direct problem of resource competition between threads, not to mention efficiency issues, even data correctness can not be guaranteed.
Here are some suggestions for using parallel flows:
1) Test, the most intuitive way. If it is not clear that the use of parallel flow can bring performance improvement, then just as in this article, find the most appropriate way.
2) Pay attention to packing. Automatic packing and unpacking can greatly reduce performance. Primitive streams such as IntStream, LongStream, DoubleStream, etc. should be used to avoid these operations.
3) Some operations themselves perform worse on parallel streams than sequential streams. In particular, operations such as Limit and findFirst, which depend on the order of elements, are expensive to perform on parallel streams. For example, findAny will perform better than findFirst because it doesn’t have to be executed sequentially. If you need n elements in a stream rather than specifically asking for the first n, calling limit on an unordered parallel stream may be more efficient than calling a single ordered stream.
4) Consider the calculation cost of assembly line. Assuming that the number of processing elements N, Q is the cost of an element passing through the pipeline, then N*Q is the calculation cost of the entire pipeline. The higher the value of Q, the more efficient it is likely to be to use parallel streams.
5) Using parallel streams for small amounts of data is not a good choice. Parallelization has its own overhead.
6) Consider whether the data structure behind the flow is easy to split. ArrayList is much more efficient than LinkedList. The former doesn’t need to be traversed, the latter does.
7) The operation of local flow leads to the uncertainty of the whole assembly line. For example, the filter operation filters a large number of elements, resulting in an unknown size of the stream.
8) Also consider whether the cost of merging steps in terminal operations is large or small. (for example, the combiner method in Collector). If this step is costly, the cost of combining the partial results of each subflow may outweigh the performance gains from parallel flows.
The parallel efficiency of several data sources is given below:
The source | decomposability |
---|---|
ArrayList | excellent |
LinkedList | poor |
IntStream.range | excellent |
Stream.iterate | poor |
HashSet | good |
TreeSet | good |
XDM, see this, give it a thumbs up!!