Questions

  1. How do I switch between sequential and parallel streams, and how does the conversion work?
  2. How do you split tasks?
  3. How are the results of tasks combined?

Task splitting

The underlying execution of parallel streams is built on the ForkJoin framework. As you know, a ForkJoin task's logic, including how it splits itself, is defined by overriding compute(), so to see how tasks are split we look at how the ForkJoinTask subclass implements compute(). The core logic lives in the AbstractTask class: its compute() method defines the splitting algorithm:

```java
/**
 * Decides whether to split a task further or compute it directly.
 * If computed directly, calls doLeaf and passes the result to setRawResult.
 * Otherwise splits off a subtask, forks one, and continues as the other.
 * The method is structured to conserve resources across a range of uses.
 * The loop continues with one of the subtasks as it splits, to avoid deep
 * recursion. To cope with spliterators that may systematically prefer
 * left-heavy or right-heavy splitting, we alternate in the loop which
 * subtask is forked and which is continued.
 */
@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    // Estimate the number of elements in rs
    long sizeEstimate = rs.estimateSize();
    // Get the size threshold
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    // Keep splitting while:
    // 1. the element count is above the threshold, and
    // 2. rs can still be split
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        // Build two child tasks from ls and rs
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        // Set the pending count to 1
        task.setPendingCount(1);

        // Alternate which side is forked and which is continued
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild; // rightChild.fork()
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild; // leftChild.fork()
        }
        // Fork the subtask; it recurses into compute()
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}
```

A Spliterator is split into two parts, each part is split again, and so on until the while condition no longer holds; the task then computes its part directly and stores the result in the node's localResult field. This is the splitting phase of fork/join.
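The splitting step can be observed directly on a Spliterator. A small sketch (the class name is ours; an array-backed spliterator splits at the midpoint, so here each half reports four elements):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Spliterator<Integer> right = list.spliterator();
        // trySplit() hands the first half to a new spliterator (left)
        // and keeps the second half in the original (right)
        Spliterator<Integer> left = right.trySplit();
        System.out.println("left size:  " + left.estimateSize());  // 4
        System.out.println("right size: " + right.estimateSize()); // 4
    }
}
```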

Combining results

Test code:

```java
long count = Stream.of(1, 2, 3, 4, 5).parallel().reduce((x, y) -> x + y + 10).get();
```
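Although (x, y) -> x + y + 10 looks odd, it happens to be associative (every application adds both operands plus 10), so the parallel reduce is deterministic: five elements mean the accumulator runs exactly four times, giving 1+2+3+4+5 + 4*10 = 55. A runnable version (class name is ours):

```java
import java.util.stream.Stream;

public class ReduceDemo {
    public static void main(String[] args) {
        // Five elements -> four accumulator applications,
        // each adding an extra 10: 15 + 40 = 55
        long count = Stream.of(1, 2, 3, 4, 5)
                .parallel()
                .reduce((x, y) -> x + y + 10)
                .get();
        System.out.println(count); // 55
    }
}
```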

The AbstractPipeline.evaluate() method is the branch point between parallel and sequential streams:

```java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))  // parallel execution path
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
```

ReduceOps' evaluateParallel() method is one of its implementations:

```java
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
```

It does two main things:

  • Create a ReduceTask task
  • The task calls invoke() to execute

The construction does nothing extra; it just assigns the three parameters to instance fields.

invoke() is a ForkJoinTask method; let's focus on its logic:

```java
/**
 * Commences performing this task, awaits its completion if
 * necessary, and returns its result, or throws an (unchecked)
 * {@code RuntimeException} or {@code Error} if the underlying
 * computation did so.
 *
 * @return the computed result
 */
public final V invoke() {
    int s;
    // Execute the task
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    // Return the final result
    return getRawResult();
}
```
```java
/**
 * Implementation for invoke, quietlyInvoke.
 *
 * @return status upon completion
 */
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread) t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}
```

The JDK code sacrifices readability for performance. Rewritten more readably, it is equivalent to:

```java
private int doInvoke() {
    int s = doExec();
    if (s < 0)
        return s;

    Thread t = Thread.currentThread();
    if (t instanceof ForkJoinWorkerThread) {
        ForkJoinWorkerThread wt = (ForkJoinWorkerThread) t;
        return wt.pool.awaitJoin(wt.workQueue, this, 0L);
    }
    return externalAwaitDone();
}
```
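So invoke() runs the task and blocks the calling thread until the result is ready. A minimal RecursiveTask sketch (class names are ours, not from the stream internals) showing the same split-then-compute pattern driven by invoke():

```java
import java.util.concurrent.RecursiveTask;

// Sums a half-open range by splitting it in half until it is small enough,
// mirroring the split-then-compute structure of AbstractTask.compute()
class SumTask extends RecursiveTask<Long> {
    private final long from, to;
    SumTask(long from, long to) { this.from = from; this.to = to; }

    @Override
    protected Long compute() {
        if (to - from <= 1_000) {          // leaf: compute directly
            long sum = 0;
            for (long i = from; i < to; i++) sum += i;
            return sum;
        }
        long mid = (from + to) / 2;
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid, to);
        left.fork();                        // run one half asynchronously
        return right.compute() + left.join();
    }
}

public class InvokeDemo {
    public static void main(String[] args) {
        // invoke() starts the task and waits for its completion
        long sum = new SumTask(0, 10_000).invoke();
        System.out.println(sum); // 49995000
    }
}
```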
Each subtask finishes by calling tryComplete(), which propagates completion up the task tree:

```java
/**
 * If the pending count is nonzero, decrements the count;
 * otherwise invokes {@link #onCompletion(CountedCompleter)}
 * and then similarly tries to complete this task's completer,
 * if one exists, else marks this task as complete.
 */
public final void tryComplete() {
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {
        if ((c = a.pending) == 0) {
            a.onCompletion(s);
            if ((a = (s = a).completer) == null) {
                s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}
```
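The pending-count mechanics can be seen on a standalone completer: the first tryComplete() only decrements the count, and the next one fires onCompletion() and marks the task done. A small sketch (not JDK code):

```java
import java.util.concurrent.CountedCompleter;

public class TryCompleteDemo {
    public static void main(String[] args) {
        CountedCompleter<Void> root = new CountedCompleter<Void>() {
            @Override public void compute() { /* not used in this demo */ }
            @Override public void onCompletion(CountedCompleter<?> caller) {
                System.out.println("onCompletion fired");
            }
        };
        root.setPendingCount(1);

        root.tryComplete();                 // pending 1 -> 0, nothing else
        System.out.println(root.isDone());  // false

        root.tryComplete();                 // pending is 0 -> onCompletion + complete
        System.out.println(root.isDone());  // true
    }
}
```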

In the onCompletion() override of ReduceOps' ReduceTask, a non-leaf node combines the results of its two children:

```java
@Override
public void onCompletion(CountedCompleter<?> caller) {
    if (!isLeaf()) {
        // Not a leaf: merge the results of the left and right children
        S leftResult = leftChild.getLocalResult();
        leftResult.combine(rightChild.getLocalResult());
        setLocalResult(leftResult);
    }
    // Null out spliterator and children for GC
    super.onCompletion(caller);
}
```

Pay attention to three methods:

  • isLeaf()
  • combine()
  • super.onCompletion()

isLeaf() checks whether the node is a leaf. Only a non-leaf node has two children, so only then can the merge be performed:

```java
protected boolean isLeaf() {
    return leftChild == null;
}
```

After execution, the node calls super.onCompletion(caller), which sets its fields to null to help GC:

```java
@Override
public void onCompletion(CountedCompleter<?> caller) {
    spliterator = null;
    leftChild = rightChild = null;
}
```

Finally, look at the combine() method, which merges the partial results:

```java
@Override
public void combine(ReducingSink other) {
    if (!other.empty)
        accept(other.state);
}

// Applies the operator to this sink's state and t,
// where t is the result of the other partial computation;
// this is how two partial results are merged into one
@Override
public void accept(T t) {
    if (empty) {
        empty = false;
        state = t;
    } else {
        state = operator.apply(state, t);
    }
}
```

The logic of apply(state, t) is user-defined; it corresponds to the .reduce((x, y) -> x + y + 10) in our example.

In this way, the results of all nodes are combined pairwise until the final result is obtained.
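The merge phase can be mimicked with a tiny stand-in for ReducingSink (the Sink class below is illustrative, not the JDK's): two leaves accumulate their parts, then the parent combines them with the same operator:

```java
import java.util.function.BinaryOperator;

// Illustrative stand-in for ReducingSink: holds a partial result
// and merges another partial result via the user-supplied operator
class Sink {
    boolean empty = true;
    int state;
    final BinaryOperator<Integer> operator;

    Sink(BinaryOperator<Integer> operator) { this.operator = operator; }

    void accept(int t) {
        if (empty) { empty = false; state = t; }
        else       { state = operator.apply(state, t); }
    }

    void combine(Sink other) {
        if (!other.empty) accept(other.state);
    }
}

public class CombineDemo {
    public static void main(String[] args) {
        BinaryOperator<Integer> op = (x, y) -> x + y + 10;

        Sink left = new Sink(op);            // leaf for {1, 2}
        left.accept(1); left.accept(2);      // state = 13

        Sink right = new Sink(op);           // leaf for {3, 4, 5}
        right.accept(3); right.accept(4); right.accept(5); // state = 32

        left.combine(right);                 // op(13, 32) = 55
        System.out.println(left.state);      // 55, same as the stream example
    }
}
```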

Answers

  1. How do I switch between sequential and parallel streams, and how does the conversion work?
    • Use sequential() or parallel() to switch the stream type. In the source, this simply sets the sourceStage.parallel flag; whether to run sequentially or in parallel is decided from that flag when the terminal operation starts.
  2. How do you split tasks?
    • The underlying ForkJoinPool framework handles the splitting and merging of tasks. The overridden compute() splits the Spliterator and recursively calls compute() on the pieces until the Spliterator can no longer be split, then computes that part directly.
  3. How are the results of tasks combined?
    • The overridden onCompletion() merges the results of a non-leaf node's two children via combine(), propagating upward until all nodes have been merged; the final result is taken from the root node.