The Java 8 API has added a new abstraction called a Stream that lets you process data in a declarative way.

Stream provides a high-level abstraction of Java collection operations and expressions in an intuitive manner similar to querying data from a database with SQL statements.

The Stream API can greatly improve the productivity of Java programmers, allowing programmers to write efficient, clean, and concise code.

This article will analyze the implementation principle of Stream.

Composition and characteristics of Stream

A Stream is a queue of elements from a data source and supports aggregation operations:

  • Elements are objects of a specific type that form a queue. In Java, a Stream does not store and manage its elements the way a collection does; instead, elements are computed on demand
  • The data source of a stream can be a collection (Collection), an array (Array), an I/O channel, a generator, and so on
  • Aggregation operations are SQL-like operations such as filter, map, reduce, find, match, sorted, etc. (a short example follows this list)
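
Putting these pieces together, here is a minimal sketch (the class and values are illustrative) that uses a collection as the source, applies SQL-like intermediate operations, and finishes with a terminal operation:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamIntro {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("tom", "anna", "bob", "alice");
        List<String> result = names.stream()      // source: a Collection
                .filter(n -> n.length() > 3)      // like SQL "where"
                .map(String::toUpperCase)         // like SQL "select"
                .sorted()                         // like SQL "order by"
                .collect(Collectors.toList());    // terminal operation
        System.out.println(result);               // [ALICE, ANNA]
    }
}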

Unlike earlier Collection operations, Stream operations have two basic characteristics:

  • Pipelining: all intermediate operations return a stream object, so operations can be chained into a pipeline in a fluent style. This enables optimizations such as lazy evaluation and short-circuiting
  • Internal iteration: in the past, collections were traversed explicitly from the outside with an Iterator or a for-each loop; this is called external iteration. Stream instead iterates internally, implemented via the visitor (Visitor) pattern. The contrast is sketched below
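
A small sketch of the difference, assuming a List<String> list already exists:

// External iteration: the caller drives the loop explicitly.
for (String s : list) {
    System.out.println(s);
}

// Internal iteration: the stream drives the traversal itself.
list.stream().forEach(System.out::println);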

Unlike iterators, streams can parallelize operations, whereas an iterator can only operate serially and imperatively. As the name implies, serial traversal reads each item before moving on to the next; with parallel traversal, the data is divided into segments, each segment is processed in a different thread, and the results are combined at the end.

Parallel operation of Stream relies on the Fork/Join framework (JSR 166y) introduced in Java 7 to split tasks and speed up processing. The evolution of Java's parallel APIs is roughly as follows:

  • 1.0–1.4: java.lang.Thread
  • 5.0: java.util.concurrent
  • 6.0: Phasers, etc.
  • 7.0: Fork/Join framework
  • 8.0: Lambda

Stream has parallel processing capability, and the approach is divide-and-conquer: a large task is split into several smaller tasks, each of which is an independent operation:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(System.out::println);

As you can see, a single line of code prints the elements of the collection in parallel, but the order of parallel execution is not controllable, so the results are not necessarily the same each time.

If the output order must be the same every time, terminate the stream with the forEachOrdered method instead:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(System.out::println);

There is a question here: if the result needs to be ordered, doesn't that defeat the purpose of parallel execution? Yes; this scenario obviously doesn't need a parallel stream and can be executed directly with a serial stream, otherwise performance might even be worse, because all the parallel results end up being forced back into order.

OK, let’s talk about the Stream interface.

The BaseStream interface

The parent interface of Stream is BaseStream, which is the top-level interface of all Stream implementations and is defined as follows:

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}

Here T is the type of the elements in the stream, while S is a BaseStream implementation whose element type is also T and whose self type is S itself:

S extends BaseStream<T, S>

Are you a little dizzy?

This is easy to understand if you look at how S is used in the interface: methods such as sequential() and parallel() both return an instance of S. That is, they switch the current stream to sequential or parallel mode respectively, and return the "modified" stream object.

If it is parallel, the current stream inevitably has to be split, that is, one stream is split into multiple substreams, and those substreams must be of the same type as the parent stream. The substreams can be split further into sub-substreams, and so on…

That is, S is an implementation of BaseStream and is itself also a stream, such as Stream, IntStream, or LongStream.
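
This self-type is what lets the mode-switching methods preserve the concrete stream type. A small illustrative snippet:

// parallel() is declared in BaseStream but returns S,
// so an IntStream stays an IntStream:
IntStream evens = IntStream.range(0, 10)
        .parallel()
        .filter(i -> i % 2 == 0);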

The Stream interface

Consider the Stream interface declaration:

public interface Stream<T> extends BaseStream<T, Stream<T>> 

That is, a Stream<T> can be split into more Stream<T> substreams. A glance at some of its intermediate operations confirms this:

Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Stream<T> sorted();
Stream<T> peek(Consumer<? super T> action);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
...

These are intermediate operations that act on the stream, and each of them again returns a stream, which is what allows them to be chained.

Closing a stream

BaseStream implements the AutoCloseable interface, where the close() method is called when the stream is closed. BaseStream also provides an onClose() method:

/**
 * Returns an equivalent stream with an additional close handler. Close
 * handlers are run when the {@link #close()} method is called on the
 * stream, and are executed in the order they were added. All close
 * handlers are run, even if earlier close handlers throw exceptions.
 * If any close handler throws an exception, the first exception thrown
 * will be relayed to the caller of {@code close()}, with any remaining
 * exceptions added to that exception as suppressed exceptions (unless
 * one of the remaining exceptions is the same exception as the first
 * exception, since an exception cannot suppress itself.) May return
 * itself.
 *
 * <p>This is an <a href="package-summary.html#StreamOps">intermediate
 * operation</a>.
 *
 * @param closeHandler A task to execute when the stream is closed
 * @return a stream with a handler that is run if the stream is closed
 */
S onClose(Runnable closeHandler);

The close handlers registered through onClose() are triggered when the stream's close() method (from AutoCloseable) is called, but there are a few points to note:

  • onClose() returns the stream object itself, so it can be called multiple times (and chained)
  • If onClose() is called several times, the handlers fire in the order they were added, but if any handler throws an exception, only the first exception is propagated upward
  • An exception thrown by an earlier onClose() handler does not prevent the remaining handlers from running
  • If several onClose() handlers throw exceptions, only the first exception's stack is shown in full; the others are attached to it as suppressed exceptions, showing only partial information (see the sketch below)
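
A minimal sketch of these rules (the demo class and messages are hypothetical):

import java.util.stream.Stream;

public class OnCloseDemo {
    public static void main(String[] args) {
        // Handlers run in registration order when close() is called
        // (here implicitly, by try-with-resources).
        try (Stream<String> s = Stream.of("a", "b")
                .onClose(() -> System.out.println("first handler"))
                .onClose(() -> { throw new IllegalStateException("boom"); })
                .onClose(() -> System.out.println("third handler still runs"))) {
            s.forEach(System.out::println);
        } catch (IllegalStateException e) {
            // Only the first exception is propagated;
            // later ones would be attached as suppressed exceptions.
            System.out.println("caught: " + e.getMessage());
        }
    }
}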

Parallel streams and serial streams

The BaseStream interface provides the parallel() and sequential() methods for switching between parallel and serial streams. They can be called any number of times, even mixed, but only the last call takes effect.

Refer to the parallel() method for instructions:

Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.

So calling the same method multiple times does not generate a new stream, but simply reuses the current stream object.

The following example computes sum in parallel, because parallel() is the last mode switch before the terminal operation:

stream.parallel()
   .filter(...)
   .sequential()
   .map(...)
   .parallel()
   .sum();

The Man behind ParallelStream: ForkJoinPool

The Fork/Join framework is a new feature of JDK 7. Like ThreadPoolExecutor, it implements the Executor and ExecutorService interfaces. It uses an "infinite queue" to hold the tasks to be executed, and the number of threads is passed in through the constructor; if no thread count is passed, the number of CPUs currently available to the machine is used as the default.

ForkJoinPool is designed for problems that can be solved with divide-and-conquer algorithms, such as quicksort. The point here is that ForkJoinPool uses relatively few threads to handle a large number of tasks. For example, to sort 10 million items, the task would be split into two 5-million-item sort tasks plus a merge task for the two sorted halves.

The 5-million-item tasks are split the same way, and a threshold is set so that splitting stops once the data size falls below it. For example, when the number of elements is less than 10, splitting stops and the segment is sorted with insertion sort. In the end, all the tasks add up to roughly 2 million+ tasks.

The key point is that a task cannot complete until all of its subtasks have completed; think of merge sort.

That is why divide-and-conquer is problematic with ThreadPoolExecutor: a thread in ThreadPoolExecutor cannot add another task to the queue and then wait for it to complete before continuing. With ForkJoinPool, a thread can create a new subtask, suspend the current task, and pick up another subtask from the queue in the meantime.
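
A sketch of this divide-and-conquer style using the standard RecursiveTask pattern (the class name and threshold below are illustrative):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10;   // stop splitting below this size
    private final long[] data;
    private final int from, to;

    SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {           // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                            // queue the left half as a subtask
        return right.compute() + left.join();   // work on the right half, then join
    }

    public static void main(String[] args) {
        long[] data = new long[1_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum);                // 499500
    }
}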

What is the difference in performance when using ThreadPoolExecutor or ForkJoinPool?

First, ForkJoinPool can complete a very large number of parent-child tasks with a limited number of threads, for example over two million tasks with four threads. With ThreadPoolExecutor this is impossible, because its threads have no way to prioritize subtasks; completing 2 million parent-child tasks would require 2 million threads, which is obviously not feasible.

Principle of work stealing:

  1. Each worker thread has its own work queue (WorkQueue);
  2. This is a double-ended queue (deque) and is private to the thread;
  3. A subtask forked from a ForkJoinTask is pushed onto the head of the deque of the worker thread running the task; the worker processes the tasks in its queue in LIFO order, that is, like a stack;
  4. To maximize CPU utilization, an idle thread "steals" tasks from other threads' queues to execute;
  5. It steals from the tail of the other thread's queue, to reduce contention with that queue's owner;
  6. Deque operations: push()/pop() are called only by the owner worker thread, while poll() is called by threads stealing tasks;
  7. When only one task is left, there is still contention, which is resolved via CAS;

Look at ParallelStream through the eyes of ForkJoinPool

Java 8 adds a common ForkJoinPool thread pool, which handles tasks that have not been explicitly submitted to any other pool. It is a static member of the ForkJoinPool class, and its default thread count equals the number of CPUs on the machine. Automatic parallelization also happens when the new methods on the Arrays class are called, for example the parallel quicksort behind Arrays.parallelSort(), and methods that walk through the elements of an array (such as Arrays.parallelSetAll()). Automatic parallelism is likewise used in the Stream API that is new in Java 8.

For example, the following code iterates through a list of elements and performs the required action:

List<UserInfo> userInfoList =
        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

Operations on the elements of the list are performed in parallel. The forEach method creates a task for each element's evaluation, and these tasks are processed by the ForkJoinPool described earlier. The parallel logic above could of course be implemented with ThreadPoolExecutor, but ForkJoinPool wins in readability and code volume.

The number of threads in the ForkJoinPool common pool defaults to the number of processors on the machine. It can be adjusted with the system property -Djava.util.concurrent.ForkJoinPool.common.parallelism=N (where N is the desired number of threads).

Note that the calling thread is also used to execute tasks, so the effective number of threads is N+1, counting the current main thread.
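
A quick way to check these values (a small sketch; requires java.util.concurrent.ForkJoinPool):

// The common pool's configured parallelism, and the CPU count it defaults to.
System.out.println(ForkJoinPool.commonPool().getParallelism());
System.out.println(Runtime.getRuntime().availableProcessors());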

There is a catch here: if you perform blocking operations such as I/O in a parallel stream computation, problems can arise:

public static String query(String question) {
  List<String> engines = new ArrayList<String>();
  engines.add("http://www.google.com/?q=");
  engines.add("http://duckduckgo.com/?q=");
  engines.add("http://www.bing.com/search?q=");
   
  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) -> {
    String url = base + question;
    // open connection and fetch the result
    return WS.url(url).get();
  }).findAny();
  return result.get();
}

This is a typical example. Let’s analyze it:

  • By default, this parallel stream computation is executed by the calling thread together with the threads of the JVM-wide ForkJoinPool.commonPool()
  • The map operation here is blocking: it calls an HTTP interface and waits for its response, so every worker thread executing it blocks waiting for the result
  • So any other code that runs computations on parallel streams will be affected by the methods blocking and waiting here
  • The current ForkJoinPool implementation does not compensate for workers blocked in such waits by spawning new threads, so eventually the threads in ForkJoinPool.commonPool() are all exhausted, blocked and waiting

As the analysis above shows, a lambda's execution is not instantaneous, so any program using parallel streams can become the source of blocking that prevents other parts of the program from getting hold of workers. This means that any program relying on parallel streams can become unpredictable, and potentially dangerous, whenever something else is occupying the common ForkJoinPool.
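
A commonly used workaround is to run the parallel stream inside a dedicated ForkJoinPool, so that blocking calls do not starve the common pool. Note that this relies on an implementation detail of the current JDK rather than a documented guarantee, and the class and method names below are illustrative:

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class DedicatedPoolFetch {
    // Hypothetical blocking HTTP call; stands in for WS.url(url).get() above.
    static String fetch(String url) { /* ... blocking I/O ... */ return url; }

    static List<String> fetchAll(List<String> urls) throws Exception {
        ForkJoinPool ioPool = new ForkJoinPool(8);   // sized for blocking I/O
        try {
            // Tasks forked inside this submitted job run in ioPool,
            // not in ForkJoinPool.commonPool() (current JDK behavior).
            return ioPool.submit(() ->
                    urls.parallelStream()
                        .map(DedicatedPoolFetch::fetch)
                        .collect(Collectors.toList())
            ).get();
        } finally {
            ioPool.shutdown();
        }
    }
}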

Summary:

  1. When dealing with recursive divide-and-conquer algorithms, consider ForkJoinPool.
  2. Carefully set the threshold for no longer dividing tasks. This threshold affects performance.
  3. Several Java 8 features use the ForkJoinPool common pool; in some cases you need to adjust its default thread count
  4. Lambdas should avoid side effects as much as possible, that is, avoid mutating heap state and avoid any I/O
  5. Lambdas should not interfere with each other, meaning they should not modify the data source (this can cause thread-safety problems)
  6. Avoid access to states that might change during the lifecycle of a flow operation

Parallel stream performance

The performance of parallel streams is affected by the following factors:

  • Data size: parallelism is meaningful only when the data is large enough and the per-element processing time is long enough;
  • Source data structure: each pipeline operation works off the initial data source, usually a collection, and splitting different kinds of data sources carries different costs;
  • Boxing: primitive types are processed faster than boxed types (see the sketch after this list);
  • Number of cores: by default, the more cores available, the more threads the underlying fork/join pool starts;
  • Per-element cost: the more time spent processing each element in the stream, the greater the performance gain from going parallel;
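
To illustrate the boxing point, compare a primitive IntStream with a boxed Stream<Integer> (an illustrative comparison, not a benchmark):

// Primitive specialization: no boxing on any element.
int sumPrimitive = IntStream.rangeClosed(1, 10_000).sum();

// Boxed pipeline: every element is an Integer object,
// boxed and unboxed at each step.
int sumBoxed = Stream.iterate(1, i -> i + 1)
                     .limit(10_000)
                     .reduce(0, Integer::sum);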

By how well they split, source data structures fall into the following three groups:

  • Good performance: ArrayList, arrays, or IntStream.range (the data supports random access and can be split cheaply and evenly)
  • Average performance: HashSet, TreeSet (the data cannot be split perfectly evenly, but mostly can be)
  • Poor performance: LinkedList (splitting requires traversing the list, so it is hard to divide in half), Stream.iterate and BufferedReader.lines (length unknown, hard to split)

Note: the following sections are excerpted from Brian Goetz's article on the principles behind Streams; thanks to Brian Goetz for laying this out so transparently.

NQ model

The last two factors to consider in determining whether parallelism leads to an increase in speed are the amount of data available and the amount of computation performed for each data element.

In our original description of parallel decomposition, we used the concept of splitting sources until segments were small enough that a sequential approach to solving problems on that segment would be more efficient. The size of the segments must depend on the problem being solved, specifically, on the amount of work done by each element. For example, calculating the length of a string involves much less work than calculating the SHA-1 hash of the string. The more work you do for each element, the lower the threshold of “big enough to take advantage of parallelism.” Similarly, the more data you have, the more segments you split without running afoul of the “too small” threshold.

A simple but useful parallel performance model is the NQ model, where N is the number of data elements and Q is the amount of work performed per element. The larger the product N*Q, the more likely parallelism is to bring a speedup. For problems with a very small Q, such as summing numbers, you would typically want N > 10,000 before seeing a speedup; as Q increases, the data size needed to get a speedup shrinks.

Many of the obstacles to parallelization (such as splitting cost, combining cost, or encounter-order sensitivity) are mitigated by operations with a higher Q. Even though the splitting characteristics of a LinkedList are bad, a large enough Q can still make a parallel speedup possible.

Encounter order

Encounter order refers to whether the order in which the source dispenses elements matters to the computation. Some sources (such as hash-based sets and maps) have no meaningful encounter order. The stream flag ORDERED describes whether the stream has a meaningful encounter order. The Spliterators of the JDK collections set this flag according to each collection's specification, and some intermediate operations may inject ORDERED (sorted()) or clear it (unordered()).

If a stream has an encounter order, most stream operations must respect it. In sequential execution, the encounter order is "automatically preserved", because elements are naturally processed in the order they are encountered. Even in parallel execution, many operations (stateless intermediate operations, and some terminal operations such as reduce()) incur no real cost for respecting the encounter order. But for others (stateful intermediate operations, and terminal operations whose semantics are tied to the encounter order, such as findFirst() or forEachOrdered()), the obligation to respect encounter order in parallel execution can be significant. If a stream has a defined encounter order that is meaningless to the result, the execution of pipelines containing order-sensitive operations can be sped up by removing the ORDERED flag with the unordered() operation.

As an example of an order-sensitive operation, consider limit(), which truncates a stream at a given size. Implementing limit() in sequential execution is simple: keep a counter of how many elements have been seen, and discard everything after that. But in parallel execution, implementing limit() is much more complicated: you are required to keep the first N elements. This requirement greatly limits the ability to exploit parallelism: if the input is divided into parts, you don't know whether the result of a part will be included in the final result until all parts before it have completed. As a result, the implementation is typically forced either to not use all available cores, or to buffer the entire tentative result until the target length is reached.

If the stream has no encounter order, the limit() operation is free to choose any N elements, which makes execution much more efficient. Elements can be sent downstream as soon as they are known, without any buffering, and the only coordination between threads is a signal to ensure the target stream length is not exceeded.
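
A sketch of how dropping the encounter order lets limit() run cheaply in parallel (illustrative):

// With ORDERED cleared, limit(10) may keep any 10 elements,
// so no buffering or cross-part coordination is needed.
List<Integer> anyTen = IntStream.range(0, 1_000_000)
        .parallel()
        .unordered()       // clear the ORDERED flag
        .boxed()
        .limit(10)
        .collect(Collectors.toList());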

Another, less common example of encounter-order cost is sorting. If the encounter order is meaningful, the sorted() operation implements a stable sort (equal elements appear in the output in the same order they appeared in the input), whereas for unordered streams stability, which has a cost, is not required. distinct() is in a similar situation: if the stream has an encounter order, then of multiple identical input elements distinct() must emit the first, whereas for an unordered stream it can emit any of them, which again allows a much more efficient parallel implementation.

You run into a similar situation with collect() aggregation. If a collect(groupingBy()) operation is performed on a stream with an encounter order, the elements for each key must be provided to the downstream collector in the order they appear in the input. That order usually means nothing to the application, and no order means anything. In such cases it may be better to choose a concurrent collector (such as groupingByConcurrent()), which ignores encounter order and lets all threads collect directly into a shared concurrent data structure (such as a ConcurrentHashMap), rather than having each thread collect into its own intermediate map and then merge the intermediate maps (which can be costly).
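
For example, assuming a List<String> words (a small sketch):

// Ordered variant: per-thread intermediate maps are built and then merged.
Map<Integer, List<String>> byLength = words.parallelStream()
        .collect(Collectors.groupingBy(String::length));

// Unordered, concurrent variant: all threads write into one ConcurrentHashMap.
ConcurrentMap<Integer, List<String>> byLengthConcurrent = words.parallelStream()
        .collect(Collectors.groupingByConcurrent(String::length));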

From: elsef.com/2019/09/16/Java8 Stream in the analysis of the principle