Historical articles recommended:

InnoDB increment principle is not clear, how to CRUD?

Lombok’s Builder annotations don’t work. Try SuperBuilder

Arthas Principles series 5: Implementation principles of the Watch command

Arthas Principles series 4: Bytecode instrumentation makes everything possible

Arthas Principles Series 3: Server startup process

Arthas Principles Series 2: Overall architecture and project entry

Arthas Principles Series 1: Implement a minimalist Arthas Watch command

preface

Java 8 brings you a very convenient multithreading tool: Parallel flow, should allow Java multi-thread tedious programming specification, need only one line of code, you can make a multi-threaded running, too, seems to make a lot of people forget the governed by multiple threads of fear, this article is to share a real production failure, because in the consumer message processor used in the Java 8 parallel flows, As a result, the ability of the cluster to consume messages rapidly decreases, causing online messages to accumulate and causing faults. Some of you may wonder, what is it about parallel flow that makes it work against you?

Are parallel streams necessarily faster than serial?

The answer is: not really, depending on the situation

I’ve made free of the code that causes the online problem into the following example code:

void testParallelStream(a) throws InterruptedException {
		ExecutorService threadPool = new ThreadPoolExecutor(50.200.20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
				new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());

    	Long time1 = System.currentTimeMillis();
		// 1. Multithreading +foreach execution time
		for (int i = 0; i < ARRAY_LENGTH; i++) {
			CommonExecutor commonExecutor = new CommonExecutor();
			commonExecutor.array = arrays[i];
			threadPool.submit(commonExecutor);
		}
		commonCountDownLatch.await();
		Long time2 = System.currentTimeMillis();
		System.out.println("For loop time:" + (time2 - time1));

		threadPool = new ThreadPoolExecutor(50.200.20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
				new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());

		// 2. Multithreading + parallel stream execution time
		for (int i = 0; i < ARRAY_LENGTH; i++) {
			ParallelStreamExecutor parallelStreamExecutor = new ParallelStreamExecutor();
			parallelStreamExecutor.array = arrays[i];
			threadPool.submit(parallelStreamExecutor);
		}
		parallelCountDownLatch.await();
		Long time3 = System.currentTimeMillis();
		System.out.println("Parallel flow time:" + (time3 - time2));
}
Copy the code

Two of the actuators submitted to the thread pool look like this:

@Data
    private static class CommonExecutor implements Runnable {

    	private long[] array;

		@Override
		public void run(a) {
			// Select sort method to sort
			for (int i = 0; i < array.length; i++) { array[i] = i * i; } commonCountDownLatch.countDown(); }}Copy the code
@Data
	private static class ParallelStreamExecutor implements Runnable {

		private long[] array;

		@Override
		public void run(a) {
			// Select sort method to sort
			IntStream.range(0, array.length).parallel().forEach(i -> array[i] = i * i); parallelCountDownLatch.countDown(); }}Copy the code

The thinking of this code is very simple, it is for each line a two-dimensional array of arrays, calculation of its square column subscript, and backfill to array, but the process is done through the thread pool, submitted to the thread pool actuator has two types, one is common for loop, through the cursor traverses each element subscript, and calculate the square number. The other uses parallel streams to accomplish the same thing. For simplicity, let’s loop through this code 10 times and count the elapsed time (in milliseconds) between the two implementations so you can guess which one is faster.

Here’s the actual time:

Number of executions 1 2 3 4 5 6 7 8 9 10 The average time
For loop time 18 17 13 18 17 13 13 16 20 16 16.1
Parallel flow time 32 41 38 59 51 34 53 57 49 47 46.1

The result is that the parallel stream executes significantly slower than the for loop. What’s wrong with it?

Parallel flow implementation principle

The problem with parallel streams is that all actions submitted to parallel streams from the same process are handled by the same common thread pool. The number of threads that will actually process an Action is determined by the size of the common threadPool in the parallel stream.

@Override
@SuppressWarnings("unchecked")
public final S parallel(a) {
  sourceStage.parallel = true;
  return (S) this;
}
Copy the code

Call the parallel only Java. Util. Stream. The sourceStage AbstractPipeline. The parallel set to true, to call foreach, will call to this method

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape(a) == terminalOp.inputShape();
  if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);
  linkedOrConsumed = true;

  return isParallel()
    ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
    : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
Copy the code

Here isParallel () will determine the sourceStage set above. The parallel fields, so as to make the program execution flow go to terminalOp. EvaluateParallel this branch, with back again will find will eventually submitted to ForEachTask task

@Override
public <S> Void evaluateParallel(PipelineHelper
       
         helper, Spliterator
         spliterator)
        {
  if (ordered)
    new ForEachOrderedTask<>(helper, spliterator, this).invoke();
  else
    new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
  return null;
}
Copy the code

ForEachTask () {compute ();} ForEachTask () {compute ();} ForEachTask () {compute ()

public void compute(a) {
  The spliterator type is a RangeIntSpliterator, where from = 0, upTo = 10000, and last = 0
  Spliterator<S> rightSplit = spliterator, leftSplit;
  // estimateSize = upTo - from + last
  long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
  if ((sizeThreshold = targetSize) == 0L)
    // The target size is calculated based on the common thread pool mentioned above and is equal to sizeEstimate/ thread pool size * 4
    targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
  boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
  boolean forkRight = false;
  Sink<S> taskSink = sink;
  ForEachTask<S, T> task = this;
  // Enter the task segmentation logic,
  while(! isShortCircuit || ! taskSink.cancellationRequested()) {// Shards until the size of the subtask is less than the threshold
    if (sizeEstimate <= sizeThreshold ||
        // trySplit() splits rightSplit equally, and returns the first subtask of the split. The split ratio is related to the total number of tasks to be split
        // If the size of the subtask to be sharded is less than or equal to 1, null is returned to stop sharding
        (leftSplit = rightSplit.trySplit()) == null) {
      task.helper.copyInto(taskSink, rightSplit);
      break;
    }
    ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
    task.addToPendingCount(1);
    ForEachTask<S, T> taskToFork;
    // The order of the cutting tasks by forkRight is
    // Left -> right -> left -> right -> right -> right until the subtask size meets the threshold, which can make the entire task execution more discrete
    // You are welcome to discuss the benefits of this in the comments section
    if (forkRight) {
      forkRight = false;
      rightSplit = leftSplit;
      taskToFork = task;
      task = leftTask;
    }
    else {
      forkRight = true;
      taskToFork = leftTask;
    }
    // Fork submits the shard subtask to the thread pool
    taskToFork.fork();
    sizeEstimate = rightSplit.estimateSize();
  }
  task.spliterator = null;
  task.propagateCompletion();
}
Copy the code

The secret of parallel flow lies in this code, which is the core logic of the whole parallel flow implementation. The essence of this code is to split the serial large tasks submitted at the beginning into smaller tasks submitted to the thread pool.

public final ForkJoinTask<V> fork(a) {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}
Copy the code

This involves a ForkJoinPool design. In order to avoid competing threads in the regular thread pool for access to a task queue, ForkJoinPool has a common queue for each thread, and external threads must submit tasks to the common queue. Smaller tasks that thread pool threads shard are submitted directly to their own work queue, hence the logic seen above. The entire ForkJoinPool logic is shown below:

  • Threads submitted to the shared queue are stolen by internal worker threads
  • Tasks in the private work queue are forked into smaller tasks and pushed back to the private queue
  • If the worker thread is free, it can also steal tasks from other work queues

ForkJoinPool.common, the common thread pool mentioned above, is initialized in a ForkJoinPool static call block

private static ForkJoinPool makeCommonPool(a) {

        final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
                new CommonPoolForkJoinWorkerThreadFactory();
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                // You can change the size of the common thread pool by setting this value
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if(pp ! =null)
                parallelism = Integer.parseInt(pp);
            if(fp ! =null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if(hp ! =null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = commonPoolForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            // Gets the number of thread pool threads equal to the currently available processor minus one
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) < =0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
Copy the code

This code is simple, it is returned to the thread pool, a fixed default thread pool size is equal to the available processor minus one, this is because in the design of ForkJoinPool external thread can also be involved in executing subtasks, this seemingly clever design is very easy to misuse, especially in the global variables associated with the thread state.

Reasons why parallel streams are slower than serial

After understanding the implementation principle of parallel flow, we can understand why, in the beginning of this article, for the same logic, parallel flow execution is slower than serial.

When using parallel streams in multi-threaded scenarios, since parallel streams use a common thread pool, no matter how many threads are outside, they will all submit tasks to the same thread pool, so you will find that no matter how much you adjust the size of the outside thread pool, it will not speed up the task. Back to the example at the beginning of the article, the real number of threads in the implementation of parallel flow is 7, while the real number of threads in the implementation of serial is 100. Due to the huge difference in the number of threads, the final time is also significantly different.

conclusion

The parallel flow in the design is handy, where there are three easy pits

  • Tasks submitted to parallel streams by the same process are handled by the same common thread pool, so using parallel streams in a multi-threaded environment can reduce concurrency and slow down processing
  • The common thread pool size of the parallel stream is reduced by one for the available processor, and the parallel stream uses external threads to handle internal subtasks, matchingThreadLocalBe careful when using, in some cases withThreadLocalStrongly coupled scenarios can lead toThreadLocalThe same goes for other thread-related global variables
  • Parallel flow is designed to deal with compute-intensive scenarios. If there are many IO scenarios, such as common RPC calls, external threads will be blocked and the number of external threads will increase in a high concurrency scenario. Moreover, such problems are not easy to be found in the test and easily lead to production failures.

I’m giving benefits to my friends

Rocket MQ, the open-source community’s popular messaging middleware that supports Alibaba’s vast business and complex scenarios, has a lot of clever design that many interviewers are keen to explore. Follow the public account bitter Code to pick up the ebook

Welcome to follow the author’s official account: