This article has participated in the good article call order activity, click to see: back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!
>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…
A. The preface
This article takes a step-by-step approach to functional interfaces and discusses the principles and benefits of this feature in Java 8
Before functional programming has been based on use, recently combing the source code, found that this concept is really everywhere, simply a complete comb processing
The basic use of Java Stream can see this article: Operation Manual: Stream Processing manual
This paper will be divided into two main bodies:
- Functional programming in Java
- Java Stream principle
Ii. Principles of functional programming
Functional programming has four main interfaces: Consumer, Supplier, Predicate, and Function, each of which has a separate abstract method
Note: Functional interfaces are annotated with the @functionalInterface annotation, which can only be annotated on interfaces that have only one abstract method.
// In the example of Consumer, you can see that the only abstract method is Accept
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return(T t) -> { accept(t); after.accept(t); }; }}Copy the code
Here are a few concepts:
- There can only be one abstract method in a functional programming interface
- There can be static and default methods (PS: not abstract methods)
- You can override the Object method (PS: function itself inherits from Object, but we’ll see what happens later).
- Comments not required
2.1 Use of functional programming
Let’s define a functional programming process and see the details:
2.1.1 Arrow function Demo
// Look at a simple use of functional programming:
// Step 1: Customize the functional interface
@FunctionalInterface
public interface FunctionInterface<T.D> {
/** * function interface abstract method, the final execution method * PS: so generic */
T invoke(D input);
/** * Object, which does not violate the rules of functional programming **@param var1
* @return* /
boolean equals(Object var1);
/** * default is not abstract, does not violate the principle */
default void defaultMethod(a) {}/**
* static不是抽象方法
*/
static void staticMethod(String msg) { System.out.println(msg); }}// Step 2: Define the calling function
public void testFunction(a) {
logger.info("------> [execute functional methods] <-------");
// Method 1: pass in the code block
invokeInterface((input) -> {
return input + "-output1";
});
// Method 2: direct access
invokeInterface((input) -> input + "-output2");
// Method 3: pass in the object
FunctionInterface<String, String> funtion = (input) -> input + "-output3";
invokeInterface(funtion);
}
public void invokeInterface(FunctionInterface<String, String> funtion) {
String response = funtion.invoke("test");
logger.info("------> this is output :{} <-------", response);
}
// Print the result
FunctionService : ------> this is output :test-output1 <-------
FunctionService : ------> this is output :test-output2 <-------
FunctionService : ------> this is output :test-output3 <-------
Copy the code
2.1.2 Double colon function Demo
public void testDoubleFunction(a) {
// Example 1: pass in an instance method
Consumer consumer = System.out::println;
invokeDoubleFunction(consumer);
// Example 2: passing static methods
Consumer<String> consumer2 = FunctionInterface::staticMethod;
invokeDoubleFunction(consumer2);
// Example 3: passing a superclass
Consumer<String> consumer3 = super::superMethod;
invokeDoubleFunction(consumer3);
// Example 4: Pass the constructor
Consumer<ArrayList> consumer4 = ArrayList::new;
invokeDoubleFunction2(consumer4);
// For common cases, pass in custom methods
Consumer<String> createRoot = (msg) -> {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
};
invokeDoubleFunction2(createRoot);;
}
public void invokeDoubleFunction(Consumer consumer) {
Stream<String> stream = Stream.of("aaa"."bbb"."ddd"."ccc"."fff");
stream.forEach(consumer);
}
public void invokeDoubleFunction2(Consumer consumer) {
Stream<Collection<String>> stream = Stream.of(Arrays.asList("aaa"."bbb"));
stream.forEach(consumer);
}
Copy the code
2.2 Related annotations/interfaces
Having seen the custom ways of functional programming above, here is the related interface 👉
There are four main functional interfaces known to Java: Consumer, Supplier, Predicate, Function, and others like it: IntConsumer, IntSupplier, UnaryOperator, etc. Here we just need to do a simple analysis of the four main ones
Consumer: Consumer interface
Understanding: The purpose of this interface function is to consume the parameters passed in. The main focus is on the use of the parameters.
public interface Consumer<T> {
void accept(T t);
}
Copy the code
Supplier: Supplier
Understanding: Supplier’s role scenario is primarily a simple resource structure: no input, only return
public interface Supplier<T> {
T get(a);
}
Copy the code
Predicate: Predicate type
An assertion that represents a parameter structure: an object is passed in and returns a Boolean
public interface Predicate<T> {
boolean test(T t);
}
/ / usage:
stream.filter(predicate).collect(Collectors.toList());
Copy the code
Function: Functional
Understanding: Functions that take a parameter and produce a result are also the most functional
public interface Function<T.R> {
R apply(T t);
}
/ / usage:
Function<String, Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return s.length();// Get the length of each string and return}}; Stream<Integer> stream1 = stream.map(function);Copy the code
Differences between Consumer and IntConsumer similar interfaces, and optimization thinking
As you can see, there are some basic types of interfaces that are manually extended for each of these function interfaces, such as IntConsumer, and so on
From the previous data, we know that the purpose of this is to optimize the basic type, but there is no direct difference when looking at the source code:
On a personal note, generics are mostly handled by the compiler and have little impact in the actual use phase, and the wrapper functionality of the Java basic types is not really optimized, so what is being optimized here? Or is it just for clarity?
// PS: I don’t believe the JDK does this, so there must be room!!
Think of the fact that the actual type is determined before the compiler, so there must be some intuitive place in the business code to handle the type, rather than using things like reflection to handle the type at runtime.
TODO: That was interesting, but we’ll see…
2.2 Method function principle
Now that we’ve talked about functional programming, let’s take a look at how functional programming works:
Use of the arrow function:
// In the custom case, what is passed in is actually an interface objectMethod functions are the concept of lambda, which is based on how they are used at the Class level.// TODO: decompile is not necessary
Copy the code
3. Steam depth
3.1 Stream architecture
As you can see from the diagram, the architecture is basically the same, like a tree structure:
The second layer is an abstraction level that contains five classes: AbstractPipeline/Stream/IntStream/DoubleStream/LongStream The fourth layer of the DoublePipeline/LongPipeline/IntPipeline/ReferencePipeline is an inner class, and each implementation has several: Head / StatelessOp / StatefulOp / OfInt
3.2 Operating principle of Stream
As you can see, the Stream structure is primarily based on the concept of Pipeline, with three additional optimizations for the basic types.
At the same time, StatefulOp and StatelessOp are used to make a simple conceptual arrangement for the intermediate operations of due and stateless states:
Process Diagram (a diagram to show you the main process)
Pre-added: AbstractPipeline class
AbstractPipeline 的作用 :
Stage is a virtual concept. AbstractPipeline represents the initial part of a stream pipeline, encapsulating the stream source and zero or more intermediate operations. An AbstractPipeline is regarded as a stage, where each stage describes either the stream source or the intermediate operations.
Stage properties:
There are three stage concepts in AbstractPipeline (used to annotate spatial structures)
F-abstractpipeline sourceStage: points to the head of the pipeline chain (self if this is the sourceStage) “Upstream” pipeline, empty if this is the source level f-abstractpipeline nextStage: the nextStage in the pipeline, empty if this is the last stage
The above three attributes are used to annotate the spatial structure, meaning where does the process go, and the rest is how to annotate the behavior
Type of behavior:
The properties Head, StatefulOp, and StatelessOp all inherit from AbstractPipeline, and they identify three types of operations
Head: indicates the first Stage, the source Stage, which collects resources. StatefulOp: stateless operations StatelessOp: stateless operations
The usual execution logic is Head -> StatefulOp -> StatelessOp
Other attributes:
// Upstream pipeline, null when the first stream is created
previousStage = null;
// Source cutters, implemented in each tool class
sourceSpliterator = source;
// The operation flag of the intermediate operation represented in this pipe object.
sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// Merge source and operation flags for all operations of the source and the operation represented by the pipe object
combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
/** * Non-parallel: with the depth of the source stream (number of steps) * Parallel: represents the preceding stateful operand **/
depth = 0;
/** * Whether the pipeline is parallel **/
parallel = parallel;
Copy the code
AbstractPipeline constructor
AbstractPipeline(Supplier<? extends Spliterator<? >> source,int sourceFlags, boolean parallel) {
this.previousStage = null; / / upper stage
this.sourceSupplier = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
Copy the code
3.2.1 Stream process
The front points
Here are all the key points of Stream operation learned from related blogs for the rest of the article:
/ / main process- each operation will create a stage - upstream sink sink through AbstractPipeline. To find out the downstream opWrapSink sink// End the process- The end operation will not create a new pipeline Stage. - The end operation will wrap a Sink operated by itself as the last Sink of the pipeline// The collection process- For Boolean and Optional operations, it will be recorded in the corresponding Sink. - For reduction operations, the final result will be placed in the container specified by the user when calling. - The return array will be first placed in a Node data structure// Operate in parallel- Performs tasks by ForkJoin in parallelCopy the code
Use case
// Follow the following example to see the main process:
List<String> randomValues = Arrays.asList(
"E11"."D12"."A13"."F14"."C15"."A16"."B11"."B12"."C13"."B14"."B15"."B16"."F12"."E13"."C11"."C14"."A15"."C16"."F11"."C12"."D13"."E14"."D15"."D16"
);
randomValues.stream().filter(value -> value.startsWith("C")).sorted().forEach(System.out::println);
Copy the code
3.2.1.1 Step 1 :The creation of a Stream
It’s usually possible to create a Stream from a Collection or an Array. We won’t go into too much detail here, but what does it look like
- Spliterator to split collections (methods of collections or arrays themselves)
- Build a stream from streamSupport. stream
// As you can see, a Stream is built using ReferencePipeline.Head
return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
// 补充 spliterator :Spliterator has many implementations. ArrayListSpliterator has three main properties:private final ArrayList<E> list; // Pass the current Stream collection
private int index; // The current index is modified as it grows and cuts
private int fence; // The current number of collections. When -1 indicates that all collections are finished
private int expectedModCount; // Initialization when fence is set
// Complement StreamOpFlag: this object converts Spliterator into a stream flag that identifies the characteristics of the stream.
public static final int ORDERED = 0x00000010; // Define the meeting order
public static final int DISTINCT = 0x00000001; // Define unique features
public static final int SORTED = 0x00000004; // The characteristic values that follow the defined sort order are comparable
public static final int SIZED = 0x00000040; // Represents an exact count of the number of elements that will be encountered during a full walk
public static final int NONNULL = 0x00000100; // Represents the eigenvalue where the source guarantees that the element encountered is not empty
public static final int IMMUTABLE = 0x00000400; // The element source is an eigenvalue that cannot be modified by the structure
public static final int CONCURRENT = 0x00001000; // Indicates that the element source can be safely modified concurrently by multiple threads (allowing addition, replacement, and/or removal) without the need for external synchronization
public static final int SUBSIZED = 0x00004000; // All subspliterators, whether direct or indirect, will be counted (subclass count)
// parallel: whether the returned stream is parallel
Copy the code
3.2.1.2 Step 2 :The Filter to Filter
We saw the Stream creation process earlier:
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// Check with the passed method function
if (predicate.test(u))
// Verify that the output stream was added successfully (PS: we will talk about the List after it was added)downstream.accept(u); }}; }}; }StreamShape is an enumeration that describes the type characteristics of a stream abstraction. It has four properties
REFERENCE : object
INT_VALUE
LONG_VALUE
DOUBLE_VALUE
Copy the code
What does this code focus on?
- A StatelessOp is built
- Sink.ChainedReference is a reverse callback logic that will only be executed when the stream foreach
Supplement: Core operation wrapSink
/ / function:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
Copy the code
Focus on this link:
What is a pipe? A pipe is a line of water that goes in at one end and out at the other
The filter is equivalent to the shunt valve, which can get some water out, but all the premise is that the switch of the pipeline should be opened
That is, when the process reaches a Terminal operation such as foreach, the flow starts to run and the intermediate operations set in the process are executed
3.2.1.3 Step 3 :The forEach process
The forEach entrance
C- ReferencePipeline (java.util.stream.SortedOps$OfRef)
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
// --> evaluate the actual implementation
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape(a) == terminalOp.inputShape();
// linkedOrConsumed -> This pipe has been applied or consumed, meaning the pipe can only be used once
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// Whether the stream is parallel
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
// Additional: Parallel object distinctionEvaluateParallel: Performs parallel computation of the operation using the specified PipelineHelper. EvaluateSequential: Performs sequential computation of the operation using the specified parametersCopy the code
Internal process Step 1 :wrapSink Build Sink
public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
// Add wrapSink:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if(! StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {// Notify that data is coming
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
// End must be called after all data has been sent
wrappedSink.end();
// PS: begin and accept cannot be called again after end
}
else{ copyIntoWithCancel(wrappedSink, spliterator); }}Copy the code
Internal process Step 2: Initiate a Foreach loop
public void end(a) {
// At this point, the list has been filtered and sorted
list.sort(comparator);
Downstream is also a Sink
downstream.begin(list.size());
if(! cancellationWasRequested) {// PS: List is successfully added in the preceding Filter
list.forEach(downstream::accept);
} else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
Copy the code
Internal process Step 3: Foreach main process
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
final int expectedModCount = modCount;
final E[] elementData = (E[]) this.elementData;
final int size = this.size;
for (int i=0; modCount == expectedModCount && i < size; i++) {
action.accept(elementData[i]);
}
if(modCount ! = expectedModCount) {throw newConcurrentModificationException(); }}Copy the code
Internal process Step 4: Execute accept
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
// Here we execute the function passed in foreachconsumer.accept(t); }}Copy the code
3.2.1.4 Supplementary Sink:
Function:
- Begin (Long size) : This method is called before you start traversing the elements, notifying Sink to get ready
- End () : called after all elements have been traversed to tell Sink that there are no more elements
- CancellationRequested () : Whether the operation can be finished, so that the short circuit operation can be finished as early as possible (short circuit operation must be implemented)
- Accept (T T) : Called when an element is traversed, accepting an element to be processed and processing the element (PS: Stage in the list is called by the accept underlayer)
// Call process:Each Stage will encapsulate its own operation into a Sink, and the latter Stage only needs to call the accept() method of the previous Stage, which is similar to the idea of recursion, but different recursion is from the bottom layer to the top layer, and Stream is from the top layer to the bottom layer after executing the bottom layerCopy the code
Iv. Key points
4.1 the Map process
//
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// u is the current object. Mapper. apply is used to apply this function to a given parameterdownstream.accept(mapper.apply(u)); }}; }}; }Copy the code
4.2 the Collection process
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if(isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (! isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ?super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
// The Supplier object is injected here
public static<T> Collector<T, ? , List<T>> toList() {return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
// Actual here
static class CollectorImpl<T.A.R> implements Collector<T.A.R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
}
// InvokeSupplier when executing the ReduceOps object
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? superT, I, ? > collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T.I.ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) { state = combiner.apply(state, other.state); }}return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink(a) {
return new ReducingSink();
}
@Override
public int getOpFlags(a) {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0; }}; }Copy the code
4.3 Differences between stateless and stateful treatments
- Stateless: Stateless operation
- Stateful: stateful operation
abstract static class StatelessOp<E_IN.E_OUT>
extends ReferencePipeline<E_IN.E_OUT> { StatelessOp(AbstractPipeline<? , E_IN, ? > upstream, StreamShape inputShape,int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful(a) {
return false; }}abstract static class StatefulOp<E_IN.E_OUT>
extends ReferencePipeline<E_IN.E_OUT> { StatefulOp(AbstractPipeline<? , E_IN, ? > upstream, StreamShape inputShape,int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful(a) {
return true;
}
@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper
helper, Spliterator
spliterator, IntFunction
generator)
[]>
;
}
/ / contrast:
1StatefulOp implements opEvaluateParallel as an extraCopy the code
4.4 Parallel Processing
// We already know that evaluate is created in parallel through evaluateParallel
C- AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
/ /... Whether the stream is parallel
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
/ / principle:The parallel processing of streams is based on the ForkJoin framework, which is created differently in each Op, but each creates a Task// C- ReduceOp
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
// C- MatchOp
public <S> Boolean evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
return new MatchTask<>(this, helper, spliterator).invoke();
}
// C- ForEachOp
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// C- FindOp
public <P_IN> O evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
return new FindTask<>(this, helper, spliterator).invoke();
}
Copy the code
public void compute(a) {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
while(sizeEstimate > sizeThreshold && (ls = rs.trySplit()) ! =null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
task.setLocalResult(task.doLeaf());
task.tryComplete();
}
Copy the code
Here you can have a look at the senior’s article, written very clearly, the following is the handling of the picture @Java8 Stream principle in-depth analysis – Zhihu.com
4.5 Op module system
// PS: create a Sink for each Op
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
// The core is: wrapSink, the final call after creation
Copy the code
4.6 Multiple stages
- Set by the constructor
- Depth Specifies the depth
The other structures are shown
conclusion
The principle of Stream does not go into too much depth, mainly because of some curiosity about its principle, and only one process is analyzed.
Stream source code is a pleasure to read. It is rare to see code with such an interesting structure.
How streams are processed efficiently through parallelism is not fully understood here. In the next article, we will look at performance analysis
thinking
Spend so long, comb through these source code, always want to learn something from inside, here try to do a little summary:
- The Stream process is very interesting. It is a recursion-like but slightly different structure. The underlying logic is the switch that starts the entire process, and when the water flows, it is executed from scratch
- The architecture of Stream is also interesting. It’s a bit like Node next and Pre, but with virtual stage objects
- In the inheritance system, the first feeling is neat, and very detailed, in the interface structure, very valuable reference
The appendix
Stream common methods:
reference
@deep understanding of Java8 Stream implementation principles _LCgoing blog -CSDN blog _stream principles
@Java8 Stream Principle in-depth analysis – Zhihu (zhihu.com)
@ www.javabrahman.com/java-8/unde…)