The class diagram

Concept to explain

Pipeline and Stage

A Pipeline is an assembly line that represents the entire process. A Stage is one step of that pipeline. "Stage" is an abstract notion: it mainly expresses the logical order of the steps, while a Sink describes what each stage does and how it does it.

Stream.of(3, 1, 2)          // stage 0 (the Head)
    .filter(e -> e > 1)     // stage 1
    .sorted();              // stage 2

Sink

Literally, a sink. In everyday life, using a sink comes down to three things:

  • Turn on the tap, so the sink knows water is coming
  • The water is in the sink and can be worked with
  • Open the drain and let the water out

The core methods of Sink in Java mirror this:

  • begin(): tells the sink that water is coming, so it can do any initialization
  • accept(): receives the water (one element at a time) and operates on it
  • end(): signals that all the water has been handled

Take sorted() as an example: the sort stage has to collect and sort every element before anything flows downstream.

private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
    private T[] array;  // Sorting needs an array to buffer the elements
    private int offset;

    SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    // Upstream calls begin() to tell the sort stage to initialize and allocate the array
    @Override
    @SuppressWarnings("unchecked")
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        array = (T[]) new Object[(int) size];
    }

    // Upstream calls end() to say that all the water has arrived; only now does the sort run
    @Override
    public void end() {
        // Sort the buffered elements
        Arrays.sort(array, 0, offset, comparator);
        // Tell the stage downstream of the sort to get ready to receive water
        downstream.begin(offset);
        // Pass each element downstream
        if (!cancellationWasRequested) {
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
        }
        else {
            for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                downstream.accept(array[i]);
        }
        // Tell downstream that the flow is over
        downstream.end();
        // Release the buffer
        array = null;
    }

    // Upstream calls accept() to store each element in the sort stage's buffer
    @Override
    public void accept(T t) {
        array[offset++] = t;
    }
}
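
The begin()/accept()/end() protocol can also be tried outside the JDK. The following is only a minimal sketch under stated assumptions: MiniSink, CollectingSink and SortingSink are hypothetical names made up for illustration, not JDK classes, and cancellation is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MiniSinkDemo {
    // Hypothetical, simplified stand-in for java.util.stream.Sink (no cancellation).
    interface MiniSink<T> {
        void begin(long size);
        void accept(T t);
        void end();
    }

    // Last sink in the chain: records whatever reaches it.
    static final class CollectingSink<T> implements MiniSink<T> {
        final List<T> result = new ArrayList<>();
        @Override public void begin(long size) { result.clear(); }
        @Override public void accept(T t) { result.add(t); }
        @Override public void end() { }
    }

    // Stateful sink: buffers every element and only pushes downstream in end().
    static final class SortingSink<T> implements MiniSink<T> {
        private final MiniSink<? super T> downstream;
        private final Comparator<? super T> comparator;
        private List<T> buffer;

        SortingSink(MiniSink<? super T> downstream, Comparator<? super T> comparator) {
            this.downstream = downstream;
            this.comparator = comparator;
        }

        @Override public void begin(long size) { buffer = new ArrayList<>(); }
        @Override public void accept(T t) { buffer.add(t); }
        @Override public void end() {
            buffer.sort(comparator);
            downstream.begin(buffer.size());
            for (T t : buffer) downstream.accept(t);
            downstream.end();
            buffer = null; // release the buffer, as the JDK sink does
        }
    }

    // Drives the chain the way an upstream stage would: begin, accept..., end.
    static List<Integer> sortThroughSinks(List<Integer> input) {
        CollectingSink<Integer> tail = new CollectingSink<>();
        MiniSink<Integer> head = new SortingSink<>(tail, Comparator.naturalOrder());
        head.begin(input.size());
        for (Integer i : input) head.accept(i);
        head.end();
        return tail.result;
    }

    public static void main(String[] args) {
        System.out.println(sortThroughSinks(Arrays.asList(3, 1, 2))); // prints [1, 2, 3]
    }
}
```

Nothing reaches the collecting sink until end() is called on the sorting sink, which is the buffering behaviour described above.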

Create a Head

Question

  • A Stream does not store data, so where does the data live?

The answer comes at the end of this section.

Usage

You can create a Stream with Stream.of(), for example:

// Create a stream with of()
Stream<Integer> stream = Stream.of(1, 2, 3);

Source code analysis

The of() method ends up calling:

StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);

The logic of the stream() method:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

The constructor of ReferencePipeline.Head is called and a Head object is returned. Head is a subclass of ReferencePipeline and is the first stage of the pipeline. Its constructor delegates through super() all the way up to the AbstractPipeline class:

    /**
     * The source spliterator. Only valid for the head pipeline.
     * Before the pipeline is consumed if non-null then {@code sourceSupplier}
     * must be null. After the pipeline is consumed if non-null then is set to
     * null.
     */
    private Spliterator<?> sourceSpliterator;

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        // Use a field to point to the data set as a Spliterator
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

Answer to the question

  1. A Stream does not store data, so where does the data live?
    • The Head holds a reference to the data source in the form of a Spliterator, and the data is then read through that Spliterator.
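
To see that the stream only points at its source, here is a small sketch. The class and method names are made up for illustration; the calls used (StreamSupport.stream, Arrays.spliterator, Collection.stream) are standard JDK API. The second method relies on ArrayList's spliterator being late-binding, so an element added before the terminal operation is still picked up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SourceSpliteratorDemo {
    // Builds a stream directly from a Spliterator, the same entry point shown above.
    static List<Integer> fromSpliterator(Integer[] arr) {
        Spliterator<Integer> source = Arrays.spliterator(arr, 0, arr.length);
        Stream<Integer> stream = StreamSupport.stream(source, false);
        return stream.collect(Collectors.toList());
    }

    // The stream keeps no copy of the list: an element added after the stream
    // is created, but before the terminal operation, is still traversed.
    static List<String> lateBinding() {
        List<String> list = new ArrayList<>(Arrays.asList("one", "two"));
        Stream<String> stream = list.stream();
        list.add("three");
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromSpliterator(new Integer[] {1, 2, 3})); // prints [1, 2, 3]
        System.out.println(lateBinding());                            // prints [one, two, three]
    }
}
```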

Intermediate operations

A few questions

  1. How are the intermediate operations linked together?
  2. How does one intermediate operation hand over to the next?
  3. How do stateful intermediate operations keep their state?
  4. How is lazy evaluation implemented?

Usage

Stream<Integer> st = headStream.filter(e -> e == 1).distinct().sorted();
// is equivalent to
Stream<Integer> afterFilter = headStream.filter(e -> e == 1);
Stream<Integer> afterDistinct = afterFilter.distinct();
Stream<Integer> afterSort = afterDistinct.sorted();

Filter

What happens when you execute filter(op)?

Stream<Integer> afterFilter = head.filter(e -> e == 1);

The filter() method is declared in the Stream interface and implemented in the ReferencePipeline class.

//ReferencePipeline.class

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);

    // Return a StatelessOp object.
    // The constructor takes this (the upstream stage) as its first argument.
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    // The number of elements that pass the filter is unknown
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // Only elements that match the predicate flow downstream
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

filter() returns a StatelessOp object, because filter is a stateless operation. StatelessOp is an abstract static nested class that extends ReferencePipeline.

//ReferencePipeline.class

/**
 * Base class for a stateless intermediate stage of a Stream.
 *
 * @param <E_IN> type of elements in the upstream source
 * @param <E_OUT> type of elements in produced by this stage
 * @since 1.8
 */
abstract static class StatelessOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
    /**
     * Construct a new Stream by appending a stateless intermediate
     * operation to an existing stream.
     *
     * @param upstream The upstream pipeline stage
     * @param inputShape The stream shape for the upstream pipeline stage
     * @param opFlags Operation flags for the new stage
     */
    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return false;
    }
}

Along the way, super() runs the AbstractPipeline constructor, which links the new stage to the previous one:

//AbstractPipeline.class

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
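
The linkedOrConsumed check in this constructor is easy to observe from user code: once a stage has had one operation appended to it, appending a second one to the same stage is rejected. A minimal sketch (the class and method names are made up for illustration):

```java
import java.util.stream.Stream;

class LinkOnceDemo {
    // Returns true if appending a second stage to the same upstream stage is rejected.
    static boolean secondLinkRejected() {
        Stream<Integer> head = Stream.of(1, 2, 3);
        Stream<Integer> first = head.filter(e -> e > 1); // links head -> first
        try {
            head.filter(e -> e > 2);                     // head is already linked
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondLinkRejected()); // prints true
    }
}
```

This is why the stages form a single chain rather than a tree: each stage can have at most one next stage.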

Distinct

Example

Stream<Integer> afterDistinct = afterFilter.distinct();

The distinct() method is implemented in the ReferencePipeline class:

@Override
public final Stream<P_OUT> distinct() {
    return DistinctOps.makeRef(this);
}

It calls the makeRef() method of the DistinctOps class, which returns a StatefulOp object that implements four methods. The de-duplication logic itself lives in opWrapSink():

    /**
     * Appends a "distinct" operation to the provided stream, and returns the
     * new stream.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @return the new stream
     */
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        // Return a StatefulOp object
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

            // The following methods are implemented here; bodies omitted
            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}

            @Override
            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                              Spliterator<P_IN> spliterator,
                                              IntFunction<T[]> generator) {...}

            @Override
            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}

            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    ...
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    ...
                } else {
                    // Return a Sink.ChainedReference object
                    return new Sink.ChainedReference<T, T>(sink) {
                        // Use a Set to remember the elements already seen
                        Set<T> seen;

                        // The Set is initialized when upstream calls begin()
                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            downstream.begin(-1);
                        }

                        // On end(), release the Set and notify downstream
                        @Override
                        public void end() {
                            seen = null;
                            downstream.end();
                        }

                        // If the element has already been seen, discard it
                        @Override
                        public void accept(T t) {
                            if (!seen.contains(t)) {
                                seen.add(t);
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }

StatefulOp is similar to StatelessOp: it also extends ReferencePipeline, and its super() call likewise runs the AbstractPipeline constructor to link the stages.

    /**
     * Base class for a stateful intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements in produced by this stage
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        // omitted
    }

The other intermediate operations follow the same pattern: the operation's logic is encapsulated in its opWrapSink() method, which you can read through at your own pace.

Answers to the questions

  1. How are the intermediate operations linked together?
    • Each operation is wrapped in a StatelessOp or StatefulOp object, and these objects are linked into a doubly linked list.
  2. How does one intermediate operation hand over to the next?
    • The Sink class connects upstream to downstream and carries out each stage's work. Its core methods are begin(), accept(), and end().
  3. How do stateful intermediate operations keep their state?
    • Stateful intermediate operations are encapsulated as StatefulOp objects, each with its own logic. For a concrete case, see the sorted() sink shown earlier, which buffers elements in an array.
  4. How is lazy evaluation implemented?
    • Calling an intermediate operation only appends a new stage to the end of the pipeline and records the link. No data is processed at that point.
    • Processing starts when wrapAndCopyInto() calls begin() on the head Sink, and wrapAndCopyInto() is only triggered by a terminal operation.
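
Lazy evaluation is easy to confirm from the outside. The sketch below (class and method names are made up for illustration) counts how often the filter predicate has run before and after the terminal operation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyDemo {
    // Returns {predicate calls before the terminal op, calls after it, result size}.
    static int[] predicateCalls() {
        List<Integer> seen = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .filter(e -> { seen.add(e); return e > 1; })
                .distinct();
        int before = seen.size();   // stages are only linked so far, nothing has run
        List<Integer> result = pipeline.collect(Collectors.toList());
        int after = seen.size();    // the terminal operation pulled every element through
        return new int[] {before, after, result.size()};
    }

    public static void main(String[] args) {
        int[] r = predicateCalls();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 0 3 2
    }
}
```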

Terminal operations

A few questions

  1. How does a terminal operation work?
  2. How does a terminal operation trigger the flow?
  3. How is it guaranteed that a stream can run only one terminal operation?

Usage

Here are four kinds of terminal operation, all part of the API provided by Stream:

// Count the number of elements in the stream: ReduceOp
long count = afterLimit.count();

// Iterate over all elements: ForEachOp
afterLimit.forEach(System.out::println);

// Get the first element: FindOp
Optional<Integer> any = afterLimit.findFirst();

// Check whether any element matches: MatchOp
boolean flag = afterLimit.anyMatch(i -> i == 1);

count()

count() is implemented in the ReferencePipeline class:

@Override
public final long count() {
    // Call mapToLong to turn every element into 1, then sum them up
    return mapToLong(e -> 1L).sum();
}

mapToLong()

The mapToLong() method is an intermediate operation:

@Override
public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
            return new Sink.ChainedReference<P_OUT, Long>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // Map the element to a long and pass it downstream
                    downstream.accept(mapper.applyAsLong(u));
                }
            };
        }
    };
}

ToLongFunction is a functional interface. Here its applyAsLong() method is implemented by the lambda e -> 1L.

@FunctionalInterface
public interface ToLongFunction<T> {

    /**
     * Applies this function to the given argument.
     *
     * @param value the function argument
     * @return the function result
     */
    long applyAsLong(T value);
}

Now look at the sum() method in the LongPipeline class. It passes Long::sum, which adds two values, to reduce():

@Override
public final long sum() {
    // use better algorithm to compensate for intermediate overflow?
    return reduce(0, Long::sum);
}

// public static long sum(long a, long b) {
//     return a + b;
// }
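
As a quick check of the chain traced above, count(), mapToLong(e -> 1L).sum() and mapToLong(e -> 1L).reduce(0, Long::sum) should all give the same number. A small sketch (CountDemo is a made-up name; the source shown here is from JDK 8, and later JDK releases may implement count() differently while returning the same result):

```java
import java.util.stream.Stream;

class CountDemo {
    static long viaCount() {
        return Stream.of("a", "b", "c").count();
    }

    // What count() expands to in the source shown above.
    static long viaMapToLongSum() {
        return Stream.of("a", "b", "c").mapToLong(e -> 1L).sum();
    }

    // What sum() in turn expands to.
    static long viaReduce() {
        return Stream.of("a", "b", "c").mapToLong(e -> 1L).reduce(0, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(viaCount() + " " + viaMapToLongSum() + " " + viaReduce()); // prints 3 3 3
    }
}
```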

reduce()

reduce() wraps the operator op in a terminal operation: makeLong() builds a TerminalOp whose makeSink() method produces the Sink that does the accumulating.

@Override
public final long reduce(long identity, LongBinaryOperator op) {
    return evaluate(ReduceOps.makeLong(identity, op));
}

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code long} values.
     *
     * @param identity the identity for the combining function
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     */
    public static TerminalOp<Long, Long>
    makeLong(long identity, LongBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {

            // state holds the running result
            private long state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            // The operator here is Long::sum, so accept() adds t to state
            @Override
            public void accept(long t) {
                state = operator.applyAsLong(state, t);
            }

            @Override
            public Long get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                accept(other.state);
            }
        }
        return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

evaluate()

Now look back at evaluate(), the method that actually runs a terminal operation:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    // Check whether the stream has already been used
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    // Mark the stream as used
    linkedOrConsumed = true;

    // Evaluate in parallel or sequentially, depending on the stream
    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

Focusing on sequential evaluation: evaluateSequential() has four kinds of implementation, one for each of the four kinds of terminal operation listed above. count() belongs to ReduceOp, so let's look inside that one.

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    // Call wrapAndCopyInto()
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

wrapAndCopyInto()

wrapAndCopyInto() first wraps every stage into a chain of Sinks and then runs copyInto():

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

wrapSink()

This is where all the stages are wrapped, from back to front, into a chain of Sinks. The linked list of stages built earlier is thereby turned into a chain in which each Sink holds a reference to its downstream Sink.

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        // Traverse the stages from back to front
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            // Call each stage's opWrapSink(), which every operation class overrides
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        // Return the head sink
        return (Sink<P_IN>) sink;
    }
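
The back-to-front wrapping can be modelled in a few lines. This is only a sketch under simplifying assumptions: a plain Consumer stands in for Sink (so there is no begin() or end()), and each hypothetical "stage" is a function that wraps the sink following it, which is the role opWrapSink() plays in the JDK. All names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

class WrapSinkDemo {
    // Walks the stages from last to first, wrapping the sink built so far.
    static Consumer<Integer> wrap(List<UnaryOperator<Consumer<Integer>>> stages,
                                  Consumer<Integer> terminal) {
        Consumer<Integer> sink = terminal;
        for (int i = stages.size() - 1; i >= 0; i--) {
            sink = stages.get(i).apply(sink);
        }
        return sink; // the head sink: feeding it drives the whole chain
    }

    static List<Integer> run() {
        List<UnaryOperator<Consumer<Integer>>> stages = new ArrayList<>();
        stages.add(down -> e -> { if (e % 2 == 1) down.accept(e); }); // stage 1: keep odd numbers
        stages.add(down -> e -> down.accept(e * 10));                  // stage 2: multiply by 10

        List<Integer> out = new ArrayList<>();
        Consumer<Integer> head = wrap(stages, out::add);
        // Same idea as copyInto(): push every source element into the head sink.
        Arrays.asList(1, 2, 3).spliterator().forEachRemaining(head);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 30]
    }
}
```

Wrapping starts from the terminal sink because each wrapper needs a reference to its downstream sink at construction time.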

copyInto()

This method is the start switch of the whole pipeline. The process is:

  1. Call begin() on the first Sink
  2. Traverse the data source with spliterator.forEachRemaining(wrappedSink), which calls accept() for each element
  3. Call end() to signal that the flow has finished

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            // Notify the first sink to get ready to receive the flow
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            // Push every element through the chain
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
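
The else branch is taken for short-circuiting pipelines, where traversal stops as soon as the downstream sink asks for cancellation. The effect can be observed with findFirst(): in the sketch below (class and method names are made up for illustration), peek() records which elements were actually pulled from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Returns the elements that were pulled from the source before findFirst() was satisfied.
    static List<Integer> visitedBeforeFirstMatch() {
        List<Integer> visited = new ArrayList<>();
        Stream.of(1, 2, 3, 4)
                .peek(visited::add)      // record every element that enters the pipeline
                .filter(e -> e > 1)
                .findFirst();            // short-circuits once 2 has passed the filter
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitedBeforeFirstMatch()); // prints [1, 2]
    }
}
```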

forEachRemaining()

forEachRemaining() is implemented by the Spliterator of each container. In ArrayList's spliterator, the per-element step looks like this (tryAdvance() is shown; forEachRemaining() applies the same action.accept() call to every remaining element):

public boolean tryAdvance(Consumer<? super E> action) {
    if (action == null)
        throw new NullPointerException();
    int hi = getFence(), i = index;
    if (i < hi) {
        index = i + 1;
        @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
        // Call accept() on the sink
        action.accept(e);
        if (list.modCount != expectedModCount)
            throw new ConcurrentModificationException();
        return true;
    }
    return false;
}
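
The two traversal methods can be tried directly on a list's spliterator. A small sketch (SpliteratorDemo is a made-up name; tryAdvance() and forEachRemaining() are the standard Spliterator methods):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorDemo {
    static List<String> traverse() {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3));
        Spliterator<Integer> sp = source.spliterator();
        List<String> log = new ArrayList<>();

        // tryAdvance() hands exactly one element to the action, if one is left.
        boolean advanced = sp.tryAdvance(e -> log.add("tryAdvance:" + e));
        // forEachRemaining() hands over everything that has not been consumed yet.
        sp.forEachRemaining(e -> log.add("rest:" + e));

        log.add("advanced=" + advanced);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(traverse()); // prints [tryAdvance:1, rest:2, rest:3, advanced=true]
    }
}
```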

Other terminal operations

forEach()

The forEach() method is implemented in the ReferencePipeline class:

// from ReferencePipeline.class

@Override
public void forEach(Consumer<? super P_OUT> action) {
    // ForEachOps..
    evaluate(ForEachOps.makeRef(action, false));
}

The evaluate() call works the same way as it does for count().

findFirst() anyMatch()

findFirst() and anyMatch() follow the same logic, so they are not walked through again:

@Override
public final Optional<P_OUT> findFirst() {
    return evaluate(FindOps.makeRef(true));
}

@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
    return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}

Answers to the questions

  1. How does a terminal operation work?
    • Each terminal operation calls evaluate() in its implementation. evaluate() wraps all stages into one chain of Sinks and then runs begin(), accept(), and end() starting from the head Sink.
  2. How does a terminal operation trigger the flow?
    • The switch is wrapAndCopyInto(), which is only called by a terminal operation.
  3. How is it guaranteed that a stream can run only one terminal operation?
    • The first call to evaluate() sets linkedOrConsumed to true, so any later call to evaluate() throws an IllegalStateException.
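
The last answer can be checked in a few lines. In this sketch (SingleUseDemo is a made-up name), the second terminal operation on the same stream is rejected:

```java
import java.util.stream.Stream;

class SingleUseDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalRejected() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.count();                 // first terminal operation consumes the stream
        try {
            stream.findFirst();         // second one goes through evaluate() again
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalRejected()); // prints true
    }
}
```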
