Preface
When we work with collection data, we usually iterate with a for loop or an Iterator, which is verbose. Java provides the Stream API, which lets us process a collection as a pipeline of individual elements, and also offers a parallel (multi-threaded) execution model.
- Constructing a stream
- Intermediate operations on stream elements
- Terminal operations on a stream
- Aggregate processing of streams with Collectors
- Using parallel streams with CompletableFuture
1 Constructing a stream
Built-in static factory methods on Stream
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
public static<T> Builder<T> builder()
public static<T> Stream<T> of(T t)
public static<T> Stream<T> empty()
public static<T> Stream<T> generate(Supplier<T> s)
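A minimal sketch of a few of these factories (the class name is illustrative; `limit` is needed because `iterate` produces an unbounded stream):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamFactoryDemo {
    public static void main(String[] args) {
        // iterate: a seed plus a successor function; limit bounds the infinite stream
        List<Integer> iterated = Stream.iterate(1, n -> n * 2).limit(4).collect(Collectors.toList());
        System.out.println(iterated); // [1, 2, 4, 8]
        // builder: add elements one by one, then build
        Stream<String> built = Stream.<String>builder().add("a").add("b").build();
        System.out.println(built.count()); // 2
        // concat: join two streams end to end
        System.out.println(Stream.concat(Stream.of(1), Stream.of(2, 3)).count()); // 3
    }
}
```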
The stream() method declared by Collection
default Stream<E> stream()
- Collection declares stream() as a default method, so every Collection implementation comes with an official way to convert itself into a Stream
- For example, converting a List to a Stream
public static void main(String[] args){
    List<String> demo = Arrays.asList("a", "b", "c");
    long count = demo.stream().peek(System.out::println).count();
    System.out.println(count);
}
-------result--------
a
b
c
3
2 Intermediate operations defined by Stream
Filtering with filter
Stream<T> filter(Predicate<? super T> predicate)
- Predicate is a functional interface, so a lambda can be passed directly; for complex filtering logic, combine predicates with the or, and, and negate methods
- Example
List<String> demo = Arrays.asList("a", "b", "c");
Predicate<String> f1 = item -> item.equals("a");
Predicate<String> f2 = item -> item.equals("b");
demo.stream().filter(f1.or(f2)).forEach(System.out::println);
-------result--------
a
b
Mapping with map
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
- Example
static class User {
    public User(Integer id) { this.id = id; }
    Integer id;
    public Integer getId() { return id; }
}
public static void main(String[] args) {
    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
    // Convert each User to its Integer id
    demo.stream().map(User::getId).forEach(System.out::println);
}
-------result--------
1
2
3
Peeking with peek
Stream<T> peek(Consumer<? super T> action);
- Unlike map, the action returns nothing: each element is handed to a Consumer and the stream continues unchanged
- Example
static class User {
    public User(Integer id) { this.id = id; }
    Integer id;
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
}
public static void main(String[] args) {
    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
    // Square each id, then convert each User to its Integer id
    demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);
}
-------result--------
1
4
9
Flattening with flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
- flatMap maps each element to a Stream, then flattens those streams into a single Stream
- Example
public static void main(String[] args) {
List<Stream<Integer>> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1));
demo.stream().flatMap(Function.identity()).forEach(System.out::println);
}
-------result--------
5
2
1
Deduplication with distinct
Stream<T> distinct();
- Example
List<Integer> demo = Arrays.asList(1, 1, 2);
demo.stream().distinct().forEach(System.out::println);
-------result--------
1
2
Sorting with sorted
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
- Example
List<Integer> demo = Arrays.asList(5, 1, 2);
// Natural (ascending) order by default
demo.stream().sorted().forEach(System.out::println);
// Descending order
Comparator<Integer> comparator = Comparator.<Integer, Integer>comparing(item -> item).reversed();
demo.stream().sorted(comparator).forEach(System.out::println);
-------ascending result--------
1
2
5
-------descending result--------
5
2
1
Truncating with limit and skipping with skip
// Keep only the first maxSize elements
Stream<T> limit(long maxSize);
// Skip the first n elements
Stream<T> skip(long n);
- Example
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
// Skip the first two elements, then keep the next two
demo.stream().skip(2).limit(2).forEach(System.out::println);
-------result--------
3
4
New intermediate operations in JDK 9
- Unlike filter, takeWhile keeps elements while the predicate holds and stops at the first element that fails it; dropWhile discards elements while the predicate holds and keeps everything from the first failing element onward
default Stream<T> takeWhile(Predicate<? super T> predicate);
default Stream<T> dropWhile(Predicate<? super T> predicate);
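A minimal sketch (JDK 9+, illustrative class name) contrasting the two:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TakeDropDemo {
    public static void main(String[] args) {
        List<Integer> demo = Arrays.asList(1, 2, 3, 4, 1, 2);
        // takeWhile stops at the first element >= 3, even though later elements match again
        List<Integer> taken = demo.stream().takeWhile(item -> item < 3).collect(Collectors.toList());
        // dropWhile discards elements until the first element >= 3, then keeps everything after it
        List<Integer> dropped = demo.stream().dropWhile(item -> item < 3).collect(Collectors.toList());
        System.out.println(taken);   // [1, 2]
        System.out.println(dropped); // [3, 4, 1, 2]
    }
}
```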
3 Terminal operations on a stream
Consuming elements with forEach
// Iterate and consume each element
void forEach(Consumer<? super T> action);
// Ordered iteration; unlike forEach, forEachOrdered preserves encounter order even on a parallelStream
void forEachOrdered(Consumer<? super T> action);
- Example
List<Integer> demo = Arrays.asList(1, 2, 3);
demo.parallelStream().forEach(System.out::println);
demo.parallelStream().forEachOrdered(System.out::println);
-------forEach result (order may vary)--------
2
3
1
-------forEachOrdered result--------
1
2
3
Collecting into an array
// Convert to an Object array
Object[] toArray();
// Convert the stream to an A[] array of the given type A
<A> A[] toArray(IntFunction<A[]> generator)
- Example
List<String> demo = Arrays.asList("1", "2", "3");
//<A> A[] toArray(IntFunction<A[]> generator)
String[] data = demo.stream().toArray(String[]::new);
Minimum and maximum
// Get the minimum value
Optional<T> min(Comparator<? super T> comparator)
// Get the maximum value
Optional<T> max(Comparator<? super T> comparator)
- Example
List<Integer> demo = Arrays.asList(1, 2, 3);
Optional<Integer> min = demo.stream().min(Comparator.comparing(item -> item));
Optional<Integer> max = demo.stream().max(Comparator.comparing(item -> item));
System.out.println(min.get() + "-" + max.get());
-------result--------
1-3
Matching and finding
// Does any element match
boolean anyMatch(Predicate<? super T> predicate)
// Do all elements match
boolean allMatch(Predicate<? super T> predicate)
// Does no element match
boolean noneMatch(Predicate<? super T> predicate)
// Find the first element
Optional<T> findFirst();
// Find any one element (useful on parallel streams)
Optional<T> findAny();
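A short sketch of these terminal operations (class name illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class MatchDemo {
    public static void main(String[] args) {
        List<Integer> demo = Arrays.asList(1, 2, 3);
        // true: 3 matches
        System.out.println(demo.stream().anyMatch(item -> item > 2));
        // true: every element is positive
        System.out.println(demo.stream().allMatch(item -> item > 0));
        // true: no element exceeds 3
        System.out.println(demo.stream().noneMatch(item -> item > 3));
        // first element in encounter order
        System.out.println(demo.stream().findFirst().get()); // 1
    }
}
```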
Reduction with reduce
// Merge elements pairwise
Optional<T> reduce(BinaryOperator<T> accumulator)
// Merge elements pairwise, starting from an initial value
T reduce(T identity, BinaryOperator<T> accumulator)
// Convert the element type while merging, starting from an initial value; combiner merges partial results in parallel runs
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
- Example
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// Convert each number to a string, then join with "-"
String data = demo.stream().reduce("0", (u, t) -> u + "-" + t, (s1, s2) -> s1 + "-" + s2);
System.out.println(data);
-------result--------
0-1-2-3-4-5-6-7-8
Counting elements
long count()
- Example
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
System.out.println(demo.stream().count());
-------result--------
6
Aggregating with collect
/**
 * supplier: produces the result container R
 * accumulator: consumes each element into the container
 * combiner: merges two containers (needed when running in parallel)
 */
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
/**
 * Collector bundles a Supplier, an Accumulator, a Combiner, a Finisher, and Characteristics;
 * the Collectors utility class provides many built-in implementations
 */
<R, A> R collect(Collector<? super T, A, R> collector);
- For an example, see below
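As a minimal sketch of the three-argument collect, here is a hand-rolled equivalent of Collectors.toList() (this mirrors the canonical javadoc example; the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectDemo {
    public static void main(String[] args) {
        List<Integer> demo = Arrays.asList(1, 2, 3);
        // supplier creates the container, accumulator adds each element,
        // combiner merges partial lists produced by parallel execution
        List<Integer> result = demo.stream()
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        System.out.println(result); // [1, 2, 3]
    }
}
```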
4 Collectors: built-in aggregation utilities
The Collector interface and its implementation class CollectorImpl
// Produces the intermediate accumulation container
Supplier<A> supplier();
// Consumes each stream element into the container
BiConsumer<A, T> accumulator();
// Merges two containers (parallel runs produce several containers that must be merged)
BinaryOperator<A> combiner();
// Converts the container to the final result (the last step; often an identity conversion)
Function<A, R> finisher();
// Collector characteristics
Set<Characteristics> characteristics();
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                             BiConsumer<A, T> accumulator,
                                             BinaryOperator<A> combiner,
                                             Function<A, R> finisher,
                                             Characteristics... characteristics)
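A minimal sketch of building a custom collector with Collector.of, a joining collector backed by StringJoiner (class name illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collector;

public class CollectorOfDemo {
    public static void main(String[] args) {
        List<String> demo = Arrays.asList("a", "b", "c");
        Collector<String, StringJoiner, String> joiner = Collector.of(
                () -> new StringJoiner("-"),   // supplier: the accumulation container
                StringJoiner::add,             // accumulator: add each element
                StringJoiner::merge,           // combiner: merge two joiners (parallel runs)
                StringJoiner::toString);       // finisher: convert container to the result
        System.out.println(demo.stream().collect(joiner)); // a-b-c
    }
}
```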
Collecting into a List or Set
// Collect into a List
public static<T> Collector<T, ?, List<T>> toList()
// Collect into a Set
public static<T> Collector<T, ?, Set<T>> toSet()
- Example
List<Integer> demo = Arrays.asList(1, 2, 3);
List<Integer> col = demo.stream().collect(Collectors.toList());
Set<Integer> set = demo.stream().collect(Collectors.toSet());
Collecting into a Map
// Collect into a Map
public static<T, K, U> Collector<T, ?, Map<K, U>> toMap(Function<? super T, ? extends K> keyMapper,
                                                        Function<? super T, ? extends U> valueMapper)
/**
 * mergeFunction: how to merge values mapped to the same key
 */
public static<T, K, U> Collector<T, ?, Map<K, U>> toMap(Function<? super T, ? extends K> keyMapper,
                                                        Function<? super T, ? extends U> valueMapper,
                                                        BinaryOperator<U> mergeFunction)
/**
 * mergeFunction: how to merge values mapped to the same key
 * mapSupplier: produces the result Map
 */
public static<T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                                                     Function<? super T, ? extends U> valueMapper,
                                                                     BinaryOperator<U> mergeFunction,
                                                                     Supplier<M> mapSupplier)
- With the two-argument toMap, elements that produce the same key cause an error (IllegalStateException); supply a mergeFunction, or use groupingBy instead
- Example
List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));
Map<Integer,User> map = demo.stream().collect(Collectors.toMap(User::getId,item->item));
System.out.println(map);
-------result-------
{1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}
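A sketch of the mergeFunction overload, keeping the first value when keys collide (the data and class name are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapMergeDemo {
    public static void main(String[] args) {
        List<String> demo = Arrays.asList("apple", "avocado", "banana");
        // Key is the first letter; "apple" and "avocado" collide on 'a',
        // and the merge function (a, b) -> a keeps the first value seen
        Map<Character, String> map = demo.stream()
                .collect(Collectors.toMap(s -> s.charAt(0), Function.identity(), (a, b) -> a));
        System.out.println(map); // {a=apple, b=banana}
    }
}
```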
Joining strings
// Concatenate the strings in the stream into one string
public static Collector<CharSequence, ?, String> joining()
// Concatenate with the given delimiter
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)
- Example
List<String> demo = Arrays.asList("c", "s", "c", "w");
String name = demo.stream().collect(Collectors.joining("-"));
System.out.println(name);
-------result-------
c-s-c-w
Mapping before re-aggregating: mapping
- Equivalent to applying map and then collecting with a downstream collector
/**
 * mapper: the mapping function
 * downstream: the collector applied to the mapped elements
 */
public static<T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                                     Collector<? super U, A, R> downstream);
- Example
List<String> demo = Arrays.asList("1", "2", "3");
List<Integer> data = demo.stream().collect(Collectors.mapping(Integer::valueOf, Collectors.toList()));
System.out.println(data);
-------result-------
[1, 2, 3]
Transforming the result after aggregation: collectingAndThen
/**
 * downstream: the aggregation collector
 * finisher: converts the aggregated result
 */
public static<T, A, R, RR> Collector<T, A, RR> collectingAndThen(Collector<T, A, R> downstream,
                                                                 Function<R, RR> finisher)
- Example
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
// Aggregate into a List, then return the list's size
Integer size = demo.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));
System.out.println(size);
---------result----------
6
Grouping with groupingBy (the result Map is a HashMap)
/**
 * classifier: derives the grouping key from each element;
 * after grouping, each group is collected into a List
 */
public static<T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier)
/**
 * downstream: the collector applied to each group
 */
public static<T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                                                Collector<? super T, A, D> downstream)
/**
 * mapFactory: produces the result Map
 * downstream: the collector applied to each group
 */
public static<T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                                                             Supplier<M> mapFactory,
                                                                             Collector<? super T, A, D> downstream)
- Example
public static void main(String[] args) throws Exception {
    List<Integer> demo = Stream.iterate(0, item -> item + 1)
            .limit(15)
            .collect(Collectors.toList());
    // Group into three buckets by remainder; convert each element to a String
    Map<Integer, List<String>> map = demo.stream()
            .collect(Collectors.groupingBy(item -> item % 3,
                    HashMap::new,
                    Collectors.mapping(String::valueOf, Collectors.toList())));
    System.out.println(map);
}
---------result----------
{0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}
Concurrent grouping with groupingByConcurrent (the result is a ConcurrentMap)
/**
 * classifier: derives the grouping key;
 * after grouping, each group is collected into a List
 */
public static<T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier)
/**
 * downstream: the collector applied to each group
 */
public static<T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                                                                    Collector<? super T, A, D> downstream)
/**
 * mapFactory: produces the result Map (a ConcurrentMap subtype)
 * downstream: the collector applied to each group
 */
public static<T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                                                                                 Supplier<M> mapFactory,
                                                                                                 Collector<? super T, A, D> downstream);
- Usage is the same as groupingBy
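A minimal sketch with a parallel stream (class name illustrative); unlike groupingBy, all threads accumulate into one shared ConcurrentMap, so the order of elements within each group is not guaranteed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class GroupConcurrentDemo {
    public static void main(String[] args) {
        List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Group into odd (key 1) and even (key 0) buckets concurrently
        ConcurrentMap<Integer, List<Integer>> map = demo.parallelStream()
                .collect(Collectors.groupingByConcurrent(item -> item % 2));
        System.out.println(map);
    }
}
```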
Partitioning with partitioningBy (a special case of groupingBy that splits the stream in two)
public static<T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate)
/**
 * predicate: splits elements into a true group and a false group
 * downstream: the collector applied to each group
 */
public static<T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                                       Collector<? super T, A, D> downstream)
- Example
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
// Partition into odd and even groups
Map<Boolean, List<Integer>> map = demo.stream()
        .collect(Collectors.partitioningBy(item -> item % 2 == 0));
System.out.println(map);
---------result----------
{false=[1, 3, 5], true=[2, 4, 6]}
Averaging collectors
// Average of double-valued mappings; the result type is Double
public static<T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)
// Average of long-valued mappings; the result type is still Double
public static<T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)
// Average of int-valued mappings; the result type is still Double
public static<T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)
- Example
List<Integer> demo = Arrays.asList(1, 2, 5);
Double data = demo.stream().collect(Collectors.averagingInt(Integer::intValue));
System.out.println(data);
---------result----------
2.6666666666666665
Min and max collectors
// Minimum
public static<T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)
// Maximum
public static<T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
- Example
List<Integer> demo = Arrays.asList(1, 2, 5);
Optional<Integer> min = demo.stream().collect(Collectors.minBy(Comparator.comparing(item -> item)));
Optional<Integer> max = demo.stream().collect(Collectors.maxBy(Comparator.comparing(item -> item)));
System.out.println(min.get() + "-" + max.get());
---------result----------
1-5
Summary statistics
- One pass yields the element count, sum, minimum, maximum, and average
// int statistics
public static<T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper)
// double statistics
public static<T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper)
// long statistics
public static<T> Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper)
- Example
List<Integer> demo = Arrays.asList(1, 2, 5);
IntSummaryStatistics data = demo.stream().collect(Collectors.summarizingInt(Integer::intValue));
System.out.println(data);
---------result----------
IntSummaryStatistics{count=3, sum=8, min=1, average=2.666667, max=5}
New collector in JDK 12: teeing
// The stream is aggregated by downstream1 and downstream2 independently; merger combines the two results
public static<T, R1, R2, R> Collector<T, ?, R> teeing(Collector<? super T, ?, R1> downstream1,
                                                      Collector<? super T, ?, R2> downstream2,
                                                      BiFunction<? super R1, ? super R2, R> merger)
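A minimal sketch (JDK 12+, class name illustrative): compute the sum and the count in one pass, then merge them into an average:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TeeingDemo {
    public static void main(String[] args) {
        List<Integer> demo = Arrays.asList(1, 2, 3, 4);
        // downstream1 sums, downstream2 counts, merger divides the two results
        double average = demo.stream().collect(Collectors.teeing(
                Collectors.summingInt(Integer::intValue),
                Collectors.counting(),
                (sum, count) -> (double) sum / count));
        System.out.println(average); // 2.5
    }
}
```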
5 Using parallelStream for concurrency
- Use with CompletableFuture and thread pools
- Example (Stopwatch and Ticker come from Guava)
public static void main(String[] args) throws Exception {
    List<Integer> demo = Stream.iterate(0, item -> item + 1)
            .limit(5)
            .collect(Collectors.toList());
    // Example 1: a sequential stream runs one element at a time on the main thread
    Stopwatch stopwatch = Stopwatch.createStarted(Ticker.systemTicker());
    demo.stream().forEach(item -> {
        try {
            Thread.sleep(500);
            System.out.println("Example 1 - " + Thread.currentThread().getName());
        } catch (Exception e) { }
    });
    System.out.println("Example 1 - " + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
    // Example 2: to run a parallelStream on a chosen pool, the executor must be a ForkJoinPool
    // and the stream must be submitted from inside it (here via CompletableFuture.runAsync);
    // otherwise the default ForkJoinPool.commonPool() is used
    ExecutorService executor = new ForkJoinPool(10);
    stopwatch.reset(); stopwatch.start();
    CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> {
        try {
            Thread.sleep(1000);
            System.out.println("Example 2 - " + Thread.currentThread().getName());
        } catch (Exception e) { }
    }), executor).join();
    System.out.println("Example 2 - " + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
    // Example 3: a plain parallelStream runs on ForkJoinPool.commonPool()
    stopwatch.reset(); stopwatch.start();
    demo.parallelStream().forEach(item -> {
        try {
            Thread.sleep(1000);
            System.out.println("Example 3 - " + Thread.currentThread().getName());
        } catch (Exception e) { }
    });
    System.out.println("Example 3 - " + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
    executor.shutdown();
}
-------result--------
Example 1 - main
Example 1 - main
Example 1 - main
Example 1 - main
Example 1 - main
Example 1 - 2501
Example 2 - ForkJoinPool-1-worker-19
Example 2 - ForkJoinPool-1-worker-9
Example 2 - ForkJoinPool-1-worker-5
Example 2 - ForkJoinPool-1-worker-27
Example 2 - ForkJoinPool-1-worker-23
Example 2 - 1004
Example 3 - main
Example 3 - ForkJoinPool.commonPool-worker-5
Example 3 - ForkJoinPool.commonPool-worker-7
Example 3 - ForkJoinPool.commonPool-worker-9
Example 3 - ForkJoinPool.commonPool-worker-3
Example 3 - 1001
- parallelStream really does run on multiple threads, and the pool can be chosen, but a custom pool must be a ForkJoinPool; otherwise ForkJoinPool.commonPool() is used by default