Author: Unreal good

Source: Hang Seng LIGHT Cloud Community

What is a stream?

Streams are a new concept introduced in Java 8 for processing the data in collections. For the moment, a stream can be thought of as a higher-level view of a collection.

As everyone knows, operating on collections is tedious: selecting and projecting elements takes a lot of code. A stream operates on a collection declaratively, much like an SQL statement. We only state which operation the collection needs, and the stream performs it and hands back the result, without us writing the loop by hand.

The mechanics of the collection operation are therefore hidden from us: we give the stream its instructions and it produces the result we want. Because the operation is handled entirely by Java, the runtime can choose how to execute it based on the current hardware environment, and we do not need to write complex and error-prone multithreaded code.

Characteristics of streams

  1. We can think of a stream as a pipeline. The source of the pipeline is our data source (a collection), and the elements of the data source are fed into the pipeline one after another, where we can perform various operations on them. Once the elements have reached the other end of the pipeline they are “consumed”, and we can no longer operate on that stream. Of course, we can get a new stream from the data source and traverse it again.
  2. Streams iterate internally. If we process a collection with a hand-written loop, that is called external iteration. To process a stream, we simply tell the stream what result we want and it handles the traversal itself. This is called internal iteration.
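
The two characteristics above can be sketched as follows. This is a minimal illustration, not code from the article: the class name IterationStyles, the method names, and the sample names are all assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterationStyles {
    // External iteration: the caller writes the loop and the condition.
    static List<String> shortNamesExternal(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() <= 4) {
                result.add(name);
            }
        }
        return result;
    }

    // Internal iteration: the caller states the condition, the stream does the looping.
    static List<String> shortNamesInternal(List<String> names) {
        return names.stream()
                .filter(name -> name.length() <= 4)
                .collect(Collectors.toList());
    }

    // A stream can be consumed only once: a second terminal operation fails.
    static boolean secondTraversalFails(List<String> names) {
        Stream<String> stream = names.stream();
        stream.count();          // first terminal operation consumes the stream
        try {
            stream.count();      // reusing the same stream object is rejected
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("chaimm", "peter", "john");
        System.out.println(shortNamesExternal(names));
        System.out.println(shortNamesInternal(names));
        System.out.println(secondTraversalFails(names));
    }
}
```

Both methods return the same list; only the second leaves the traversal to the library. To traverse the data again, call names.stream() again rather than reusing the old stream object.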

Types of stream operations

There are two types of stream operations: intermediate operations and terminal operations.

  1. Intermediate operations: once the data from the data source is in the pipeline, every operation performed on it along the way is an “intermediate operation.” An intermediate operation still returns a stream object, so multiple intermediate operations can be chained to form a pipeline.
  2. Terminal operations: after all intermediate operations have been applied, a terminal operation is required to take the data out of the pipeline. The terminal operation returns the result of the execution, which is the data you want.

Steps for using a stream

There are three steps to using a stream:

  1. Prepare a data source.
  2. Perform intermediate operations. There can be several of them, chained to form a pipeline.
  3. Perform a terminal operation. After the terminal operation runs, the stream ends and you get the result.
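
The three steps can be sketched in a single pipeline. This is only an illustration under assumed names (ThreeSteps, firstEvenSquares) and assumed sample data:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ThreeSteps {
    static List<Integer> firstEvenSquares(List<Integer> source, int howMany) {
        return source.stream()                  // step 1: the data source
                .filter(n -> n % 2 == 0)        // step 2: intermediate operations,
                .map(n -> n * n)                //         each returning a new stream
                .limit(howMany)
                .collect(Collectors.toList());  // step 3: the terminal operation
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(firstEvenSquares(numbers, 2)); // prints [4, 16]
    }
}
```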

Using streams

Creating a stream

Before you can use a stream, you need a data source, and you obtain the Stream object for that data source through one of the methods provided by the Stream API. Data sources can take many forms:

1. Collections

Call the stream() method of the collection to get the stream object:

List<Human> list = new ArrayList<Human>(); 
Stream<Human> stream = list.stream();

2. Arrays

Use the static stream() method of the Arrays class to get the stream object for an array:

String[] names = {"chaimm", "peter", "john"};
Stream<String> stream = Arrays.stream(names);

3. Values

Convert several values directly into a stream object:

Stream<String> stream = Stream.of("chaimm", "peter", "john");

4. Files

try (Stream<String> lines = Files.lines(Paths.get("Paths"), Charset.defaultCharset())) {
    // TODO: process the lines ("Paths" stands for the file path)
} catch (IOException e) {
    // handle the I/O error
}

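5. Iteration

Stream.iterate creates an infinite stream, which limit then truncates:

Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);

A note on the file example in item 4: Java 7 simplified I/O code with try-with-resources. A resource opened in the parentheses after try is closed automatically, so the code that closes it can be omitted.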

Filtering: filter

The filter function takes a Lambda expression that returns a boolean as its argument. During execution, the stream passes elements to filter one by one and keeps the elements for which the expression returns true.

For example, select all the students (toList here is Collectors.toList, statically imported):

List<Human> result = list.stream().filter(Human::isStudent).collect(toList());

Deduplication: distinct

Remove duplicate elements:

List<Human> result = list.stream().distinct().collect(toList());

Truncation: limit

Keep only the first three elements of the stream:

List<Human> result = list.stream().limit(3).collect(toList());

Mapping: map

map applies a function to each element of the stream and converts the element to another type. The stream feeds each element to map, executes the Lambda expression given to map, and stores the results in a new stream.

For example, get the name of each person (in effect converting each Human to a String):

List<String> result = list.stream().map(Human::getName).collect(toList());

Skipping: skip

Skip the first 3 elements of the stream:

List<Human> result = list.stream().skip(3).collect(toList());

Merging: concat

Merge two streams (human1 and human2 must both be Stream<Human> objects):

List<Human> result = Stream.concat(human1, human2).collect(toList());

Merging multiple streams: flatMap

Example: list the distinct words in a List such as the following:

List<String> list = new ArrayList<String>();
list.add("zhong");
list.add("ming");
list.add("mao");

The idea is as follows.

First turn the list into a stream:

list.stream();

Split each line into words on spaces:

list.stream().map(line -> line.split(" "));

After the split, each element of the stream is a String[] array.

Turn each String[] into a stream:

list.stream().map(line -> line.split(" ")).map(Arrays::stream)

At this point the stream contains small streams (it is a Stream<Stream<String>>), and we need to merge these small streams into one stream.

Merge the small streams into a single stream by replacing the second map with flatMap:

list.stream().map(line -> line.split(" ")).flatMap(Arrays::stream)

Remove duplicates:

list.stream().map(line -> line.split(" ")).flatMap(Arrays::stream).distinct().collect(toList());
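
Put together, the steps of this walkthrough might look like the runnable sketch below. The class name DistinctWords and the sample lines are assumptions made for illustration; the lines contain several space-separated words each so that the split and the flattening have a visible effect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class DistinctWords {
    static List<String> distinctWords(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))   // Stream<String[]>: one array per line
                .flatMap(Arrays::stream)        // Stream<String>: all words in one stream
                .distinct()                     // drop repeated words, keeping first occurrences
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample data is hypothetical.
        List<String> lines = Arrays.asList("zhong ming", "ming mao", "mao zhong");
        System.out.println(distinctWords(lines)); // prints [zhong, ming, mao]
    }
}
```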

Does any element match: anyMatch

anyMatch determines whether at least one element in the stream meets the specified condition. The condition is passed to anyMatch as a Lambda expression, and the result is a boolean.

For example, check whether there are any students in the list:

boolean result = list.stream()
            .anyMatch(Human::isStudent);

Do all elements match: allMatch

allMatch determines whether all elements in the stream meet the specified condition. The condition is passed to allMatch as a Lambda expression, and the result is a boolean.

For example, determine whether everyone is a student:

boolean result = list.stream().allMatch(Human::isStudent);

Does no element match: noneMatch

In contrast to allMatch, noneMatch determines whether no element in the stream meets the specified condition:

boolean result = list.stream().noneMatch(Human::isStudent);
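
The three matching operations can be compared side by side on the same data. In this sketch the nested Human class is a hypothetical stand-in for the class used in the article (only a name and a student flag are assumed), and the sample people are made up:

```java
import java.util.Arrays;
import java.util.List;

class MatchDemo {
    // Hypothetical stand-in for the Human class used in the article.
    static class Human {
        private final String name;
        private final boolean student;

        Human(String name, boolean student) {
            this.name = name;
            this.student = student;
        }

        String getName() { return name; }
        boolean isStudent() { return student; }
    }

    static final List<Human> PEOPLE = Arrays.asList(
            new Human("chaimm", true),
            new Human("peter", false),
            new Human("john", true));

    static boolean anyStudent()  { return PEOPLE.stream().anyMatch(Human::isStudent); }
    static boolean allStudents() { return PEOPLE.stream().allMatch(Human::isStudent); }
    static boolean noStudents()  { return PEOPLE.stream().noneMatch(Human::isStudent); }

    public static void main(String[] args) {
        System.out.println(anyStudent());  // true: chaimm and john are students
        System.out.println(allStudents()); // false: peter is not a student
        System.out.println(noStudents());  // false: at least one student exists
    }
}
```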

Get any element: findAny

findAny returns an arbitrary element of the stream, wrapped in an Optional:

Optional<Human> human = list.stream().findAny();

Get the first element: findFirst

Optional<Human> human = list.stream().findFirst();
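
Because findAny and findFirst return an Optional, the caller decides what happens when the stream turns out to be empty. A minimal sketch, with the class name FindDemo, the length condition, and the fallback value "none" all assumed for illustration:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class FindDemo {
    // Returns the first name longer than four characters, or "none" if there is no such name.
    static String firstLongName(List<String> names) {
        Optional<String> first = names.stream()
                .filter(name -> name.length() > 4)
                .findFirst();
        return first.orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(firstLongName(Arrays.asList("chaimm", "peter", "john"))); // chaimm
        System.out.println(firstLongName(Collections.<String>emptyList()));          // none
    }
}
```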

Reduction

Reduction folds all the elements of a collection into a single output value by applying a specified operation, for example finding the maximum or computing the average.

In a stream, the reduce function performs the reduction.

The reduce function used here takes two arguments:

  1. The initial value
  2. A Lambda expression that performs the reduction step

Element summation: a custom Lambda expression

Calculate the sum of the ages of all people. The ages are extracted with map first, so that the Lambda expression combines two Integer values and the result matches the type of the initial value:

int age = list.stream().map(Human::getAge).reduce(0, (age1, age2) -> age1 + age2);

  1. The first argument of reduce is the initial value, 0.
  2. The second argument of reduce is the reduction operation to perform. It is a Lambda expression with two parameters: reduce feeds it the running result and the next element of the stream, and the final result is the sum.

Element summation: Integer.sum

If the stream elements are numeric, the sum function provided by Integer can replace the custom Lambda expression, for example:

int age = list.stream().map(Human::getAge).reduce(0, Integer::sum);

The Integer class also provides other numeric operations, such as min and max, that can be used directly when the elements in a stream are numeric.
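
A small runnable sketch of both forms of reduce on a list of ages. The class name ReduceDemo and the sample ages are assumptions; the one-argument form shown in max has no initial value, so it returns an Optional that is empty for an empty stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ReduceDemo {
    // Two-argument form: initial value plus a function combining two values.
    static int sum(List<Integer> ages) {
        return ages.stream().reduce(0, Integer::sum);
    }

    // One-argument form: no initial value, so the result is wrapped in an Optional.
    static Optional<Integer> max(List<Integer> ages) {
        return ages.stream().reduce(Integer::max);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(18, 25, 31);
        System.out.println(sum(ages)); // 74
        System.out.println(max(ages)); // Optional[31]
    }
}
```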

Using numeric streams

Numeric operations performed with reduce on a stream of objects involve boxing and unboxing between primitive types and their wrapper types, which is inefficient.

When the stream operations are purely numeric, a numeric stream is more efficient.

Converting an ordinary stream to a numeric stream

The Stream API provides three numeric streams, IntStream, DoubleStream, and LongStream, and three methods for converting an ordinary stream to a numeric stream: mapToInt, mapToDouble, and mapToLong.

For example, convert the ages of the Human objects to a numeric stream:

IntStream stream = list.stream().mapToInt(Human::getAge);

Numeric calculation

Each numeric stream provides calculation functions such as max, min, and sum. For example, find the greatest age:

OptionalInt maxAge = list.stream().mapToInt(Human::getAge).max();

Because a stream can be empty, and the maximum of an empty stream has no meaning, max returns an OptionalInt. OptionalInt is the int counterpart of Optional (a separate class, not a subclass of it); it lets you check whether a value is present and handle the empty case accordingly.

Likewise, max on the streams returned by mapToInt, mapToDouble, and mapToLong yields OptionalInt, OptionalDouble, and OptionalLong, respectively.
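
The following sketch shows a numeric stream in use, including the empty case. The class name NumericStreamDemo and the choice of word lengths as the numeric property are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;

class NumericStreamDemo {
    // sum() on an IntStream works on primitive ints, so no boxing is involved.
    static int totalLength(List<String> words) {
        return words.stream().mapToInt(String::length).sum();
    }

    // max() returns OptionalInt because the stream may be empty.
    static OptionalInt longest(List<String> words) {
        return words.stream().mapToInt(String::length).max();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("chaimm", "peter", "john");
        System.out.println(totalLength(words));                 // 15
        System.out.println(longest(words).getAsInt());          // 6
        System.out.println(longest(Collections.<String>emptyList()).isPresent()); // false
    }
}
```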

Intermediate operations and terminal operations

| Operation | Type | Return type | Argument type / functional interface | Function descriptor |
| --- | --- | --- | --- | --- |
| filter | intermediate | Stream<T> | Predicate<T> | T -> boolean |
| distinct | intermediate | Stream<T> | | |
| skip | intermediate | Stream<T> | long | |
| map | intermediate | Stream<R> | Function<T, R> | T -> R |
| flatMap | intermediate | Stream<R> | Function<T, Stream<R>> | T -> Stream<R> |
| limit | intermediate | Stream<T> | long | |
| sorted | intermediate | Stream<T> | Comparator<T> | (T, T) -> int |
| anyMatch | terminal | boolean | Predicate<T> | T -> boolean |
| noneMatch | terminal | boolean | Predicate<T> | T -> boolean |
| allMatch | terminal | boolean | Predicate<T> | T -> boolean |
| findAny | terminal | Optional<T> | | |
| findFirst | terminal | Optional<T> | | |
| forEach | terminal | void | Consumer<T> | T -> void |
| collect | terminal | R | Collector<T, A, R> | |
| reduce | terminal | Optional<T> | BinaryOperator<T> | (T, T) -> T |
| count | terminal | long | | |

Collecting with a Collector

A collector finishes off a stream that has been filtered and mapped, and presents the final result in the required form.

The collect method does the collecting. It takes an implementation of the Collector interface, which defines how the elements are collected.

The Collectors class provides many ready-made Collector implementations through static factory methods, which we can use directly to shape the result of a stream. You can also implement the Collector interface yourself to build a custom collector.
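
As one possible illustration of a custom collector, the sketch below builds one with the Collector.of factory method instead of writing a full class. The names CustomCollectorDemo and toUnmodifiableListSketch are hypothetical, and the collector itself (gather into a list, then wrap it as unmodifiable) is just an example of the four parts a collector needs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CustomCollectorDemo {
    // A hypothetical custom collector: gathers elements into a list and
    // returns an unmodifiable view of it.
    static <T> Collector<T, List<T>, List<T>> toUnmodifiableListSketch() {
        return Collector.of(
                () -> new ArrayList<T>(),                              // supplier: empty container
                (list, item) -> list.add(item),                        // accumulator: add one element
                (left, right) -> { left.addAll(right); return left; }, // combiner: merge partial results
                list -> Collections.unmodifiableList(list));           // finisher: final conversion
    }

    static List<String> collectNames() {
        return Stream.of("chaimm", "peter", "john").collect(toUnmodifiableListSketch());
    }

    static boolean rejectsModification() {
        try {
            collectNames().add("mary");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(collectNames());        // [chaimm, peter, john]
        System.out.println(rejectsModification()); // true
    }
}
```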

Reduction

A stream consists of elements, and reduction “folds” those elements into a single value. Summing, finding the maximum, and averaging are all reduction operations.

General reduction

To define a custom reduction operation, use the Collectors.reducing function, which in its three-argument form takes:

  • First argument: the initial value of the reduction
  • Second argument: a function that extracts the field to be reduced
  • Third argument: the operation that combines two values

Summarization

The Collectors class provides a factory method for summing: Collectors.summingInt.

It takes a function that maps each object to the int to be summed and returns a collector. Once that collector is passed to the ordinary collect method, it performs the summation we need.
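
The sketch below computes the same total twice, once with the general Collectors.reducing and once with Collectors.summingInt, to show that the latter is a ready-made special case. The nested Human class is a hypothetical stand-in for the class used in the article, and the sample ages are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SummingDemo {
    // Hypothetical stand-in for the Human class used in the article.
    static class Human {
        private final String name;
        private final int age;

        Human(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }
        int getAge() { return age; }
    }

    static final List<Human> PEOPLE = Arrays.asList(
            new Human("chaimm", 18),
            new Human("peter", 25),
            new Human("john", 31));

    // General reduction: initial value, field extractor, combining operation.
    static int totalAgeWithReducing() {
        return PEOPLE.stream().collect(Collectors.reducing(0, Human::getAge, Integer::sum));
    }

    // Summarization: only the int-valued mapping function is needed.
    static int totalAgeWithSummingInt() {
        return PEOPLE.stream().collect(Collectors.summingInt(Human::getAge));
    }

    public static void main(String[] args) {
        System.out.println(totalAgeWithReducing());   // 74
        System.out.println(totalAgeWithSummingInt()); // 74
    }
}
```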

Grouping

Grouping is a natural way to split data: the elements of a stream are divided into groups according to a specified classification, similar to GROUP BY in an SQL statement.
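
A minimal grouping sketch, assuming word length as the classification and a made-up word list (the class name GroupingDemo is also an assumption):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {
    // The classification function (String::length) produces the map keys;
    // each key maps to the list of elements that share it.
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups =
                byLength(Arrays.asList("chaimm", "peter", "john", "mary"));
        System.out.println(groups.get(4)); // [john, mary]
        System.out.println(groups.get(5)); // [peter]
        System.out.println(groups.get(6)); // [chaimm]
    }
}
```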

Multi-level grouping

Multi-level grouping lets you subdivide each group again after the first grouping is done.

Multiple levels of grouping can be achieved with the two-argument overload of groupingBy.

  • First argument: the classification function for the first-level grouping
  • Second argument: another groupingBy collector, which carries the classification function for the second-level grouping
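
A two-level grouping might be sketched as follows. The classification by first letter and then by length, the class name MultiLevelGroupingDemo, and the sample words are all assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MultiLevelGroupingDemo {
    // First level: group by initial letter. Second level: group each of those groups by length.
    static Map<Character, Map<Integer, List<String>>> byInitialThenLength(List<String> words) {
        return words.stream().collect(
                Collectors.groupingBy(word -> word.charAt(0),
                        Collectors.groupingBy(String::length)));
    }

    public static void main(String[] args) {
        Map<Character, Map<Integer, List<String>>> groups = byInitialThenLength(
                Arrays.asList("john", "jack", "jessica", "peter", "paul"));
        System.out.println(groups.get('j').get(4)); // [john, jack]
        System.out.println(groups.get('j').get(7)); // [jessica]
        System.out.println(groups.get('p').get(5)); // [peter]
    }
}
```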

Static factory methods of the Collectors class

| Factory method | Return type | Use | Example |
| --- | --- | --- | --- |
| toList | List<T> | Collects all items in the stream into a List | List<Project> projects = projectStream.collect(toList()); |
| toSet | Set<T> | Collects all items in the stream into a Set, removing duplicates | Set<Project> projects = projectStream.collect(toSet()); |
| toCollection | Collection<T> | Collects all items in the stream into a collection created by the given supplier | Collection<Project> projects = projectStream.collect(toCollection(ArrayList::new)); |
| counting | Long | Counts the number of elements in the stream | long howManyProjects = projectStream.collect(counting()); |
| summingInt | Integer | Sums an integer property of the items in the stream | int totalStars = projectStream.collect(summingInt(Project::getStars)); |
| averagingInt | Double | Calculates the average of an Integer property of the items in the stream | double avgStars = projectStream.collect(averagingInt(Project::getStars)); |
| summarizingInt | IntSummaryStatistics | Collects statistics on an Integer property of the items in the stream, such as maximum, minimum, sum, and average | IntSummaryStatistics projectStatistics = projectStream.collect(summarizingInt(Project::getStars)); |
| joining | String | Concatenates the strings produced by calling toString on each item in the stream | String shortProject = projectStream.map(Project::getName).collect(joining(", ")); |
| maxBy | Optional<T> | An Optional wrapping the largest element according to the given comparator, or Optional.empty() if the stream is empty | Optional<Project> fattest = projectStream.collect(maxBy(comparingInt(Project::getStars))); |
| minBy | Optional<T> | An Optional wrapping the smallest element according to the given comparator, or Optional.empty() if the stream is empty | Optional<Project> fattest = projectStream.collect(minBy(comparingInt(Project::getStars))); |
| reducing | The type produced by the reduction operation | Starting from an initial value that serves as the accumulator, combines the elements of the stream one by one with a BinaryOperator, reducing the stream to a single value | int totalStars = projectStream.collect(reducing(0, Project::getStars, Integer::sum)); |
| collectingAndThen | The type returned by the conversion function | Wraps another collector and applies a conversion function to its result | int howManyProjects = projectStream.collect(collectingAndThen(toList(), List::size)); |
| groupingBy | Map<K, List<T>> | Groups the items in the stream by the value of one of their properties and uses that value as the key of the resulting Map | Map<String,List<Project>> projectByLanguage = projectStream.collect(groupingBy(Project::getLanguage)); |
| partitioningBy | Map<Boolean,List<T>> | Partitions the items in the stream according to the result of applying a predicate to each item | Map<Boolean,List<Project>> vegetarianDishes = projectStream.collect(partitioningBy(Project::isVegetarian)); |

Converting to other types

Some collectors generate other collections. toList, which you have already seen, generates an instance of java.util.List.

There are also toSet and toCollection, which generate an instance of Set and of a Collection implementation of your choice, respectively.

So far the discussion has focused on chained operations on streams, but there are times when you eventually need to produce a collection, for example:

  • Existing code is written against collections, so the stream has to be converted to a collection before it is passed in;
  • After a series of chained operations on a collection, you eventually want to produce a value;
  • When writing unit tests, you need to make assertions against a specific collection.

Use toCollection to collect the elements into a collection of your choosing:

stream.collect(toCollection(TreeSet::new));

You can also use a collector to make a stream produce a single value. maxBy and minBy produce the largest or smallest element according to a given ordering.
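
A short sketch of toCollection and maxBy on the same input. The class name CollectionCollectors, the method names, and the sample names are assumptions; the comparator by string length is only one possible ordering.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;

class CollectionCollectors {
    // toCollection lets the caller pick the concrete collection, here a sorted set.
    static TreeSet<String> sortedUnique(List<String> names) {
        return names.stream().collect(Collectors.toCollection(TreeSet::new));
    }

    // maxBy yields a single value: the largest element under the given comparator.
    static Optional<String> longest(List<String> names) {
        return names.stream()
                .collect(Collectors.maxBy(Comparator.comparingInt(String::length)));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("peter", "chaimm", "john", "peter");
        System.out.println(sortedUnique(names)); // [chaimm, john, peter]
        System.out.println(longest(names));      // Optional[chaimm]
    }
}
```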

Data partitioning

Partitioning is a special case of grouping: a predicate (a function that returns a boolean) serves as the classification function, and is called the partitioning function.

Because the partitioning function returns a boolean, the resulting Map has Boolean keys, so the elements are divided into at most two groups: one for true and one for false.

The benefit of partitioning is that it keeps both lists of stream elements: those for which the partitioning function returns true and those for which it returns false.
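
A minimal partitioning sketch, assuming an even/odd predicate on integers (the class name PartitionDemo and the sample numbers are made up). It also shows that both keys are present in the result even when the input is empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionDemo {
    // The predicate decides which of the two lists each element lands in.
    static Map<Boolean, List<Integer>> evenAndOdd(List<Integer> numbers) {
        return numbers.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> parts = evenAndOdd(Arrays.asList(1, 2, 3, 4, 5, 6));
        System.out.println(parts.get(true));  // [2, 4, 6]
        System.out.println(parts.get(false)); // [1, 3, 5]

        // Both keys exist even for an empty input.
        Map<Boolean, List<Integer>> empty = evenAndOdd(Collections.<Integer>emptyList());
        System.out.println(empty.get(true));  // []
    }
}
```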

Parallel streams

A parallel stream splits its content into chunks of data and uses different threads to process each chunk separately. The results for the chunks are then combined.

Calling the parallel() method turns a sequential stream into a parallel stream:

public static long parallelSum(long n){
    return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum);
}

To turn a parallel stream back into a sequential stream, simply call the sequential() method:

stream.parallel().filter(...).sequential().map(...).parallel().reduce();

Both methods can be called any number of times, but only the last call determines whether the whole pipeline runs sequentially or in parallel.

By default, the number of threads used by parallel streams is determined by the number of processor cores on your machine.

The following system property changes that value. It is a global setting that applies to all parallel streams:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
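
That the last call wins can be observed with isParallel(), which reports how the pipeline would run if a terminal operation were executed. The class and method names in this sketch are assumptions.

```java
import java.util.stream.Stream;

class ParallelFlagDemo {
    // The last mode switch is parallel(), so the whole pipeline is parallel.
    static boolean endsParallel() {
        return Stream.of(1, 2, 3)
                .parallel()
                .filter(n -> n > 1)
                .sequential()
                .map(n -> n * 2)
                .parallel()
                .isParallel();
    }

    // The last mode switch is sequential(), so the whole pipeline is sequential.
    static boolean endsSequential() {
        return Stream.of(1, 2, 3)
                .parallel()
                .map(n -> n * 2)
                .sequential()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(endsParallel());   // true
        System.out.println(endsSequential()); // false
    }
}
```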

Processing data with a multithreaded parallel stream is not necessarily faster than with a single-threaded sequential stream, because performance is affected by many factors.

Some tips on using parallel streams efficiently:

  1. If in doubt, measure it yourself.
  2. Use the primitive streams IntStream, LongStream, and DoubleStream whenever possible.
  3. Some operations perform worse on a parallel stream than on a sequential stream. Operations that depend on the order of elements, such as limit and findFirst, can be extremely costly in a parallel stream. findAny performs much better because it does not depend on order.
  4. Consider the total cost of the computation in terms of N and Q, where N is the number of elements to process and Q is the time required to process a single element. The higher the value of Q, the more likely a parallel stream is to improve performance.
  5. Using a parallel stream on a small amount of data does not improve performance.
  6. Consider the data structure: a parallel stream has to decompose its data, and different data structures decompose with different efficiency.

Stream data sources and their decomposability

| Source | Decomposability |
| --- | --- |
| ArrayList | Very good |
| LinkedList | Poor |
| IntStream.range | Very good |
| Stream.iterate | Poor |
| HashSet | Good |
| TreeSet | Good |

The characteristics of the stream, and the way intermediate operations modify it, can also affect how well the data decomposes. For example, a stream of known size can be divided into equal parts as the task is decomposed, but after a filter operation the stream cannot know in advance how many elements will remain.

Consider the cost of the terminal operation: if merging the partial results of a parallel stream is too expensive, that cost can outweigh the performance gained from running in parallel.
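
As a sketch of how the tips on primitive streams and decomposable sources might be applied, the two methods below compute the same sum from a poorly decomposable boxed source (Stream.iterate) and from a well decomposable primitive source (LongStream.rangeClosed). The class and method names are assumptions, and the sketch makes no timing claims: which version is faster on a given machine should be measured.

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

class ParallelSumDemo {
    // Boxed Long values from Stream.iterate, a source that decomposes poorly.
    static long iterateSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                .limit(n)
                .parallel()
                .reduce(0L, Long::sum);
    }

    // Primitive longs from a range, a source that splits evenly into chunks.
    static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n)
                .parallel()
                .sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println(iterateSum(n)); // 500000500000
        System.out.println(rangedSum(n));  // 500000500000
    }
}
```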