What is a Stream?

Nowadays, the data in many large systems is sharded across multiple databases and tables.

For example, the order table in an e-commerce system is often sharded by the hash of the user ID, both to reduce the amount of data in any single table and to speed up order queries by users.
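As a minimal sketch of that routing, assuming a hypothetical helper class and a fixed table count (none of these names come from a real framework):

// Hypothetical routing helper: pick one of N physical order tables
// from the hash of the user ID. Names are illustrative only.
public final class OrderShardRouter {

    private static final int TABLE_COUNT = 16; // assumed number of order tables

    // Returns the physical table name, e.g. "t_order_3", for a given user ID.
    public static String routeTable(long userId) {
        int index = (Long.hashCode(userId) & Integer.MAX_VALUE) % TABLE_COUNT;
        return "t_order_" + index;
    }
}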

However, when a back-office administrator reviews orders, the application layer has to query every data source and then merge the results.

For example, suppose we need to query all orders matching a filter and sort them by some attribute of the order. The data returned by a single data source is correctly sorted by that attribute, but the union of sorted results from multiple data sources is generally not sorted. So we need to re-sort the merged result set at the application layer.
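A minimal sketch of that application-layer merge, assuming each shard returns a list already sorted by order amount and an illustrative Order type with a getAmount() accessor:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Application-layer merge: each shard's list is sorted on its own,
// but their union is not, so we re-sort after merging.
// Order and getAmount() are assumed domain types for illustration.
public class ShardResultMerger {

    public static List<Order> mergeAndSort(List<List<Order>> resultsPerShard) {
        List<Order> merged = new ArrayList<>();
        for (List<Order> shardResult : resultsPerShard) {
            merged.addAll(shardResult); // concatenation breaks the global ordering
        }
        merged.sort(Comparator.comparingDouble(Order::getAmount)); // restore it
        return merged;
    }
}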


Prior to Java 8, we would re-sort and merge such data with a for loop or an Iterator, or by passing a custom Comparator to Collections.sort, and neither approach was very efficient for systems with large amounts of data.
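For comparison, a typical pre-Java 8 version of "filter then sort" looks like this (reusing the illustrative Order type from the sketch above, with an assumed amount threshold):

// Pre-Java 8 style: filter with an explicit loop, then sort with an
// anonymous Comparator. mergedOrders is the merged list from above.
List<Order> bigOrders = new ArrayList<Order>();
for (Order order : mergedOrders) {
    if (order.getAmount() > 100) { // keep only orders above an assumed threshold
        bigOrders.add(order);
    }
}
Collections.sort(bigOrders, new Comparator<Order>() {
    @Override
    public int compare(Order o1, Order o2) {
        return Double.compare(o1.getAmount(), o2.getAmount());
    }
});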

Java 8 added a new interface called Stream, which is a little different from the byte-stream concept we have seen before. A Stream over a Java 8 collection is essentially an advanced Iterator: combined with Lambda expressions, it supports very convenient and efficient aggregate operations, or bulk data operations, on collections.

Stream's aggregate operations are similar to those of database SQL: sorted, filter, map, and so on, so we can efficiently implement SQL-like aggregation at the application layer. As for execution, Stream can process data not only serially but also in parallel, improving throughput on large batches of data.

The simplicity and power of Stream

Let’s use a simple example to see how simple and powerful Stream can be.

The requirement of this demo is to filter out the students in a middle school who are taller than 160cm and group them by gender. First we implement it with traditional iteration; the code is as follows:

Map<String, List<Student>> stuMap = new HashMap<>();
for (Student stu : studentsList) {
    if (stu.getHeight() > 160) { // keep only students taller than 160cm
        if (stuMap.get(stu.getSex()) == null) { // no list for this gender yet
            List<Student> list = new ArrayList<>(); // create a list for this gender
            list.add(stu);
            stuMap.put(stu.getSex(), list);
        } else { // the gender bucket already exists
            stuMap.get(stu.getSex()).add(stu);
        }
    }
}

Now let's implement the same requirement using the Stream API in Java 8:

1. Serial implementation

Map<String, List<Student>> stuMap = stuList.stream()
        .filter((Student s) -> s.getHeight() > 160)
        .collect(Collectors.groupingBy(Student::getSex));

2. Parallel implementation

Map<String, List<Student>> stuMap = stuList.parallelStream()
        .filter((Student s) -> s.getHeight() > 160)
        .collect(Collectors.groupingBy(Student::getSex));

From the two simple examples above, we can see that traversal and filtering with Stream plus Lambda expressions is very concise and convenient.

How does Stream optimize traversal?

Now that we've seen the Java 8 Stream API, how does a Stream optimize iteration? How is parallelism implemented? Below we analyze how Stream is implemented by walking through its source code.

1.Stream operation classification

Before digging into the implementation, let's first look at how Stream operations are classified, because this classification is actually one of the key reasons Stream can iterate over large data sets efficiently. Why? The analysis below will make it clear.

Stream operations are officially divided into two categories: intermediate operations and terminal operations. An intermediate operation only records the operation, that is, it returns a new stream without performing any computation, while a terminal operation actually triggers the computation.

Intermediate operations are further divided into stateless and stateful operations: a stateless operation processes each element independently of the elements seen before it, while a stateful operation can only proceed once all elements have been seen.

Terminal operations are divided into short-circuiting and non-short-circuiting operations: the former can return the final result as soon as certain elements meet the condition, while the latter must process all elements before returning the final result. In the JDK, the operations fall roughly into these categories:

  • Stateless intermediate operations: unordered, filter, map, mapToInt, mapToLong, mapToDouble, flatMap, peek
  • Stateful intermediate operations: distinct, sorted, limit, skip
  • Non-short-circuiting terminal operations: forEach, forEachOrdered, toArray, reduce, collect, max, min, count
  • Short-circuiting terminal operations: anyMatch, allMatch, noneMatch, findFirst, findAny

Intermediate operations are also commonly called lazy operations, and it is precisely this laziness, combined with the terminal operation and the data source into a processing pipeline, that makes Stream efficient.
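To make the laziness concrete, here is a small self-contained sketch (names and the length threshold are arbitrary): nothing is printed while the pipeline is being built, and the peek output only appears once the terminal operation count() runs.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Zhang San", "Li Si", "Wang Wu");

        // Intermediate operations only build the pipeline; no element is touched yet.
        Stream<String> pipeline = names.stream()
                .peek(n -> System.out.println("peek: " + n))
                .filter(n -> n.length() > 5);

        System.out.println("pipeline built, nothing printed yet");

        // The terminal operation triggers the whole chain.
        long count = pipeline.count();
        System.out.println("count = " + count);
    }
}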

2.Stream source code implementation

Before we look at how Stream works, let's see which major classes make up the Stream package and what each class is responsible for. Refer to the following figure:

BaseStream and Stream are the top-level interfaces. BaseStream defines the basic methods of a stream, such as spliterator and isParallel, while Stream defines the common operations on a stream, such as map and filter.

ReferencePipeline is the structural class that assembles the chain of operations by defining inner classes. It defines three inner classes, Head, StatelessOp and StatefulOp, and implements the interface methods of BaseStream and Stream.

The Sink interface is the protocol that defines how adjacent Stream operations relate to each other; it contains the begin(), end(), cancellationRequested() and accept() methods. ReferencePipeline ultimately assembles the whole Stream operation into a call chain, and the upstream-downstream relationship between operations on this chain is defined and implemented through the Sink protocol.
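Sink is package-private inside java.util.stream, so application code cannot implement it directly; the following is only a rough sketch of the protocol's shape, not the real interface:

// Simplified sketch of the Sink protocol (the real interface extends
// Consumer<T> and lives in java.util.stream; this is an illustration only).
interface SimpleSink<T> {
    void begin(long size); // called before elements are pushed; size may be -1 if unknown
    void accept(T t);      // receives one element, processes it, forwards downstream
    void end();            // called after all elements have been pushed

    default boolean cancellationRequested() {
        return false;      // short-circuiting sinks return true to stop traversal early
    }
}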

3. How Stream operations are stacked

As we know, a Stream's operations are assembled into a processing pipeline, and the data is processed in one unified pass. In the JDK source, each intermediate operation is represented by a Stage.

The pipeline structure is usually implemented by the ReferencePipeline class. As mentioned when we went through the Stream package structure, ReferencePipeline contains three inner classes: Head, StatelessOp, and StatefulOp.

The Head class represents the data-source stage. When we first call names.stream(), a Head object is created; this is the data-source operation. Each subsequent intermediate operation creates a new Stage, and the AbstractPipeline constructor links these Stages into a doubly linked list. When we call the terminal operation, a final Stage is generated, and through this Stage the preceding intermediate operations are triggered: starting from the last Stage, a Sink chain is generated recursively. As shown below:

Let's use another example to get a feel for how Stream's operation classification achieves efficient iteration over large data sets.

List<String> names = Arrays.asList("Zhang San", "Li Si", "Wang Lao Wu", "Li San", "Liu Lao Si", "Wang Xiaoer", "Zhang Si", "Zhang Wuliuqi");
int maxLenStartWithZ = names.stream()
        .filter(name -> name.startsWith("Zhang"))
        .mapToInt(String::length)
        .max()
        .orElse(0);

The requirement in this example is to find the length of the longest name whose surname is Zhang. Looking at the code, you might expect it to work like this: first, iterate the collection once to find all names beginning with "Zhang"; then iterate the filtered collection once more, converting each name to its length; finally, find the maximum in the set of lengths and return it.

I want to make it clear that this is not what happens. Let's step through how the operations in this method are actually executed.

First, because names is an ArrayList, names.stream() invokes the default stream method of the Collection interface:

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

The stream method then calls StreamSupport's stream method, which initializes a Head object, an inner class of ReferencePipeline:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

The filter and map methods are both stateless intermediate operations, so when filter and map are called, no actual computation is performed; each call merely creates a Stage that records the user's operation.

Normally, each Stream operation requires a callback function, so a complete Stage is represented as a triple of data source, operation, and callback function. The filter and map methods of ReferencePipeline are shown below:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

new StatelessOp invokes the constructor of its parent class AbstractPipeline, which links the new Stage to the previous one, producing a linked list of Stages:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;     // the previous stage's next pointer points to this stage
    this.previousStage = previousStage; // keep a reference to the previous stage

    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

When each Stage is created, its opWrapSink() method encapsulates the concrete implementation of that operation in a Sink class, and the Sinks chain the operations together in a "process, then forward" pattern.

When the max method is executed, ReferencePipeline's max method is called. Since max is a terminal operation, a TerminalOp and a ReducingSink are created, and the operation is encapsulated in that Sink:

@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.maxBy(comparator));
}

Finally, AbstractPipeline's wrapSink method is called; it invokes opWrapSink on each Stage to generate a chain of Sinks, each of which encapsulates the concrete implementation of one operation.

@Override@SuppressWarnings("unchecked")final Sink wrapSink(Sink sink) {
	Objects.requireNonNull(sink);
	for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
		sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
	}
	return (Sink) sink;
}

Once the Sink chain is generated, the Stream starts executing: it iterates the collection through the spliterator and applies the operations in the Sink chain to each element.

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    } else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

In Java 8, Spliterator's forEachRemaining drives the iteration. For each element, the filter operation runs first; if the element passes the filter, the map operation is triggered, and its result is stored in a temporary object before the next element is iterated. When the intermediate operations have consumed all the elements, the terminal operation max produces its result.
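This per-element flow is easy to observe by adding peek calls to the earlier example; in the trace below, the filter and map messages interleave element by element instead of forming two separate passes (the names are the same illustrative ones used above):

import java.util.Arrays;
import java.util.List;

public class TraceDemo {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Zhang San", "Li Si", "Zhang Wuliuqi");

        int maxLen = names.stream()
                .peek(n -> System.out.println("before filter: " + n))
                .filter(n -> n.startsWith("Zhang"))
                .peek(n -> System.out.println("after filter:  " + n))
                .mapToInt(String::length)
                .peek(len -> System.out.println("after map:     " + len))
                .max()
                .orElse(0);

        System.out.println("max = " + maxLen);
    }
}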

So that’s serial processing, but what about the other way Stream handles data?

4.Stream parallel processing

Stream processes data in two ways, serially and in parallel. To switch to parallel processing, we simply call the parallel() method in the example; the modified code is as follows:

List<String> names = Arrays.asList("Zhang San", "Li Si", "Wang Lao Wu", "Li San", "Liu Lao Si", "Wang Xiaoer", "Zhang Si", "Zhang Wuliuqi");
int maxLenStartWithZ = names.stream()
        .parallel()
        .filter(name -> name.startsWith("Zhang"))
        .mapToInt(String::length)
        .max()
        .orElse(0);

Parallel processing works exactly like serial processing up to the point where the terminal operation runs. Once the terminal method is called, the implementations diverge: the parallel case calls TerminalOp's evaluateParallel method.

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

By parallel processing we mean that the Stream splits the data using the ForkJoin framework; the estimateSize method of Spliterator estimates how much data each split contains.

I won't go into the ForkJoin framework and the estimation algorithm here; if you are interested, you can dig into the source code to see how the algorithm is implemented.

A minimum processing-unit threshold is derived from the estimated amount of data. If the current split is still larger than that threshold, the collection keeps being split. Each split generates its own Sink chain, and when all splits have finished, ForkJoin merges their result sets.
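The splitting machinery can be seen in isolation with the public Spliterator API; the sketch below halves an ArrayList spliterator with trySplit() and reads each half's estimateSize() (this only illustrates the primitive the ForkJoin-based implementation builds on):

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        Spliterator<Integer> right = data.spliterator();
        Spliterator<Integer> left = right.trySplit(); // splits off roughly the first half

        System.out.println("left size:  " + left.estimateSize());  // ~50
        System.out.println("right size: " + right.estimateSize()); // ~50
    }
}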

Rational use of Stream

By now you should have a good idea of how the Stream API optimizes collection traversal. The Stream API is simple to use and supports parallel processing. But does using the Stream API always make the system perform better? Let's find out with a set of tests.

We will compare the performance of conventional iteration, serial Stream iteration, and parallel Stream iteration, filtering and grouping the data inside each loop. The following tests were carried out:

  • In a multi-core CPU server configuration, compare the performance of filtering and grouping an int array of length 100;
  • In a multi-core CPU server configuration, compare the performance of filtering and grouping an int array of length 1.00E+8;
  • In a multi-core CPU server configuration, compare the performance of filtering and grouping an object array of length 1.00E+8;
  • In a single-core CPU server configuration, compare the performance of filtering and grouping an object array of length 1.00E+8.

Due to limited space, I will give the statistical results directly here; you can also verify them yourself, and the specific test code can be viewed on Github. From the tests above, the results are as follows (ordered by iteration time, so a < b means a was faster):
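If you want a rough way to reproduce such a comparison yourself, a single-shot sketch like the following works (the list size is an assumption you should adapt to your heap, and one-off timings are only indicative; a proper harness such as JMH gives trustworthy numbers):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class IterationBenchSketch {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10_000_000; i++) { // assumed size; adjust to your heap
            data.add(i);
        }

        long t0 = System.currentTimeMillis();
        List<Integer> regular = new ArrayList<>();
        for (Integer i : data) {               // conventional iteration
            if (i % 2 == 1) {
                regular.add(i);
            }
        }
        long t1 = System.currentTimeMillis();

        List<Integer> serial = data.stream()   // serial Stream iteration
                .filter(i -> i % 2 == 1)
                .collect(Collectors.toList());
        long t2 = System.currentTimeMillis();

        List<Integer> parallel = data.parallelStream() // parallel Stream iteration
                .filter(i -> i % 2 == 1)
                .collect(Collectors.toList());
        long t3 = System.currentTimeMillis();

        System.out.println("regular:         " + (t1 - t0) + " ms (" + regular.size() + ")");
        System.out.println("stream serial:   " + (t2 - t1) + " ms (" + serial.size() + ")");
        System.out.println("stream parallel: " + (t3 - t2) + " ms (" + parallel.size() + ")");
    }
}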

  • Regular iteration < Stream parallel iteration < Stream serial iteration
  • Stream parallel iteration < regular iteration < Stream serial iteration
  • Stream parallel iteration < regular iteration < Stream serial iteration
  • Regular iteration < Stream serial iteration < Stream parallel iteration

From these results we can see that when there are few iterations, conventional iteration performs better; in a single-core CPU server configuration, conventional iteration also has the advantage; but for loops over large data sets on a multi-core CPU server, Stream's parallel iteration has a clear edge. Therefore, when we process large collections, we should try to deploy the application in a multi-core CPU environment and use Stream's parallel iteration.

In fact, as we can see, using Stream does not necessarily make the system perform better; the choice should be based on the application scenario, that is, Stream should be used appropriately.

Conclusion

The overall design of Stream is well worth learning from. At the macro level, Stream decomposes the whole operation into a chained structure, which not only simplifies traversal but also lays the groundwork for parallel computation.

At a finer level, Stream divides element traversal and element computation into intermediate operations and terminal operations, and further divides intermediate operations into stateful and stateless according to whether an element's processing depends on other elements, realizing the different stages of the chain.

**In serial processing,** Stream does not perform any real data processing for the intermediate operations; instead it strings these operations together, and the terminal operation finally triggers them, generating a chain of data-processing Sinks driven by the Java 8 Spliterator iterator. Each iteration applies all the stateless intermediate operations to the current element, while stateful intermediate operations must wait until all elements have been iterated before they can process the data. Finally, the terminal operation's data processing runs.

In parallel processing, Stream handles the intermediate operations the same way as in the serial case, but in the terminal operation it splits the collection using the ForkJoin framework and merges the result sets of the splits. Finally, pay attention to the scenarios in which Stream should be used.

Food for thought

Here is a simple example of parallel processing. Take a look and see what problems it has.

List<Integer> integerList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    integerList.add(i);
}

List<Integer> parallelList = new ArrayList<>();
integerList.stream()
           .parallel()
           .filter(i -> i % 2 == 1)
           .forEach(i -> parallelList.add(i));

I look forward to seeing your answers in the comments section.