
The infrastructure behind parallel streams is the fork/join framework introduced in Java 7. In this article we take a closer look at that framework.

The purpose of the fork/join framework is to recursively split a parallelizable task into smaller subtasks and then combine the results of those subtasks to produce the overall result. It is an implementation of the ExecutorService interface that distributes the subtasks to worker threads in a thread pool called ForkJoinPool.

1. RecursiveTask

To submit a task to this pool, you need to create a subclass of RecursiveTask<R>, where R is the type of result produced by the parallelized task (and by all of its subtasks), or a subclass of RecursiveAction if the task returns no result.

To define a RecursiveTask, just implement its only abstract method compute:

protected abstract R compute();

When implementing this method, we need to define both the logic for splitting a task into subtasks and the logic for producing the result of a single subtask when it cannot, or should not, be split any further.

The implementation of this method looks like the following pseudocode:

if (the task is small enough or can no longer be divided) {
    compute the task sequentially
} else {
    divide the task into two subtasks
    call this method recursively on each subtask
    wait for all subtasks to complete
    combine the results of the subtasks
}

This recursive task-splitting process is illustrated by the example below.

Fork/join framework example: summing a range of numbers stored in a long[]

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

import static com.cloud.bssp.java8.stream.TestStreamParallel.measureSumPerf;

/**
 * @description: Use ForkJoinPool
 * @author: weirx
 * @date: 2021/10/25 14:10
 * @version: 3.0
 */
public class TestRecursiveTask extends RecursiveTask<Long> {

    /** The array whose elements are to be summed. */
    private final long[] numbers;

    /** Start index (inclusive) of the portion of the array handled by this subtask. */
    private int start;

    /** End index (exclusive) of the portion of the array handled by this subtask. */
    private int end;

    /**
     * Private constructor used to recursively create subtasks of the main task.
     *
     * @param numbers
     * @param start
     * @param end
     */
    private TestRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    /**
     * Public constructor used to create the main task.
     *
     * @param numbers
     */
    public TestRecursiveTask(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    /** Threshold below which a subtask is no longer split and is summed sequentially. */
    public static final long THRESHOLD = 10000L;

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            // If the size is at or below the threshold, compute the result sequentially
            return computeSequentially();
        } else {
            // Create a subtask to sum the first half of the array
            TestRecursiveTask left = new TestRecursiveTask(numbers, start, start + length / 2);
            // Another ForkJoinPool thread is used to execute the newly created subtask asynchronously
            left.fork();
            // Create a subtask that sums the last half of the array
            TestRecursiveTask right = new TestRecursiveTask(numbers, start + length / 2, end);
            // Execute the second subtask synchronously
            Long compute = right.compute();
            // Read the result of the first subtask, wait if it is not completed
            Long join = left.join();
            // Merge the results of the two subtasks
            return compute + join;
        }
    }

    /**
     * Simple sequential algorithm used when a subtask can no longer be split.
     *
     * @return the sum of this subtask's portion of the array
     */
    private Long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    /**
     * Sums the first n natural numbers in parallel.
     *
     * @param n
     * @return the sum of 1..n
     */
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new TestRecursiveTask(numbers);
        return new ForkJoinPool().invoke(task);
    }

    public static void main(String[] args) {
        System.out.println("ForkJoin sum done in: " + measureSumPerf(
                TestRecursiveTask::forkJoinSum, 10000000) + " msecs");
    }
}

Output result:

ForkJoin sum done in: 64 msecs

This looks worse than the parallel-stream version, but only because the whole stream of numbers has to be materialized into a long[] before the task can use it.
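For comparison, the parallel-stream counterpart computes the same sum directly from the LongStream, with no intermediate array. A minimal sketch (the class and method names here are illustrative, not the ones from the earlier article):

import java.util.stream.LongStream;

public class TestParallelStreamSum {

    /** Sums the first n natural numbers with a parallel stream; no long[] is needed. */
    public static long parallelStreamSum(long n) {
        return LongStream.rangeClosed(1, n)
                .parallel() // the stream is split automatically and summed on the common ForkJoinPool
                .sum();
    }

    public static void main(String[] args) {
        System.out.println("Parallel stream sum: " + parallelStreamSum(10_000_000));
    }
}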

2. Best practices for fork/join

While the fork/join framework is fairly easy to use, it is unfortunately also easy to misuse. Here are a few best practices for using it effectively:

1) Calling the join method on a task blocks the caller until the task produces a result. Therefore, it is necessary to call it after the computation of both subtasks has begun. Otherwise, you end up with a version that is slower and more complex than the original sequential algorithm, because each subtask must wait for another to complete before it can start.

2) ForkJoinPool invoke methods should not be used inside a RecursiveTask. Instead, you should always call compute or fork directly, and only sequential code should use invoke to start parallel computation.

3) A subtask can be scheduled on the ForkJoinPool by calling its fork method. It may seem natural to call fork() on both the left and right subtasks, but it is more efficient to call compute directly on one of them: that way you reuse the current thread for one of the two subtasks and avoid the overhead of submitting one more task to the pool (a sketch of this pattern follows the list).

4) Debugging a parallel computation written with the fork/join framework can be tricky. In particular, looking at the stack trace in your favorite IDE is less helpful than usual, because the thread that calls compute is not the conceptual caller, which is the thread that called fork.

5) As with parallel streams, you should never simply assume that using the fork/join framework on a multi-core processor is faster than the sequential computation. Parallelizing a task improves performance only if it can be decomposed into several independent subtasks, and each subtask takes longer to run than the overhead of forking a new task.
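To make points 1 and 3 concrete, here is a minimal, self-contained sketch of a RecursiveTask that sums a numeric range without an intermediate array. The class name RangeSumTask and its threshold are illustrative choices, not code from the example above: only one subtask is forked, the other is computed on the current thread, and join is called last.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RangeSumTask extends RecursiveTask<Long> {

    /** Ranges at or below this size are summed sequentially. */
    private static final long THRESHOLD = 10_000;

    private final long from;
    private final long to;

    public RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = (from + to) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                         // hand one half to another worker thread
        long rightResult = right.compute();  // compute the other half on this thread
        long leftResult = left.join();       // only now wait for the forked half (point 1)
        return leftResult + rightResult;
    }

    public static void main(String[] args) {
        // invoke is called from sequential code, as point 2 recommends
        long sum = new ForkJoinPool().invoke(new RangeSumTask(1, 10_000_000));
        System.out.println("sum = " + sum);
    }
}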

3. Work stealing

Why is work stealing needed?

In the example above, THRESHOLD is 10000, so the 10,000,000-element array ends up split into subtasks that each cover at most 10,000 elements, roughly 1000 subtasks in total.

Ideally, each subtask would take the same amount of time to complete, so that on a multi-core CPU every core gets roughly the same amount of work to process.

In practice, the amount of time each subtask takes can vary dramatically, because of slow disk access, network calls, or many other factors.

To solve this problem, the fork/join framework uses a technique called work stealing.

In practice, this means that the tasks are more or less evenly distributed among all the threads in the ForkJoinPool. Each thread holds a doubly linked queue (deque) of the tasks assigned to it; as soon as it completes a task, it pulls the next one from the head of its queue and starts executing it.

For the reasons described earlier, a thread may have completed all of its assigned tasks early, that is, its queue is empty, while other threads are busy. Instead of being idle, the thread randomly picks another thread and “steals” a task from the tail of the queue. This process continues until all tasks have been executed and all queues have been emptied. This is why many small tasks rather than a few large ones help to balance the load better between worker threads.

In general, this work-stealing algorithm is used to redistribute and balance the tasks among the worker threads in the pool. When a task in a worker thread's queue is split into two subtasks, one of them can be "stolen" by an idle worker thread. As mentioned earlier, this process can continue recursively until the condition for executing a subtask sequentially is met.
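There is no work-stealing code to write yourself; ForkJoinPool does this internally. The pool does, however, expose a rough counter of stolen tasks, so the effect can be observed. A minimal sketch, assuming the TestRecursiveTask class from the example above is available (the steal count is only an estimate and varies from run to run):

import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class WorkStealingDemo {
    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ForkJoinPool pool = new ForkJoinPool();

        // Run the summing task from the example above on an explicit pool
        long sum = pool.invoke(new TestRecursiveTask(numbers));

        // getStealCount() returns an estimate of how many tasks were taken
        // from one worker thread's queue by another thread
        System.out.println("sum = " + sum + ", approximate steals = " + pool.getStealCount());
    }
}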

4. Spliterator

So how does a Stream achieve parallelism? We never had to use fork/join manually, which means there must be an automatic mechanism that splits the work for us. That mechanism is the Spliterator.

Spliterator is another new interface added in Java 8; the name stands for "splitable iterator". Like Iterator, a Spliterator is used to traverse the elements of a data source, but it is designed with parallel execution in mind.

public interface Spliterator<T> {

  /**
   * tryAdvance behaves like a normal Iterator: it consumes the elements of the
   * Spliterator one at a time and returns true if there are still elements to traverse.
   */
  boolean tryAdvance(Consumer<? super T> action);

  /**
   * trySplit is specific to the Spliterator interface: it splits off some of the
   * elements into a second Spliterator (the one returned by this method) so that
   * the two can be processed in parallel.
   */
  Spliterator<T> trySplit();

  /** estimateSize estimates how many elements are left to traverse. */
  long estimateSize();

  int characteristics();
}

4.1 Splitting Process

The algorithm for splitting a Stream into multiple parts is a recursive process. The framework keeps calling trySplit on the Spliterator until it returns null, indicating that the data structure it is dealing with can no longer be split, as described below.

1) The first step is to call trySplit on the first Spliterator to generate the second Spliterator.

2) The second step is to call trySplit on both Spliterators, so there are four Spliterators in total.

3) In the third step, trySplit is called on all of the resulting Spliterators; when every call returns null, the splitting is complete. A small runnable sketch follows.
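This splitting can be observed directly on the Spliterator of a collection. A minimal sketch (exactly how the elements are divided depends on the source; for an array-backed list, trySplit typically hands off the first half):

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorSplitDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        Spliterator<Integer> first = list.spliterator();
        // Split off part of the elements into a second Spliterator
        Spliterator<Integer> second = first.trySplit();

        System.out.println("first now covers  " + first.estimateSize() + " elements");
        System.out.println("second now covers " + second.estimateSize() + " elements");

        // Each part can be traversed independently (potentially by different threads);
        // tryAdvance consumes one element and returns true if one was available
        second.tryAdvance(n -> System.out.println("split-off part starts with: " + n));
    }
}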

4.2 Spliterator characteristics

The splitting process of the Spliterator is also influenced by its characteristics, which are specified through the characteristics() method.

The last abstract method declared by the Spliterator interface is characteristics(), which returns an int encoding the set of characteristics of the Spliterator itself.

The available characteristics are:

/** The elements have a defined traversal order (for example, a List). */
public static final int ORDERED = 0x00000010;

/** For any pair of traversed elements x and y, x.equals(y) returns false. */
public static final int DISTINCT = 0x00000001;

/** The traversed elements follow a predefined sort order. */
public static final int SORTED = 0x00000004;

/** The Spliterator was built from a source of known size (e.g., a Set), so estimateSize() returns an exact value. */
public static final int SIZED = 0x00000040;

/** The traversed elements are guaranteed to be non-null. */
public static final int NONNULL = 0x00000100;

/** The data source of the Spliterator cannot be modified. */
public static final int IMMUTABLE = 0x00000400;

/** The data source may be safely modified concurrently by other threads. */
public static final int CONCURRENT = 0x00001000;

/** This Spliterator and all Spliterators split from it are SIZED. */
public static final int SUBSIZED = 0x00004000;
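These flags can be queried on any Spliterator through characteristics() or the convenience method hasCharacteristics(). A small sketch comparing an ordered List with an unordered HashSet:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

public class SpliteratorCharacteristicsDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        Set<Integer> set = new HashSet<>(list);

        Spliterator<Integer> listSplit = list.spliterator();
        Spliterator<Integer> setSplit = set.spliterator();

        // A List has a defined encounter order, a HashSet does not
        System.out.println("list ORDERED: " + listSplit.hasCharacteristics(Spliterator.ORDERED));
        System.out.println("set ORDERED:  " + setSplit.hasCharacteristics(Spliterator.ORDERED));

        // Both report an exact size up front
        System.out.println("list SIZED:   " + listSplit.hasCharacteristics(Spliterator.SIZED));
        System.out.println("set SIZED:    " + setSplit.hasCharacteristics(Spliterator.SIZED));

        // A HashSet guarantees distinct elements
        System.out.println("set DISTINCT: " + setSplit.hasCharacteristics(Spliterator.DISTINCT));
    }
}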

A basic understanding of these characteristics is enough here; for reasons of space we will not go deeper for now.


If you have read this far and feel you learned something, please give the post a like ~~~