A discussion of the use and principles of Stream in the JDK
The upgrade from JDK 7 to JDK 8 added a very practical feature, Stream, which is used heavily in everyday development. Many of you are probably already familiar with it. Let's discuss together why Stream is so popular and what problems it solves.
- Author: Tao Wangwang
- Editor: Higazawa
Why Stream exists
Before Stream, if we wanted to iterate over a collection, we might use forEach or an enhanced for loop. If we needed to apply conditions during the iteration, we might need several loops, like this:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

List<Integer> list = new ArrayList<>(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7));
// Get all elements greater than 5 from the original list, sort them, then iterate
List<Integer> newList = new ArrayList<>();
for (Integer i : list) {
    if (i > 5) {
        newList.add(i);
    }
}
Collections.sort(newList);
newList.forEach(System.out::println);
This is how the code might look without Stream: the data is processed in three separate loops (the for loop, the sort, and the final forEach). The same functionality implemented with Stream looks like this:
List<Integer> list = new ArrayList<>(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7));
// Same logic: keep elements greater than 5, sort them, print each one
list.stream().filter(integer -> integer > 5).sorted().forEach(System.out::println);
Using Stream together with lambda expressions does the same thing with much less code (no hand-written loop and no intermediate list to manage), and the more operations you chain, the more obvious the advantage of Stream becomes. Therefore, Stream was introduced mainly to simplify iteration and improve its efficiency.
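To check that the two versions really do the same thing, here is a small self-contained sketch (the class and method names are my own, chosen for illustration) that returns both results as lists instead of printing them:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class LoopVsStream {
    // Imperative version: filter into a new list, then sort it
    static List<Integer> withLoop(List<Integer> source) {
        List<Integer> result = new ArrayList<>();
        for (Integer i : source) {
            if (i > 5) {
                result.add(i);
            }
        }
        Collections.sort(result);
        return result;
    }

    // Stream version: the same filter + sort, collected into a list instead of printed
    static List<Integer> withStream(List<Integer> source) {
        return source.stream()
                .filter(i -> i > 5)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7);
        System.out.println(withLoop(list));   // [6, 7, 8, 9]
        System.out.println(withStream(list)); // [6, 7, 8, 9]
    }
}
```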
Review the basic use of Stream
Stream provides very powerful functionality, so let's review its commonly used APIs.
Creating a stream
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Stream.of(1, 2, 3, 4);
Arrays.asList(1, 2, 3, 4).stream();
// Can replace an indexed for (int i = 1; i < 5; i++) loop
IntStream.range(1, 5);
// The following two methods, which I have not used yet, generate infinite streams
Stream.generate(Math::random);
Stream.iterate(1, item -> item + 1);
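Here is a runnable sketch of the creation methods above, with one assumption added: the infinite streams are bounded with limit, because a terminal operation on an unbounded generate or iterate stream would never finish. The class name CreateStreams is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CreateStreams {
    // Stream.of: a stream over the given values
    static List<Integer> ofValues() {
        return Stream.of(1, 2, 3, 4).collect(Collectors.toList());
    }

    // Collection.stream(): a stream over an existing collection
    static List<Integer> fromCollection() {
        return Arrays.asList(1, 2, 3, 4).stream().collect(Collectors.toList());
    }

    // IntStream.range is half-open: range(1, 5) yields 1, 2, 3, 4
    static int[] range() {
        return IntStream.range(1, 5).toArray();
    }

    // Stream.iterate: seed, f(seed), f(f(seed)), ...; infinite, so bound it with limit
    static List<Integer> iterateFirstFive() {
        return Stream.iterate(1, item -> item + 1).limit(5).collect(Collectors.toList());
    }

    // Stream.generate: calls the Supplier for every element; also infinite
    static long countGenerated() {
        return Stream.generate(Math::random).limit(3).count();
    }

    public static void main(String[] args) {
        System.out.println(ofValues());               // [1, 2, 3, 4]
        System.out.println(fromCollection());         // [1, 2, 3, 4]
        System.out.println(Arrays.toString(range())); // [1, 2, 3, 4]
        System.out.println(iterateFirstFive());       // [1, 2, 3, 4, 5]
        System.out.println(countGenerated());         // 3
    }
}
```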
Stream operations
Stream provides a rich set of operation types, as shown in the figure below.
Here is a quick explanation of the terms in bold in the table:
- Intermediate operation: literally, a link in the middle of the stream, between its start and its end. In terms of results, the previous stream produces the next stream through an intermediate operation. In terms of code, each intermediate operation creates a new stage, which later produces a corresponding Sink node
- Terminal operation: in contrast to an intermediate operation, a terminal operation is the last link of the stream and produces no new stream. In terms of code, it is what starts the whole chain of Sink nodes, from the beginning
- Stateful: an operation that needs all the data in the stream before it can produce output; for example, sorted must collect every element before sorting
- Stateless: an operation that handles each element on its own without needing the others; for example, filter only looks at the element itself
- Short-circuit: an operation that can produce its result without processing every element; for example, anyMatch returns true as soon as one element satisfies the predicate
- Non-short-circuit: an operation that must traverse all elements to get its result, such as forEach and max
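The short-circuit distinction can be observed with peek, which sees every element that reaches its position in the pipeline. This sketch (the names are illustrative) records which elements anyMatch and max actually pull from the source:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OperationKinds {
    // Short-circuit terminal operation: anyMatch stops pulling once an element matches
    static List<Integer> visitedByAnyMatch() {
        List<Integer> visited = new ArrayList<>();
        Stream.of(1, 2, 3, 4, 5)
                .peek(visited::add)      // records every element that reaches this point
                .anyMatch(i -> i == 2);
        return visited;
    }

    // Non-short-circuit terminal operation: max has to see every element
    static List<Integer> visitedByMax() {
        List<Integer> visited = new ArrayList<>();
        Stream.of(1, 2, 3, 4, 5)
                .peek(visited::add)
                .max(Integer::compare);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitedByAnyMatch()); // [1, 2]
        System.out.println(visitedByMax());      // [1, 2, 3, 4, 5]
    }
}
```

On a sequential stream, anyMatch stops after the second element, while max visits all five.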
Common operations
Here are some common stream operations that I use in my daily development:
- filter: the filter method receives a Predicate that decides whether each element is kept; elements for which it returns true continue to the next stream
- collect: the collect method specifies a container that receives the stream's elements. The common ones are Collectors.toList() and Collectors.
- forEach: the forEach method works like a for loop; it is simple and practical for traversal
- sorted: the sorted method sorts the elements. You can pass in a custom comparator, the same kind used to sort collections, or rely on the natural ordering of the elements themselves
- map: the map method maps each element of the previous stream to an element of the next stream; for example, userStream.map(User::getName) (pseudocode) maps a stream of User objects to a stream of user names, which is very useful
- findAny: the findAny method returns some element of the stream (wrapped in an Optional); it can be read as asking whether any elements are left in the stream after the intermediate operations
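The following sketch combines several of these operations. The User class is a hypothetical stand-in for the pseudocode User above, and the sample data is made up:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class CommonOps {
    // Hypothetical User type, only for this illustration
    static final class User {
        private final String name;
        private final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }
        int getAge() { return age; }
    }

    static final List<User> USERS = Arrays.asList(
            new User("Carol", 35), new User("Alice", 28), new User("Bob", 17));

    // filter -> sorted(custom comparator) -> map -> collect
    static List<String> adultNamesByAge() {
        return USERS.stream()
                .filter(u -> u.getAge() >= 18)                 // keep elements whose Predicate returns true
                .sorted(Comparator.comparingInt(User::getAge)) // custom comparator: youngest first
                .map(User::getName)                            // User stream -> name stream
                .collect(Collectors.toList());                 // gather the results into a List
    }

    // findAny returns an Optional, which is empty when nothing passes the filter
    static Optional<String> anyMinorName() {
        return USERS.stream().filter(u -> u.getAge() < 18).map(User::getName).findAny();
    }

    public static void main(String[] args) {
        adultNamesByAge().forEach(System.out::println); // Alice, then Carol
        System.out.println(anyMinorName().isPresent()); // true
    }
}
```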
Exploring how Stream works
Source code analysis is always a tedious and complex process, because highly engineered code such as the JDK uses many design patterns and heavy interface encapsulation. Let's use the code above as an example to briefly explore how a stream is implemented.
First look at the class inheritance diagram:
- BaseStream defines the basic interface of a stream
- Stream defines common operations such as map, filter, and flatMap
- BaseStream, Stream, IntStream, LongStream, and DoubleStream form the foundation of Java's Stream architecture
- PipelineHelper is the abstract helper class used to execute a stream pipeline (it declares methods such as wrapSink and copyInto); AbstractPipeline extends it
- AbstractPipeline is the core abstract class of the pipeline, used to build and manage it; its implementation classes are the nodes of the pipeline
- Head, StatelessOp, and StatefulOp are inner classes of ReferencePipeline; IntPipeline, LongPipeline, and DoublePipeline each define the same three inner classes
Take ReferencePipeline as an example. From the inheritance diagram, its inner classes Head, StatelessOp, and StatefulOp all extend ReferencePipeline, and ReferencePipeline itself extends AbstractPipeline, so Head, StatelessOp, and StatefulOp are all essentially AbstractPipeline objects. Unlike Netty, where one pipeline holds a chain of handlers, here each pipeline object is itself a single node. That makes AbstractPipeline a very important class. Opening it, we find several important fields: sourceStage (the source stage), previousStage (the upstream stage), and nextStage (the downstream stage).
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    // Points to the Head node
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;
    // Upstream stage
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;
    protected final int sourceOrOpFlags;
    // Downstream stage
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;
    // Abstract method implemented by subclasses; returns a Sink object
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
    // ...
}
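To make the three link fields concrete, here is a toy model, not the JDK class: a hypothetical Stage type that keeps only sourceStage, previousStage, nextStage, and depth, and links them the way AbstractPipeline's constructors do.

```java
public class StageListSketch {
    // Hypothetical, simplified stand-in for AbstractPipeline: only the link fields are modelled
    static final class Stage {
        final String name;
        final Stage sourceStage;   // always the Head
        final Stage previousStage; // upstream stage, null for the Head
        Stage nextStage;           // downstream stage, set when the next operation is linked
        final int depth;           // number of operations between this stage and the Head

        // Head: no upstream, and it is its own source stage
        Stage(String name) {
            this.name = name;
            this.previousStage = null;
            this.sourceStage = this;
            this.depth = 0;
        }

        // Operation stage: link to the upstream stage in both directions
        Stage(String name, Stage upstream) {
            this.name = name;
            upstream.nextStage = this;
            this.previousStage = upstream;
            this.sourceStage = upstream.sourceStage;
            this.depth = upstream.depth + 1;
        }
    }

    // Builds Head -> filter -> sorted and returns the last stage
    static Stage buildExample() {
        Stage head = new Stage("Head");
        Stage filter = new Stage("filter", head);
        return new Stage("sorted", filter);
    }

    // Walks forward from the Head along nextStage
    static String forward(Stage any) {
        StringBuilder sb = new StringBuilder();
        for (Stage s = any.sourceStage; s != null; s = s.nextStage) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(s.name);
        }
        return sb.toString();
    }

    // Walks backward from a stage along previousStage
    static String backward(Stage from) {
        StringBuilder sb = new StringBuilder();
        for (Stage s = from; s != null; s = s.previousStage) {
            if (sb.length() > 0) sb.append(" <- ");
            sb.append(s.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Stage last = buildExample();
        System.out.println(forward(last));  // Head -> filter -> sorted
        System.out.println(backward(last)); // sorted <- filter <- Head
        System.out.println(last.depth);     // 2
    }
}
```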
In the source code, each stage is represented by an AbstractPipeline object, and the stages form a typical doubly linked list. The Stream process can therefore be described by the following figure.
To verify this, let's open the source code and analyze it step by step, using the same example:
list.stream().filter(integer -> integer>5).sorted().forEach(System.out::println);
1. Generate a Head object
The first step is the stream method, which eventually calls the code below; in essence, calling stream produces a Head object.
public final class StreamSupport {
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        // Create the Head
        return new ReferencePipeline.Head<>(spliterator,
                StreamOpFlag.fromCharacteristics(spliterator),
                parallel);
    }
}

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    // Head's constructor passes its arguments up to this constructor
    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        // Upstream stage; the Head has no upstream, so it is null
        this.previousStage = null;
        // Source spliterator, which can be thought of as an advanced iterator
        this.sourceSpliterator = source;
        // The Head points to itself as the source stage
        this.sourceStage = this;
        // Flags of the source, masked to the valid stream flags
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        // combinedFlags holds the flags of the source and all operations up to this stage;
        // they are used when preparing to evaluate the pipeline
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        // Number of intermediate operations between this pipeline object and the stream source
        // (if sequential); the Head's depth is 0
        this.depth = 0;
        // True if the pipeline is parallel, otherwise sequential; only valid for the source stage
        this.parallel = parallel;
    }
}
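Because Collection.stream() by default simply delegates to StreamSupport.stream(spliterator(), false), the Head can also be created directly from a Spliterator. Here is a small sketch using only public APIs (the class name is made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class HeadFromSpliterator {
    // Builds the Head explicitly, then runs the same filter + sorted pipeline
    static List<Integer> filterAndSort(List<Integer> source) {
        Spliterator<Integer> spliterator = source.spliterator();
        Stream<Integer> head = StreamSupport.stream(spliterator, false); // false = sequential
        return head.filter(i -> i > 5).sorted().collect(Collectors.toList());
    }

    // Spliterator characteristics are what StreamOpFlag.fromCharacteristics turns into the Head's flags
    static boolean sourceIsSized(List<Integer> source) {
        return source.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    static boolean headIsParallel(List<Integer> source) {
        return StreamSupport.stream(source.spliterator(), false).isParallel();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7);
        System.out.println(filterAndSort(list));  // [6, 7, 8, 9]
        System.out.println(sourceIsSized(list));  // true
        System.out.println(headIsParallel(list)); // false
    }
}
```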
2. Generate the operation object corresponding to filter
The second step is the filter method, shown below, which creates a stateless operation object, StatelessOp. The Head (this) is passed in when the object is created, and the sourceStage (the Head) is recorded as well. At this point, the links in the figure above become clear.
// ReferencePipeline.filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // this is the Head object: the new StatelessOp records it as its previousStage,
    // and the Head's nextStage is pointed at the new StatelessOp
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    // The number of elements left after filtering is unknown
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // The actual filtering: only matching elements are passed downstream
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

// AbstractPipeline constructor used by every intermediate operation
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    // A stage can be linked to only one downstream stage
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    // Point the Head's nextStage to the current object, i.e. the StatelessOp wrapping filter
    previousStage.nextStage = this;
    // Point the StatelessOp's previousStage to the Head
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    // Point the StatelessOp's sourceStage to the Head
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
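The linkedOrConsumed check has a visible consequence: once a stage has a downstream stage, linking a second operation to it fails. This sketch (the names are illustrative) triggers the check by calling two operations on the same Head:

```java
import java.util.stream.Stream;

public class StreamReuse {
    // Links two operations onto the same stage; returns the exception message, or null if none is thrown
    static String linkTwice() {
        Stream<Integer> head = Stream.of(3, 5, 2, 9);
        head.filter(i -> i > 5);    // first link: head.linkedOrConsumed becomes true
        try {
            head.map(i -> i * 2);   // second link on the same Head stage
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(linkTwice()); // stream has already been operated upon or closed
    }
}
```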
The Sink interface is the protocol used to connect the current stage with the next one. Each concrete subclass of AbstractPipeline implements the opWrapSink method to return a Sink instance.
interface Sink<T> extends Consumer<T> {
    // Start method: notifies downstream to get ready before data arrives
    default void begin(long size) {}

    // End method: tells downstream that all elements have been pushed
    default void end() {}

    // Whether this sink wants no more elements (used by short-circuit operations)
    default boolean cancellationRequested() {
        return false;
    }

    // ...

    // Abstract base class for Sinks that pass their results on to a downstream Sink
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        // downstream is the next Sink in the chain
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        // By default, simply calls the downstream begin method
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        // By default, simply calls the downstream end method
        @Override
        public void end() {
            downstream.end();
        }

        // By default, asks downstream whether cancellation was requested
        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
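Since java.util.stream.Sink is package-private, user code cannot implement it directly. The sketch below therefore uses a hypothetical MiniSink interface that copies the begin/accept/end/cancellationRequested protocol, plus a ChainedReference-style base class and a filter Sink modelled on the one shown earlier, to trace how calls travel down the chain:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class MiniSinkChain {
    // Hypothetical simplified copy of the Sink protocol
    interface MiniSink<T> extends Consumer<T> {
        default void begin(long size) {}
        default void end() {}
        default boolean cancellationRequested() { return false; }
    }

    // Modelled on Sink.ChainedReference: forwards begin/end/cancellationRequested downstream
    abstract static class Chained<T, E_OUT> implements MiniSink<T> {
        protected final MiniSink<? super E_OUT> downstream;

        Chained(MiniSink<? super E_OUT> downstream) {
            this.downstream = downstream;
        }

        @Override public void begin(long size) { downstream.begin(size); }
        @Override public void end() { downstream.end(); }
        @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); }
    }

    // Modelled on the Sink returned by filter's opWrapSink
    static <T> MiniSink<T> filterSink(Predicate<? super T> predicate, MiniSink<? super T> next) {
        return new Chained<T, T>(next) {
            @Override public void begin(long size) { downstream.begin(-1); } // size unknown after filtering
            @Override public void accept(T t) {
                if (predicate.test(t)) downstream.accept(t);
            }
        };
    }

    // Drives the chain the way copyInto does: begin, accept each element, end
    static List<String> run(List<Integer> source) {
        List<String> log = new ArrayList<>();
        MiniSink<Integer> terminal = new MiniSink<Integer>() {
            @Override public void begin(long size) { log.add("begin " + size); }
            @Override public void accept(Integer i) { log.add("accept " + i); }
            @Override public void end() { log.add("end"); }
        };
        MiniSink<Integer> first = filterSink(i -> i > 5, terminal);
        first.begin(source.size());
        source.forEach(first);
        first.end();
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7)).forEach(System.out::println);
    }
}
```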
3. Generate the operation object for sorted
In the third step, the sorted method returns an instance of the OfRef class. Like filter, it implements opWrapSink; but because sorted performs sorting, it also determines the comparator, and the sorting logic is ultimately carried out by that comparator.
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
    OfRef(AbstractPipeline<?, T, ?> upstream) {
        // As with filter, this establishes the upstream/downstream links between stages
        super(upstream, StreamShape.REFERENCE,
              StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        this.isNaturalSort = true;
        // Default comparator: natural ordering
        @SuppressWarnings("unchecked")
        Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
        this.comparator = comp;
    }

    // ...

    @Override
    public Sink<T> opWrapSink(int flags, Sink<T> sink) {
        Objects.requireNonNull(sink);
        // Upstream is already naturally sorted: sorting again would be a no-op
        if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
            return sink;
        // Size known in advance: use the array-based sorting sink
        else if (StreamOpFlag.SIZED.isKnown(flags))
            return new SizedRefSortingSink<>(sink, comparator);
        else
            // Otherwise return a RefSortingSink instance
            return new RefSortingSink<>(sink, comparator);
    }

    // ...
}
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    // Initializes the list that receives the upstream elements for sorting
    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    // The actual sort happens here, once all upstream elements have arrived
    @Override
    public void end() {
        list.sort(comparator);
        downstream.begin(list.size());
        // No cancellation requested (not short-circuiting): push every element downstream
        if (!cancellationWasRequested) {
            list.forEach(downstream::accept);
        }
        // Short-circuiting: stop as soon as downstream requests cancellation
        else {
            for (T t : list) {
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);
            }
        }
        // Signal the end downstream and release the buffer
        downstream.end();
        list = null;
    }

    // Adds each element to the initialized list
    @Override
    public void accept(T t) {
        list.add(t);
    }
}
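Because RefSortingSink only buffers in accept and does all of its downstream work in end, sorted acts as a barrier: nothing reaches the next stage until every upstream element has been collected. This can be observed from outside with peek (a sketch; the names are illustrative). Since filter clears the SIZED flag, this pipeline should take the RefSortingSink path rather than the sized one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SortedBarrier {
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 9, 1, 6)
                .filter(i -> i > 2)                        // clears SIZED, like the article's example
                .peek(i -> log.add("before sort " + i))    // logged just before the sorting sink buffers it
                .sorted()
                .forEach(i -> log.add("after sort " + i)); // only reached from the sorting sink's end()
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```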
4. Turn the gears
So far, we have built a doubly linked list of operations, each stored in a separate StatelessOp or StatefulOp object, but none of the Sink objects we wrapped has actually been invoked yet. This is why a Stream does not run any intermediate operation until a terminal operation is called. Everything is ready except the east wind, and the east wind is the terminal operation. Let's open the forEach method used in this example.
// ReferencePipeline.forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

// ForEachOps.makeRef
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                              boolean ordered) {
    Objects.requireNonNull(action);
    return new ForEachOp.OfRef<>(action, ordered);
}

// ForEachOp.OfRef constructor
OfRef(Consumer<? super T> consumer, boolean ordered) {
    // ForEachOp is the parent class; the argument says whether traversal must respect encounter order
    super(ordered);
    this.consumer = consumer;
}
ForEachOps is a factory class for creating the TerminalOp instances that implement forEach. TerminalOp is the top-level interface for terminal operations; its implementations include ForEachOp, ReduceOp, FindOp, and MatchOp. The ForEachOps.makeRef method returns a ForEachOp object that encapsulates the forEach operation.
// ForEachOp implements TerminalSink, so it is essentially a Sink
static abstract class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {
    private final boolean ordered;

    protected ForEachOp(boolean ordered) {
        this.ordered = ordered;
    }

    // TerminalOp

    @Override
    public int getOpFlags() {
        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
    }

    @Override
    public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<S> spliterator) {
        return helper.wrapAndCopyInto(this, spliterator).get();
    }

    // ...
}
With the ForEachOp object in hand, we return to the evaluate method, which turns the last gear. Since this is a sequential stream, execution eventually reaches terminalOp.evaluateSequential:
// AbstractPipeline.evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           // Sequential stream: this is the StatefulOp returned by sorted, the last stage
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

// ForEachOp.evaluateSequential
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    // helper is the StatefulOp (AbstractPipeline is a subclass of PipelineHelper),
    // and this is the ForEachOp
    return helper.wrapAndCopyInto(this, spliterator).get();
}

// AbstractPipeline.wrapAndCopyInto
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    // Wrap the ForEachOp (itself a Sink) with every stage's Sink, then push the data through
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

// AbstractPipeline.wrapSink: walks from the last stage back towards the Head.
// The loop stops at the Head (depth 0), so the outermost Sink returned is the one
// created by the StatelessOp node generated by filter
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        // Wrap this stage's operation around the downstream Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
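The order in which wrapSink wraps sinks matters: it starts from the last stage, so the Sink that receives source elements first belongs to the first operation. Here is a simplified sketch that models each stage's opWrapSink as a hypothetical Op function from a downstream Consumer to an upstream Consumer:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class WrapSinkSketch {
    // Hypothetical stand-in for opWrapSink: takes the downstream consumer, returns this stage's consumer
    interface Op extends Function<Consumer<Integer>, Consumer<Integer>> {}

    // Mirrors the wrapSink loop: start from the last stage and walk back towards the head
    static Consumer<Integer> wrap(List<Op> opsHeadToTail, Consumer<Integer> terminal) {
        Consumer<Integer> sink = terminal;
        for (int i = opsHeadToTail.size() - 1; i >= 0; i--) {
            sink = opsHeadToTail.get(i).apply(sink);
        }
        return sink; // the outermost consumer belongs to the first operation
    }

    static List<Integer> run() {
        Op filter = down -> x -> { if (x > 5) down.accept(x); }; // keep elements greater than 5
        Op doubler = down -> x -> down.accept(x * 2);             // a map-like stage
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> wrapped = wrap(Arrays.asList(filter, doubler), out::add);
        Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7).forEach(wrapped);  // push the source elements
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [18, 12, 16, 14]
    }
}
```

If the loop ran from the head instead, the doubling would happen before the filter and the result would differ.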
Finally, let's look at the copyInto method. forEach is a non-short-circuit operation, so it takes the first branch:
// AbstractPipeline.copyInto
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    // Non-short-circuit operation
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // Call begin on the previously wrapped Sink chain as a pre-notification
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // The spliterator iterates over the data source, pushing each element into the Sink chain
        spliterator.forEachRemaining(wrappedSink);
        // Call end on the wrapped Sink chain as a post-notification
        wrappedSink.end();
    }
    // Short-circuit operation: cancellationRequested on the wrapped Sink is checked during
    // traversal; once it returns true, no further elements are pushed
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

// Spliterators.ArraySpliterator.forEachRemaining (the spliterator for array-backed sources
// such as Arrays.asList); the Sink is passed in as the Consumer, since Sink extends Consumer
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        // index is the current position, hi is the fence (end of the range)
        (i = index) >= 0 && i < (index = hi)) {
        // Call accept for every element in turn
        do { action.accept((T) a[i]); } while (++i < hi);
    }
}
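The two branches of copyInto can be imitated with the public Spliterator API. In this sketch, RecordingSink is a hypothetical stand-in for a wrapped Sink chain, and its stopAfter counter simulates a short-circuit request. The loop in copyIntoWithCancel follows the shape of the JDK's cancellable traversal (check cancellationRequested, then tryAdvance), but it is a simplification, not the JDK code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public class CopyIntoSketch {
    // Hypothetical minimal sink that records the calls it receives
    static final class RecordingSink implements Consumer<Integer> {
        final List<String> log = new ArrayList<>();
        final int stopAfter; // simulate a short-circuit: request cancellation after this many elements
        int accepted;

        RecordingSink(int stopAfter) { this.stopAfter = stopAfter; }

        void begin(long size) { log.add("begin " + size); }
        void end() { log.add("end"); }
        boolean cancellationRequested() { return accepted >= stopAfter; }

        @Override
        public void accept(Integer i) {
            accepted++;
            log.add("accept " + i);
        }
    }

    // Non-short-circuit branch: begin, push everything with forEachRemaining, end
    static List<String> copyInto(List<Integer> data) {
        RecordingSink sink = new RecordingSink(Integer.MAX_VALUE);
        Spliterator<Integer> sp = data.spliterator();
        sink.begin(sp.getExactSizeIfKnown());
        sp.forEachRemaining(sink);
        sink.end();
        return sink.log;
    }

    // Short-circuit branch: check for cancellation before pulling each element
    static List<String> copyIntoWithCancel(List<Integer> data, int stopAfter) {
        RecordingSink sink = new RecordingSink(stopAfter);
        Spliterator<Integer> sp = data.spliterator();
        sink.begin(sp.getExactSizeIfKnown());
        while (!sink.cancellationRequested() && sp.tryAdvance(sink)) {
            // each successful tryAdvance has pushed one element into the sink
        }
        sink.end();
        return sink.log;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println(copyInto(data));              // [begin 4, accept 1, accept 2, accept 3, accept 4, end]
        System.out.println(copyIntoWithCancel(data, 2)); // [begin 4, accept 1, accept 2, end]
    }
}
```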
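At this point the whole picture is clear: copyInto calls begin on the wrapped Sink chain, pushes every source element through it with forEachRemaining, and finally calls end. In this example, end is where the sorting Sink sorts the elements it has buffered and passes them on to forEach.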
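The laziness described earlier can be checked with a side-effecting filter. The sketch below (the names are illustrative, and the side effect in filter is there only to observe when it runs) counts how many elements filter has seen before and after the terminal operation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipeline {
    // Returns {elements seen by filter before the terminal op, elements seen after it}
    static int[] filterCalls() {
        List<Integer> seen = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(3, 5, 2, 9, 1, 6, 8, 7)
                .filter(i -> {
                    seen.add(i);    // record that filter actually ran for this element
                    return i > 5;
                })
                .sorted();
        int before = seen.size();   // stages are linked, but no Sink has been wrapped or called yet
        pipeline.forEach(i -> { }); // terminal op: evaluate -> wrapSink -> copyInto
        int after = seen.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = filterCalls();
        System.out.println(calls[0] + " " + calls[1]); // 0 8
    }
}
```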
Think of a Stream in object-oriented terms
By analogy with real life, we can compare a Stream to a flow of water. Intermediate operations are like the reservoirs along the way, each Sink is the operator of a reservoir, and the terminal operation is the commander who gives the orders. The operations in this example can be pictured as follows:
Conclusion
Stream gives developers convenient iteration operations, and similar operations exist in JavaScript and other languages. Mastering Stream can noticeably improve development efficiency. This article has given only the simplest analysis of how Stream is used and how it works; more advanced features are not covered, such as parallel streams, which use multiple threads. Since the author has not yet worked with parallel streams, they are not discussed here, and interested readers can study them on their own.
Stream is a good example of object-oriented design that makes code more visual and expressive. Experienced developers of object-oriented languages create works of art. Stream is one example; chain-of-responsibility programming is another classic, such as the filter chain in the Spring framework or the handler chain in Netty. I hope we can all keep polishing the code we write and keep improving.
This article may contain errors or omissions; corrections are welcome.