The class diagram
Concepts to explain
Pipeline and Stage
A Pipeline is an assembly line representing the entire process, and a Stage represents one step of that pipeline. "Stage" is an abstract description: a stage mainly captures the logical, sequential relationship between steps, while Sink describes what each stage actually does and how it does it.
Stream.of(...)     // stage 0 (the head)
      .filter(...) // stage 1
      .sorted()    // stage 2
Sink
Literally, a sink. A kitchen sink in real life does little more than:
- Turn on the tap and know water is coming
- The water is in the sink and can be manipulated
- Open the sluice and let out the water
The core functions of Sink in Java are:
- begin(): tells the sink that water is coming so it can do some initialization
- accept(): accepts one element of the flow and operates on it
- end(): the flow has been completely processed; do any final work
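To make this protocol concrete, here is a minimal sketch of a Sink-like interface. MySink is a hypothetical stand-in: the real java.util.stream.Sink is a package-private interface extending Consumer<T>, so this only mirrors its shape.

// A toy sketch of the Sink protocol; MySink is a hypothetical name,
// not the JDK's package-private java.util.stream.Sink.
interface MySink<T> {
    default void begin(long size) { }  // water is coming; do any initialization
    void accept(T t);                  // one element flows through
    default void end() { }             // the flow is finished; flush and clean up
}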
Look at sorted() as an example. The purpose of the sort stage is to sort all of the elements before they flow downstream.
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
    private T[] array; // To sort, an array is needed to cache the elements
    private int offset;

    SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    // Upstream calls begin() to tell sort to initialize and allocate the array
    @Override
    @SuppressWarnings("unchecked")
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        array = (T[]) new Object[(int) size];
    }

    // Upstream calls end() to tell sort that all the water has passed; sort now does its work
    @Override
    public void end() {
        // The actual sort operation
        Arrays.sort(array, 0, offset, comparator);
        // Tell sort's downstream to prepare to receive water
        downstream.begin(offset);
        // Pass each element downstream
        if (!cancellationWasRequested) {
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
        }
        else {
            for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                downstream.accept(array[i]);
        }
        // Tell downstream that the flow is over
        downstream.end();
        // Clear the cache
        array = null;
    }

    // Upstream calls accept() to store each element into sort's cache array
    @Override
    public void accept(T t) {
        array[offset++] = t;
    }
}
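To see that buffering from the outside, here is a plain usage sketch; the sink-level behavior described in the comment is inferred from the source above.

import java.util.stream.Stream;

public class SortedSinkDemo {
    public static void main(String[] args) {
        // During traversal the sort sink buffers 3, 1, 2 in accept(),
        // sorts them in end(), and only then pushes them downstream.
        Stream.of(3, 1, 2)
              .sorted()
              .forEach(System.out::println); // prints 1, 2, 3
    }
}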
Create a Head
Questions
- Stream does not store data, so where does it store data?
The answer comes later.
Usage
You can create a Stream using Stream.of(), for example:
// Create a stream with of()
Stream<Integer> stream = Stream.of(1, 2, 3);
Source code analysis
The of() method internally calls
StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);
The stream() method's logic:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
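StreamSupport.stream() is public API, so you can build a Head yourself from any Spliterator. A small usage sketch:

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class HeadDemo {
    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        Spliterator<Integer> split = source.spliterator();
        // false = sequential; internally this builds a ReferencePipeline.Head
        Stream<Integer> stream = StreamSupport.stream(split, false);
        System.out.println(stream.count()); // 3
    }
}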
ReferencePipeline.Head<> is instantiated, returning a Head object. Head is a subclass of ReferencePipeline and the first stage of the assembly line. The main constructor logic runs up the super() chain into the AbstractPipeline class:
/**
* The source spliterator. Only valid for the head pipeline.
* Before the pipeline is consumed if non-null then {@code sourceSupplier}
* must be null. After the pipeline is consumed if non-null then is set to
* null.
*/
private Spliterator<?> sourceSpliterator;

/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
this.previousStage = null;
// Use a field to point to the data set as a Spliterator
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
Questions answered
- Stream does not store data, so where does it store data?
- The Head stage holds a Spliterator that points at the data source; all later operations work on the data through that Spliterator.
Intermediate operations
A few questions
- How are the intermediate operations related?
- How do I finish an intermediate operation and then do the next one?
- How do stateful intermediate operations preserve state?
- How is lazy evaluation implemented?
Usage
Stream<Integer> st = headStream.filter(e -> e == 1).distinct().sorted();
// is equivalent to
Stream<Integer> afterFilter = headStream.filter(e -> e == 1);
Stream<Integer> afterDistinct = afterFilter.distinct();
Stream<Integer> afterSort = afterDistinct.sorted();
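Each call returns a brand-new stage object wrapping the previous one, which a quick check makes visible. The printed class is an anonymous ReferencePipeline subclass, so treat the exact output as illustrative:

import java.util.stream.Stream;

public class StageDemo {
    public static void main(String[] args) {
        Stream<Integer> head = Stream.of(1, 2, 3);
        Stream<Integer> afterFilter = head.filter(e -> e == 1);
        // Every intermediate op returns a new stage, not the receiver
        System.out.println(head == afterFilter);    // false
        System.out.println(afterFilter.getClass()); // an anonymous ReferencePipeline subclass
    }
}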
Filter
What happens when you execute filter(op)?
Stream<Integer> afterFilter = head.filter(e -> e == 1);
The filter() method is declared in the Stream interface and implemented in the ReferencePipeline class.
// ReferencePipeline.class
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // Return a StatelessOp; the constructor takes this stage as its upstream
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    // Filtering changes the element count, so the size is unknown
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
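Since Sink and Sink.ChainedReference are package-private, here is a standalone sketch of the same chaining idea. MySink and FilterSink are hypothetical stand-ins, not JDK types:

import java.util.function.Predicate;

// Hypothetical stand-ins for the package-private Sink / Sink.ChainedReference
interface MySink<T> {
    default void begin(long size) { }
    void accept(T t);
    default void end() { }
}

class FilterSink<T> implements MySink<T> {
    private final Predicate<? super T> predicate;
    private final MySink<? super T> downstream;

    FilterSink(Predicate<? super T> predicate, MySink<? super T> downstream) {
        this.predicate = predicate;
        this.downstream = downstream;
    }

    @Override
    public void begin(long size) {
        // Filtering changes the element count, so report unknown size (-1)
        downstream.begin(-1);
    }

    @Override
    public void accept(T t) {
        if (predicate.test(t))
            downstream.accept(t); // forward only the matching elements
    }

    @Override
    public void end() {
        downstream.end();
    }
}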
So filter() returns a StatelessOp (because filter is a stateless operation). StatelessOp is an abstract static inner class that extends ReferencePipeline.
// ReferencePipeline.class
/**
 * Base class for a stateless intermediate stage of a Stream.
 *
 * @param <E_IN> type of elements in the upstream source
 * @param <E_OUT> type of elements produced by this stage
 * @since 1.8
 */
abstract static class StatelessOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {

    /**
     * Construct a new Stream by appending a stateless intermediate
     * operation to an existing stream.
     *
     * @param upstream The upstream pipeline stage
     * @param inputShape The stream shape for the upstream pipeline stage
     * @param opFlags Operation flags for the new stage
     */
    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return false;
    }
}
Along the way, super() executes the AbstractPipeline constructor, which wires up the relationship between stages:
// AbstractPipeline.class
/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 *        {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    // Wire the doubly linked list: the old tail points forward to this new stage
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
Distinct
Example
Stream<Integer> afterDistinct = afterFilter.distinct();
The distinct() method is implemented in the ReferencePipeline class:
@Override
public final Stream<P_OUT> distinct() {
    return DistinctOps.makeRef(this);
}
It calls the makeRef() method of the DistinctOps class, which returns a StatefulOp and overrides four methods; the core logic lives in opWrapSink():
/**
 * Appends a "distinct" operation to the provided stream, and returns the
 * new stream.
 *
 * @param <T> the type of both input and output elements
 * @param upstream a reference stream with element type T
 * @return the new stream
 */
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    // Return a StatefulOp
    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                  StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
        // The following overrides are abbreviated...
        <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { ... }

        @Override
        <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<T[]> generator) { ... }

        @Override
        <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator) { ... }

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);
            if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                ...
            } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                ...
            } else {
                // Return a Sink.ChainedReference
                return new Sink.ChainedReference<T, T>(sink) {
                    // Use a Set to cache elements for de-duplication
                    Set<T> seen;

                    // The Set is initialized when upstream calls begin()
                    @Override
                    public void begin(long size) {
                        seen = new HashSet<>();
                        downstream.begin(-1);
                    }

                    // Drop the cache and notify downstream
                    @Override
                    public void end() {
                        seen = null;
                        downstream.end();
                    }

                    // If an element has already been seen, discard it
                    @Override
                    public void accept(T t) {
                        if (!seen.contains(t)) {
                            seen.add(t);
                            downstream.accept(t);
                        }
                    }
                };
            }
        }
    };
}
StatefulOp is similar to StatelessOp: it extends ReferencePipeline, and along the way super() executes the AbstractPipeline constructor to wire the stages together.
/**
* Base class for a stateful intermediate stage of a Stream.
*
* @param <E_IN> type of elements in the upstream source
 * @param <E_OUT> type of elements produced by this stage
 * @since 1.8
 */
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
    // omitted
}
Other intermediate operations follow a similar routine: the operation's logic is encapsulated in its opWrapSink() method, which you can explore at your own pace.
Questions answered
- How are the intermediate operations related?
  - Each operation is encapsulated into a StatelessOp or StatefulOp object, and the stages are linked together in a doubly linked list.
- How do I finish an intermediate operation and then do the next one?
  - The Sink class bridges upstream and downstream and carries out each stage's work on the assembly line. Its core methods are begin(), accept(), and end().
- How do stateful intermediate operations preserve state?
  - Stateful intermediate operations are encapsulated as StatefulOp objects, each with its own logic; see the sorted() implementation above for a concrete example.
- How is lazy evaluation implemented?
  - Each intermediate operation call simply appends a new stage to the end of the pipeline and records the association; nothing runs yet.
  - The pipeline is actually started when wrapAndCopyInto() drives the wrapped sink chain from the Head, and wrapAndCopyInto() is only triggered by a terminal operation (see the sketch after this list).
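You can observe the laziness directly: a side-effecting predicate only runs once a terminal operation arrives. A small demonstration sketch:

import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        Stream<Integer> s = Stream.of(1, 2, 3)
                .filter(e -> {
                    System.out.println("filter sees " + e);
                    return e > 1;
                });
        System.out.println("filter() returned, but nothing has run yet");
        long n = s.count(); // only now do the "filter sees ..." lines print
        System.out.println(n); // 2
    }
}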
Terminal operations
A few questions
- How do terminal methods operate?
- How does a terminal operation trigger the flow?
- How is it guaranteed that a stream executes only one terminal operation?
Usage
Here are the four types of terminal operations, all part of the Stream API:
// Count the number of elements in the stream; a ReduceOp
long count = afterLimit.count();
// Iterate over all elements; a ForEachOp
afterLimit.forEach(System.out::println);
// Get the first element; a FindOp
Optional<Integer> any = afterLimit.findFirst();
// Test elements against a predicate; a MatchOp
boolean flag = afterLimit.anyMatch(i -> i == 1);
count()
It is implemented in the ReferencePipeline class:
@Override
public final long count() {
    // Call mapToLong to make every element 1, then compute the sum
    return mapToLong(e -> 1L).sum();
}
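In other words, count() here is just a reduction in disguise; under this Java 8 implementation the two lines below are equivalent (usage sketch):

import java.util.stream.Stream;

public class CountDemo {
    public static void main(String[] args) {
        long viaCount  = Stream.of("a", "b", "c").count();
        long viaReduce = Stream.of("a", "b", "c").mapToLong(e -> 1L).sum();
        System.out.println(viaCount + " " + viaReduce); // 3 3
    }
}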
mapToLong()
The mapToLong() method is itself an intermediate operation:
@Override
public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
            return new Sink.ChainedReference<P_OUT, Long>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.applyAsLong(u));
                }
            };
        }
    };
}
ToLongFunction is a functional interface. Here the mapper is e -> 1L, so applyAsLong() returns 1 for every element:
@FunctionalInterface
public interface ToLongFunction<T> {
/**
* Applies this function to the given argument.
*
* @param value the function argument
* @return the function result
*/
long applyAsLong(T value);
}
Now look at the sum() method in the LongPipeline class. The argument passed to reduce() is Long::sum, which adds two values:
@Override
public final long sum() {
    // use better algorithm to compensate for intermediate overflow?
    return reduce(0, Long::sum);
}

// For reference, Long::sum is:
// public static long sum(long a, long b) {
//     return a + b;
// }
reduce()
The reduce() method wraps the combining function op into a TerminalOp via makeLong(); the TerminalOp's makeSink() produces the Sink that performs the reduction:
@Override
public final long reduce(long identity, LongBinaryOperator op) {
return evaluate(ReduceOps.makeLong(identity, op));
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code long} values.
*
* @param identity the identity for the combining function
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static TerminalOp<Long, Long>
makeLong(long identity, LongBinaryOperator operator) {
Objects.requireNonNull(operator);
    class ReducingSink
            implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
        // state is the running value of the reduction
        private long state;

        @Override
        public void begin(long size) {
            state = identity;
        }

        // The operator here is Long::sum, so accept() adds each element to state
        @Override
        public void accept(long t) {
            state = operator.applyAsLong(state, t);
        }

        @Override
        public Long get() {
            return state;
        }

        @Override
        public void combine(ReducingSink other) {
            accept(other.state);
        }
    }
    return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }
    };
}
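The net effect of ReducingSink's begin()/accept()/get() cycle: state starts at the identity and each accept() folds one more element in. A usage sketch:

import java.util.stream.LongStream;

public class ReduceDemo {
    public static void main(String[] args) {
        // begin(): state = 0; accept() three times: state += t; get(): 6
        long sum = LongStream.of(1, 2, 3).reduce(0, Long::sum);
        System.out.println(sum); // 6
    }
}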
evaluate()
Look back at the evaluate() method, which executes the terminal operation:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    // Check whether the stream has already been used
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    // Mark the stream as consumed
    linkedOrConsumed = true;

    // Evaluate sequentially or in parallel depending on the stream type
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
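The linkedOrConsumed flag is exactly what makes a reused stream fail; you can trigger the check deliberately (demonstration sketch):

import java.util.stream.Stream;

public class ReuseDemo {
    public static void main(String[] args) {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.count(); // the first terminal op sets linkedOrConsumed = true
        try {
            s.count(); // the second one fails the check in evaluate()
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            System.out.println(e.getMessage());
        }
    }
}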
Focusing on the sequential evaluation path: evaluateSequential() has four implementations, corresponding to the four operation types mentioned above. count() belongs to ReduceOp, so let's look inside.
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    // Call wrapAndCopyInto()
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
wrapAndCopyInto()
It first wraps all of the stages into a sink chain, then executes the copyInto() method:
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
wrapSink()
Here the stages are wrapped, from back to front, into a single sink chain: each stage's opWrapSink() wraps the sink of the stage after it, so the stage linked list is folded into nested Sinks.
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);
    // Traverse the stages from back to front
    for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        // Execute each stage's opWrapSink() method, overridden in each operation class
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    // Return the head of the sink chain
    return (Sink<P_IN>) sink;
}
copyInto()
This method is the starting switch of the whole assembly line, and the process is as follows:
- Call the first sink's begin() method
- Call spliterator.forEachRemaining(wrappedSink) to traverse the data source, which invokes accept() on each element
- Call end() to signal that the flow is finished
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // Notify the first sink to get ready to receive the flow
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // Traverse the source, pushing each element through the sink chain
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}
forEachRemaining()
forEachRemaining() is implemented by each container's spliterator. In ArrayList's spliterator, for example, advancing one element is done by tryAdvance(), which calls the sink's accept():
public boolean tryAdvance(Consumer<? super E> action) {
    if (action == null)
        throw new NullPointerException();
    int hi = getFence(), i = index;
    if (i < hi) {
        index = i + 1;
        @SuppressWarnings("unchecked") E e = (E) list.elementData[i];
        // Execute the accept() method
        action.accept(e);
        if (list.modCount != expectedModCount)
            throw new ConcurrentModificationException();
        return true;
    }
    return false;
}
Other terminal operations
forEach()
The forEach() method is implemented in the ReferencePipeline class:
// from ReferencePipeline.class
@Override
public void forEach(Consumer<? super P_OUT> action) {
    // Build a ForEachOp via ForEachOps.makeRef() and evaluate it
    evaluate(ForEachOps.makeRef(action, false));
}
The evaluate() flow is the same as for count().
findFirst() and anyMatch()
The logic of findFirst() and anyMatch() follows the same pattern, so there is no need to belabor it:
@Override
public final Optional<P_OUT> findFirst() {
    return evaluate(FindOps.makeRef(true));
}

@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
    return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
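Both are short-circuiting terminal operations: the sink reports cancellationRequested() as soon as the answer is known, so copyIntoWithCancel() stops the traversal early. A demonstration sketch with peek():

import java.util.stream.Stream;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        boolean found = Stream.of(1, 2, 3, 4)
                .peek(e -> System.out.println("visiting " + e))
                .anyMatch(i -> i == 2);
        // prints "visiting 1" and "visiting 2"; 3 and 4 are never visited
        System.out.println(found); // true
    }
}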
Questions answered
- How do terminal methods operate?
  - Each terminal operation's implementation calls the evaluate() method, which wraps every stage into a sink chain and then runs begin(), accept(), and end() from the head.
- How does a terminal operation trigger the flow?
  - The trigger switch is wrapAndCopyInto(), which is only called during a terminal operation.
- How is it guaranteed that a stream executes only one terminal operation?
  - The first evaluate() call sets linkedOrConsumed to true, so any later evaluate() call fails with an IllegalStateException.