Questions
- How do I convert between sequential and parallel streams, and how is the conversion implemented?
- How do you split tasks?
- How are the results of tasks combined?
Task segmentation
The underlying execution of a parallel stream is based on the ForkJoin framework. As you know, the logic of a ForkJoin task, including how it is split, has to be supplied by overriding its methods, so to see how tasks are split we look at how the ForkJoinTask subclass implements compute(). The core method lives in the AbstractTask class: its compute() method defines the splitting algorithm:
/**
 * Decides whether to split this task further or to compute it directly.
 * If computing directly, calls doLeaf and passes the result to setRawResult.
 * Otherwise splits off subtasks, forking one and continuing as the other.
 *
 * The method is structured to conserve resources across a range of uses.
 * The loop continues with one of the subtasks when split, to avoid deep
 * recursion. To cope with spliterators that may be systematically biased
 * toward left-heavy or right-heavy splits, we alternate which child is
 * forked versus continued in the loop.
 */
@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    // Estimate the number of elements in rs
    long sizeEstimate = rs.estimateSize();
    // Get the size threshold below which a chunk is computed directly
    long sizeThreshold = getTargetSize(sizeEstimate);
    // Whether to fork the right child on this iteration
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    // Keep splitting while both conditions hold:
    // 1. The number of elements is above the threshold
    // 2. rs can still be split
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        // Construct two new child tasks from ls and rs
        task.leftChild = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        // Set the pending count to 1 (wait for one forked child)
        task.setPendingCount(1);
        // Alternate which side is forked and which is continued
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild; // rightChild.fork()
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild; // leftChild.fork()
        }
        // Submit the other subtask to the pool; it will run compute() itself
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    // Can no longer split: evaluate this chunk directly and store the result
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}
A Spliterator is split into two halves, each half is split again, and so on, until the while() condition no longer holds; at that point the remaining chunk is evaluated directly and the result is stored in the node's localResult field. This is the splitting (fork) phase.
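To make the splitting primitive concrete, here is a minimal sketch (TrySplitDemo is my own example, not JDK pipeline code) showing trySplit() and estimateSize(), the two calls compute() relies on:
import java.util.Arrays;
import java.util.Spliterator;

public class TrySplitDemo {
    public static void main(String[] args) {
        Spliterator<Integer> right = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).spliterator();
        Spliterator<Integer> left = right.trySplit();   // left takes roughly the first half

        System.out.println("left size  = " + left.estimateSize());   // 4
        System.out.println("right size = " + right.estimateSize());  // 4

        left.forEachRemaining(i -> System.out.print(i + " "));   // 1 2 3 4
        System.out.println();
        right.forEachRemaining(i -> System.out.print(i + " "));  // 5 6 7 8
    }
}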
Result merging
Test code:
long count = Stream.of(1, 2, 3, 4, 5).parallel().reduce((x, y) -> x + y + 10).get();
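A side note and sanity check of my own (ReduceCheck is not from the original article): the accumulator (x, y) -> x + y + 10 is associative, since (x op y) op z = x op (y op z) = x + y + z + 20, so the sequential and parallel pipelines both yield 55 regardless of how the elements are split:
import java.util.stream.Stream;

public class ReduceCheck {
    public static void main(String[] args) {
        int sequential = Stream.of(1, 2, 3, 4, 5)
                               .reduce((x, y) -> x + y + 10).get();
        int parallel = Stream.of(1, 2, 3, 4, 5).parallel()
                             .reduce((x, y) -> x + y + 10).get();
        System.out.println(sequential + " " + parallel);  // 55 55
    }
}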
The AbstractPipeline.evaluate() method is the branch point between the parallel and the sequential stream:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))    // parallel execution path
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); // sequential execution path
}
The evaluateParallel() method in ReduceOps is one of its implementations:
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
Two main things were done:
- Create a ReduceTask task
- The task calls invoke() to execute
The construction step does nothing beyond assigning the three parameters to instance fields.
invoke() is defined in ForkJoinTask, so let's focus on its logic:
/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
*
* @return the computed result
*/
public final V invoke() {
    int s;
    // Execute the task
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    // Return the final result
    return getRawResult();
}
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}
The JDK code sacrifices readability for performance; an equivalent, easier-to-read version:
private int doInvoke() {
    int s = doExec();
    if (s < 0)
        return s;
    Thread t = Thread.currentThread();
    if (t instanceof ForkJoinWorkerThread) {
        ForkJoinWorkerThread wt = (ForkJoinWorkerThread) t;
        return wt.pool.awaitJoin(wt.workQueue, this, 0L);
    }
    return externalAwaitDone();
}
doExec() ends up running compute(), which, as shown above, finishes by calling task.tryComplete(); the calling thread then waits in awaitJoin()/externalAwaitDone() until the whole task tree has completed. tryComplete() is what propagates completion up the tree:
/**
* If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion(CountedCompleter)}
* and then similarly tries to complete this task's completer,
* if one exists, else marks this task as complete.
*/
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
return;
}
}
ReduceTask overrides onCompletion() in ReduceOps; when the completing node is not a leaf, it combines the results of its two children:
@Override
public void onCompletion(CountedCompleter<?> caller) {
    if (!isLeaf()) {
        // Not a leaf: merge the results of the left and right children
        S leftResult = leftChild.getLocalResult();
        leftResult.combine(rightChild.getLocalResult());
        setLocalResult(leftResult);
    }
    // Null out the spliterator and children for GC
    super.onCompletion(caller);
}
Pay attention to three methods:
- isLeaf()
- combine()
- super.onCompletion()
isLeaf() checks whether the node is a leaf. Only a non-leaf node has two children, so only then is there anything to merge:
protected boolean isLeaf() {
return leftChild == null;
}
After the merge, the node calls super.onCompletion(caller), which nulls out its fields so they can be garbage collected:
@Override
public void onCompletion(CountedCompleter<?> caller) {
    spliterator = null;
    leftChild = rightChild = null;
}
Finally, look at the combine() method, which merges two partial results:
@Override
public void combine(ReducingSink other) {
    if (!other.empty)
        accept(other.state);
}

// Applies the operator to this sink's state and t,
// where t is the other partial result
@Override
public void accept(T t) {
    if (empty) {
        empty = false;
        state = t;
    } else {
        state = operator.apply(state, t);
    }
}
The logic of apply(state, t) is user-defined; in our example it is the lambda passed to .reduce((x, y) -> x + y + 10).
In this way, the results of all nodes can be pairwise combined to get the final result.
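To tie the splitting and merging phases together, here is a minimal, self-contained sketch; SumTask is a hypothetical example of mine, not the JDK's ReduceTask/AbstractTask, but it follows the same pattern: split the Spliterator, fork one child and continue as the other, store each leaf's result in localResult, and merge the children in onCompletion():
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;

public class SumTask extends CountedCompleter<Long> {
    private final Spliterator<Integer> spliterator;
    private final long targetSize;
    private SumTask leftChild, rightChild;
    private long localResult;

    SumTask(SumTask parent, Spliterator<Integer> spliterator, long targetSize) {
        super(parent);
        this.spliterator = spliterator;
        this.targetSize = targetSize;
    }

    @Override
    public void compute() {
        Spliterator<Integer> rs = spliterator, ls;
        SumTask task = this;
        // Split while the chunk is above the threshold and can still be split
        while (rs.estimateSize() > targetSize && (ls = rs.trySplit()) != null) {
            task.leftChild = new SumTask(task, ls, targetSize);
            task.rightChild = new SumTask(task, rs, targetSize);
            task.setPendingCount(1);       // wait for exactly one forked child
            task.rightChild.fork();        // simplified: always fork the right child
            task = task.leftChild;         // continue as the left child in this thread
            rs = ls;
        }
        // Leaf: evaluate this chunk directly
        long[] sum = {0};
        rs.forEachRemaining(i -> sum[0] += i);
        task.localResult = sum[0];
        task.tryComplete();                // propagate completion up the tree
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (leftChild != null)             // non-leaf: merge the two children's results
            localResult = leftChild.localResult + rightChild.localResult;
    }

    @Override
    public Long getRawResult() {
        return localResult;
    }

    public static void main(String[] args) {
        Integer[] data = new Integer[100];
        Arrays.setAll(data, i -> i + 1);   // 1..100
        Spliterator<Integer> sp = Arrays.spliterator(data);
        long result = new SumTask(null, sp, sp.estimateSize() / 4).invoke();
        System.out.println(result);        // 5050
    }
}
The real ReduceTask delegates the leaf work to doLeaf() and the merge to the sink's combine(), but the pending-count and completion mechanics are the same.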
Answers
- How do I convert between sequential and parallel streams, and how is the conversion implemented?
  - You can switch with sequential() or parallel(). In the source code this just sets a flag on the source stage (sourceStage.parallel = true); when the terminal operation starts, that flag decides whether the pipeline is evaluated sequentially or in parallel (see the sketch after this list).
- How do you split tasks?
  - The underlying ForkJoinPool framework handles the splitting and merging of tasks. The overridden compute() splits the Spliterator and keeps splitting, forking one child and continuing as the other, until the Spliterator can no longer be split or the size threshold is reached, then evaluates that chunk directly.
- How are the results of tasks combined?
  - The overridden onCompletion() merges the results of each non-leaf node's two children via combine(), and completion keeps propagating upward until every node has been merged; the final result is the root node's result.
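As a final sanity check (ParallelToggleDemo is my own sketch, not from the original text), switching is just flipping a flag on the pipeline, and only the flag's value when the terminal operation starts matters:
import java.util.stream.Stream;

public class ParallelToggleDemo {
    public static void main(String[] args) {
        Stream<Integer> s = Stream.of(1, 2, 3, 4, 5);
        System.out.println(s.isParallel());   // false

        s = s.parallel();                      // sets the source stage's parallel flag
        System.out.println(s.isParallel());   // true

        s = s.sequential();                    // flips the flag back
        System.out.println(s.isParallel());   // false

        // The flag's value when the terminal operation starts decides the execution mode
        int sum = s.reduce((x, y) -> x + y).get();
        System.out.println(sum);               // 15
    }
}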