1. Introduction

Before getting to parallelStream, it's important to understand Stream and its basic operations.

If you are not familiar with Stream yet, I recommend reading my previous article on Stream operations first.

2. What is ParallelStream

ParallelStream is a stream that processes its elements in parallel. By default it runs on ForkJoinPool, which lets it use multiple threads and can speed up processing.

To avoid any misunderstanding: parallel processing is not automatically faster. Whether it pays off depends on the business scenario, so use it where it makes sense.

True parallelism also needs hardware support. On a single-core CPU, the work can only be processed concurrently (interleaved on one core), not in parallel.

This article focuses on one ParallelStream pitfall that is easy to step on:

Are the parallel streams used in a project's business code really all processed in parallel?

3. How do I use ParallelStream

ParallelStream is used exactly like Stream: it still returns an ordinary Stream, but depending on conditions the underlying processing runs either in parallel or sequentially.

A parallel stream does not process elements in order, so the order is nondeterministic. You can get ordered output (for example with forEachOrdered instead of forEach), but that is not the focus of this article.
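Here is a minimal sketch that makes the ordering point concrete. The class ParallelOrderDemo and its sample data are hypothetical and not from the original article. It records the order in which a parallel stream visits elements in two ways: with forEach, whose order is not guaranteed, and with forEachOrdered, which keeps the source's encounter order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelOrderDemo {
    // Records elements in the order forEach visits them on a parallel stream (order not guaranteed).
    // The list must be thread-safe because several threads call add() concurrently.
    static List<Integer> visitWithForEach(List<Integer> source) {
        List<Integer> visited = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEach(visited::add);
        return visited;
    }

    // forEachOrdered runs the action one element at a time in encounter order, and each action
    // happens-before the next one, so a plain ArrayList is safe here
    static List<Integer> visitWithForEachOrdered(List<Integer> source) {
        List<Integer> visited = new ArrayList<>();
        source.parallelStream().forEachOrdered(visited::add);
        return visited;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        System.out.println("forEach:        " + visitWithForEach(source));
        System.out.println("forEachOrdered: " + visitWithForEachOrdered(source));
    }
}
```

The forEach line usually prints a shuffled list that changes from run to run. The forEachOrdered line always prints 1 to 10 in order, at the cost of giving up some parallelism in the terminal step.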

4. ForkJoinPool

If you have used parallel streams in real projects, you have probably come across ForkJoinPool.

That's right: under the hood, parallel streams run on ForkJoinPool, a thread pool based on the work-stealing algorithm.

ForkJoinPool's strength is that it exploits multiple CPUs. It splits a task into "small tasks" that can run in parallel on multiple processor cores, and once those small tasks complete, it combines their results.
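Below is a minimal sketch of this split-and-combine pattern, written directly against ForkJoinPool with a RecursiveTask. The class names and the THRESHOLD value are hypothetical choices for illustration. Parallel streams build similar task trees internally, but this is not their actual implementation:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumDemo {
    // Hypothetical cut-off: ranges of this size or smaller are summed directly
    private static final int THRESHOLD = 1_000;

    static class SumTask extends RecursiveTask<Long> {
        private final long[] numbers;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += numbers[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(numbers, from, mid);
            SumTask right = new SumTask(numbers, mid, to);
            left.fork();                     // schedule the left half; an idle worker may steal it
            long rightSum = right.compute(); // process the right half in the current thread
            return left.join() + rightSum;   // combine the results of the small tasks
        }
    }

    static long parallelSum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[100_000];
        for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1;
        System.out.println(parallelSum(numbers)); // 5000050000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of idle while it waits on join().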

5. Parallel stream pitfalls

5.1 Thread safety

Whenever processing runs in parallel, thread safety becomes an issue if the operations involved create race conditions.

The following example illustrates the problem:

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelStreamThreadSafety {
    public static void main(String[] args) {
        List<Integer> integerList = new ArrayList<>();
        List<String> strList = new ArrayList<>();

        int practicalSize = 1000000;
        for (int i = 0; i < practicalSize; i++) {
            strList.add(String.valueOf(i));
        }
        // Several common-pool threads call ArrayList.add() at the same time: a race condition
        strList.parallelStream().forEach(each -> integerList.add(Integer.parseInt(each)));

        System.out.println(">>> integerList expected length :: " + practicalSize);
        System.out.println(">>> integerList actual length :: " + integerList.size());
    }
}
/*
 * >>> integerList expected length :: 1000000
 * >>> integerList actual length :: 211195
 * (the actual length differs on every run; the race can also throw ArrayIndexOutOfBoundsException)
 */
```

The program works as follows:

1. Create two lists, one of String and one of Integer

2. Insert 1,000,000 records into strList

3. Use a parallel stream to parse each element of strList into an Integer and add it to integerList

4. Output the expected and actual length of integerList

We would expect integerList to end up with 1,000,000 elements.

However, the parallel stream adds elements from multiple threads at once, and ArrayList is not thread-safe, so some additions are lost.

The actual length varies from run to run, depending on thread scheduling and CPU speed.

The solution

If you really need code like this in a project, use a thread-safe container instead: Vector, a Collections.synchronizedList wrapper, or a class from java.util.concurrent (JUC).

You chose parallel processing for performance in the first place, so the JUC containers are the preferred option here.
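Here is a minimal sketch of the fix under a few assumptions. The class ThreadSafeCollectDemo and its helper names are hypothetical. Two of the options mentioned above are shown: a Collections.synchronizedList wrapper, and ConcurrentLinkedQueue as the JUC container. CopyOnWriteArrayList is also in JUC, but it copies its array on every write, which makes it a poor fit for a million insertions. The third method uses collect(), which the original article does not cover. With collect() there is no shared container at all:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ThreadSafeCollectDemo {
    // Same kind of input as the article: "0", "1", ..., String.valueOf(size - 1)
    static List<String> source(int size) {
        return IntStream.range(0, size).mapToObj(String::valueOf).collect(Collectors.toList());
    }

    // Collections.synchronizedList: every add() takes the wrapper's lock
    static int withSynchronizedList(List<String> strList) {
        List<Integer> integerList = Collections.synchronizedList(new ArrayList<>());
        strList.parallelStream().forEach(each -> integerList.add(Integer.parseInt(each)));
        return integerList.size();
    }

    // JUC container: ConcurrentLinkedQueue is a lock-free, thread-safe queue
    static int withConcurrentQueue(List<String> strList) {
        Queue<Integer> integerQueue = new ConcurrentLinkedQueue<>();
        strList.parallelStream().forEach(each -> integerQueue.add(Integer.parseInt(each)));
        return integerQueue.size();
    }

    // Not in the original article: collect() builds partial lists per subtask and merges them,
    // preserving the source's encounter order
    static List<Integer> withCollect(List<String> strList) {
        return strList.parallelStream().map(Integer::parseInt).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strList = source(1_000_000);
        System.out.println("synchronizedList size :: " + withSynchronizedList(strList));
        System.out.println("ConcurrentLinkedQueue size :: " + withConcurrentQueue(strList));
        System.out.println("collect size :: " + withCollect(strList).size());
    }
}
```

All three versions report 1,000,000 elements. The collect() version also avoids lock contention on a shared container.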

5.2 Does a parallel stream always run in parallel?

Pay close attention to this question: it is the focus of this article.

Let's start by listing the APIs that create parallel streams:

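```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class ParallelStreamApis {
    public static void main(String[] args) {
        List<String> stringList = new ArrayList<>();
        stringList.parallelStream();
        stringList.stream().parallel();
        // Stream.of(stringList) is a Stream<List<String>> whose single element is the list itself
        Stream.of(stringList).parallel();
    }
}
```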

Although the calls differ, each one ends up setting the parallel flag in AbstractPipeline to true:

```java
public final S parallel() {
    sourceStage.parallel = true;
    return (S) this;
}
```
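You can confirm that all three calls set the same flag with BaseStream.isParallel(), which reports whether a terminal operation would run in parallel. The following is a minimal sketch with a hypothetical class name. A plain stream() is added for contrast:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelFlagDemo {
    // isParallel() for the three parallel APIs, plus a sequential stream for comparison
    static boolean[] parallelFlags() {
        List<String> stringList = new ArrayList<>();
        return new boolean[] {
                stringList.parallelStream().isParallel(),
                stringList.stream().parallel().isParallel(),
                Stream.of(stringList).parallel().isParallel(),
                stringList.stream().isParallel()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parallelFlags())); // [true, true, true, false]
    }
}
```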

This raises a question: do the three parallel stream APIs all use the same ForkJoinPool underneath?

First, let's look at how that ForkJoinPool is initialized.

Parallel streams use the static field common inside ForkJoinPool:

```java
static final ForkJoinPool common;

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}
```

The common field is initialized in ForkJoinPool's static initializer block:

```java
static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset(k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset(k.getDeclaredField("stealCounter"));
        Class<?> tk = Thread.class;
        PARKBLOCKER = U.objectFieldOffset(tk.getDeclaredField("parkBlocker"));
        Class<?> wk = WorkQueue.class;
        QTOP = U.objectFieldOffset(wk.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset(wk.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset(wk.getDeclaredField("scanState"));
        QPARKER = U.objectFieldOffset(wk.getDeclaredField("parker"));
        QCURRENTSTEAL = U.objectFieldOffset(wk.getDeclaredField("currentSteal"));
        QCURRENTJOIN = U.objectFieldOffset(wk.getDeclaredField("currentJoin"));
        Class<?> ak = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }

    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
        new ForkJoinPool.DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // create the common ForkJoinPool
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() {
                return makeCommonPool();
            }
        });
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}
```

The common pool's parallelism, thread factory, and exception handler can be customized at initialization through system properties:

```java
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinPool.ForkJoinWorkerThreadFactory factory = null;
    Thread.UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinPool.ForkJoinWorkerThreadFactory) ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((Thread.UncaughtExceptionHandler) ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new ForkJoinPool.InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}
```

parallelism is the number of worker threads the common ForkJoinPool creates. By default it equals the number of CPU cores available in the current environment minus 1, and it is never less than 1.
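To check this rule on your own machine, the minimal sketch below prints the available processors next to the common pool's parallelism. The class name and the helper expectedDefaultParallelism are hypothetical. The default only applies when the JVM is not started with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N, the property read in makeCommonPool:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelismDemo {
    // Default rule from makeCommonPool: availableProcessors() - 1, but never less than 1
    static int expectedDefaultParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors      = " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism  = " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("getCommonPoolParallelism = " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("expected default         = " + expectedDefaultParallelism());
    }
}
```

On the 12-core machine used later in this article, the common pool parallelism line would report 11.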

This value matters a great deal for how parallel streams behave, as discussed below.

The key point is this:

Every parallel stream in the program, whichever of the three APIs created it, uses the same static common field of ForkJoinPool.
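Here is one way to observe the shared pool: record which threads process a parallel stream's elements. The sketch below does that under a few assumptions. The class SharedCommonPoolDemo and its helpers are hypothetical. The check relies on the worker-name prefix "ForkJoinPool.commonPool-worker-" passed in makeCommonPool:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class SharedCommonPoolDemo {
    // Worker-name prefix that makeCommonPool() passes to the common pool
    static final String COMMON_POOL_PREFIX = "ForkJoinPool.commonPool-worker-";

    // Runs a parallel stream and records the names of the threads that processed its elements
    static Set<String> threadNamesFor(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    // True if every thread was either the calling thread or a common-pool worker
    static boolean onlyCallerOrCommonPool(Set<String> names, String caller) {
        return names.stream().allMatch(n -> n.equals(caller) || n.startsWith(COMMON_POOL_PREFIX));
    }

    public static void main(String[] args) {
        Set<String> names = threadNamesFor(100_000);
        System.out.println(names);
        System.out.println("only caller or common pool :: "
                + onlyCallerOrCommonPool(names, Thread.currentThread().getName()));
    }
}
```

The printed set typically contains the calling thread (for example main) plus several common-pool workers. The calling thread always takes part, because the stream's root task starts running on the thread that invokes the terminal operation.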

Back to the question raised in this section: does code that uses parallel streams in a project really run in parallel?

Here is the test code. If you are interested, try running it locally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class ParallelStreamContention {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(String.format(
                ">>> Available processors :: %s, common pool parallelism of parallel streams :: %s",
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.commonPool().getParallelism()));
        List<String> stringList = new ArrayList<>();
        List<String> stringList2 = new ArrayList<>();
        for (int i = 0; i < 13; i++) stringList.add(String.valueOf(i));
        for (int i = 0; i < 3; i++) stringList2.add(String.valueOf(i));

        // "Child thread-1": a slow business task (6 s per element) that occupies the common pool
        new Thread(() -> {
            stringList.parallelStream().forEach(each -> {
                System.out.println(Thread.currentThread().getName() + " start execution... " + each);
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }, "Child thread-1").start();

        // Give "Child thread-1" time to fill the common pool
        Thread.sleep(1500);

        // "Child thread-2": a fast business task (50 ms per element) submitted while the pool is busy
        new Thread(() -> {
            stringList2.parallelStream().forEach(each -> {
                System.out.println(Thread.currentThread().getName() + " ... " + each);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }, "Child thread-2").start();
    }
}
```

To simulate real usage in a project, the test is set up as follows:

1. "Child thread-1" and "Child thread-2" each represent a different business feature in the project that uses a parallel stream

2. The machine running the test has 12 available processors, so the common ForkJoinPool is initialized with a parallelism of 11

3. The "Child thread-1" task is slow. Counting its own submitting thread plus the threads in the pool, it can run 12 tasks concurrently

4. If "Child thread-1" occupies every thread in the pool, what happens when "Child thread-2" then runs its own parallel stream?

Run the test program and look at the output.

Here is what the output shows:

1. The thread that submits the tasks also takes part in executing them

2. The common ForkJoinPool parallelism is 11, so together with the submitting thread there are 12 threads, while "Child thread-1" has 13 tasks to run

3. Each task in "Child thread-1" sleeps to simulate a slow operation, so "Child thread-1" fills the common pool and one task is left waiting

4. Because "Child thread-1" keeps the whole pool busy, "Child thread-2" cannot run in parallel and has to rely on its own submitting thread to execute its tasks

5. Once the first 12 tasks of "Child thread-1" finish, the remaining task runs

Problem summary

The test shows that a parallel stream used in a project does not necessarily execute in parallel.

If other parallel streams in the project run slow tasks, they occupy the shared common pool, and your stream's work ends up running only on the thread that submitted it.
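The minimal sketch below reproduces this effect deterministically, without relying on sleep timings. It blocks every common-pool worker with a latch, playing the role of the slow "Child thread-1". Then it runs a parallel stream, playing the role of "Child thread-2", and records which threads executed it. The class SaturatedCommonPoolDemo and its helpers are hypothetical. The 10-second wait is an arbitrary safety bound:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class SaturatedCommonPoolDemo {
    // Blocks every common-pool worker, then runs a parallel stream and
    // returns the names of the threads that processed its elements
    static Set<String> runWhilePoolIsBusy(int elements) {
        ForkJoinPool common = ForkJoinPool.commonPool();
        int workers = common.getParallelism();
        CountDownLatch allBusy = new CountDownLatch(workers);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < workers; i++) {
            common.execute(() -> {
                allBusy.countDown();
                awaitQuietly(release); // hold this worker until released
            });
        }
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            allBusy.await(10, TimeUnit.SECONDS); // bounded wait for the workers to be occupied
            IntStream.range(0, elements).parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown(); // free the common-pool workers again
        }
        return names;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads that ran the stream :: " + runWhilePoolIsBusy(1_000));
    }
}
```

When all workers are successfully blocked, the printed set typically contains only the calling thread, which matches observation 4 from the test run. The exact output may vary with JDK version and core count, so treat it as illustrative.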

Therefore, we must consider the following questions before using parallel streams:

1. Do business scenarios really need parallel processing?

2. Are the tasks processed in parallel independent of one another? Will running them in parallel create race conditions?

3. Does the processing depend on the order in which tasks execute?

If the business scenario passes all three checks, or you have solutions for the problems they raise, parallel processing can indeed deliver a considerable performance improvement.

6. ParallelStream summary

What is ParallelStream?

A stream that processes its elements using parallel computation

What does ParallelStream use under the hood for parallel computation?

ForkJoinPool. Its default parallelism is Runtime.getRuntime().availableProcessors() - 1

What problems do parallel streams have? Largely the same ones found in concurrent programming in general:

Thread safety, and whether the scenario is actually suitable for parallel processing

All in all, use parallel processing in the scenarios that suit it

Afterword.

My knowledge is limited, so if you spot any mistakes or inaccuracies in this article, please let me know. Thanks 🙏

Your likes are my biggest support 😆. Please like, comment, and share!

This article is formatted using MDNICE