Preface

Benefits of reading this article:

  • Most articles on the web categorize the Collectors API, explain how to use it, and point you to the Collectors utility class when you need a Collector. I have analyzed the source code of the Collector implementations so that you know why they work the way they do.
  • You will need to memorize less of the API;
  • When an exception is thrown, you will probably know what the problem is.

What this series covers

  • 1. collect, Collector, Collectors -> collecting elements from a stream
  • 2. groupingBy: grouping and nested grouping (follow-up, already written)
  • 3. partitioningBy: partitioning and nested partitioning (follow-up)

Key point: understanding the Collector specification is central to understanding Collector. It tells you how to create a new Collector implementation, what each of its methods represents, and how those functions are chained together.


1. The collect method

Let’s start with a bit of code

// Assumes Person(String name, int age), Person::getName, and a static import of Collectors.toList
Person p1 = new Person("hehe", 20);
Person p2 = new Person("wang", 20);
Person p6 = new Person("hou", 25);
Person p5 = new Person("hou", 25);
Person p3 = new Person("mabi", 30);
Person p4 = new Person("li", 30);
List<Person> personList = Arrays.asList(p1, p2, p3, p4, p5, p6);
List<String> per1 = personList.stream()
        .map(Person::getName)
        .map(String::toUpperCase)   // a Function, not a Predicate, so it belongs in map()
        .collect(toList());         // collect into a collection

This is the simplest and most common form of functional programming in Java; once wrapped up, it even resembles an SQL query. All you need to do is pass in functions such as Function or Predicate. Let's investigate what happens inside collect(toList()).

2. Questions about the collect method

The collect method is mainly used for collecting, but that does not mean it always collects into a collection. Please remember this.

  1. collect is an overloaded method:

<R, A> R collect(Collector<? super T, A, R> collector); -> This is the one most people use. It takes a Collector and returns an R.

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); -> This is another, slightly lower-level API.
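As a minimal sketch (the class and method names are illustrative), both overloads can produce the same list: the first passes a ready-made Collector, the second passes the supplier, accumulator and combiner separately. The three-argument call is the same pattern shown in the Stream.collect Javadoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectOverloads {
    // Overload 1: pass a ready-made Collector that bundles the functions
    static List<String> viaCollector(List<String> names) {
        return names.stream().collect(Collectors.toList());
    }

    // Overload 2: pass supplier, accumulator and combiner separately
    static ArrayList<String> viaFunctions(List<String> names) {
        return names.stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("hehe", "wang", "hou");
        System.out.println(viaCollector(names)); // [hehe, wang, hou]
        System.out.println(viaFunctions(names)); // [hehe, wang, hou]
    }
}
```

The Collector version is what you normally use, because the same Collector object can be reused and combined with others.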

  2. What is a Collector? What do the type parameters T, A, and R represent?

This is part of the appeal of functional programming in Java 8: it is very easy to understand what the code means.

  • But have you ever wondered what kind of collection toList() actually creates? ArrayList, LinkedList, or some other implementation?
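A small sketch to check this yourself (class and method names are illustrative). The Javadoc of Collectors.toList() makes no guarantee about the List type; on OpenJDK the current implementation happens to create an ArrayList. If you need a specific type, Collectors.toCollection lets you supply it.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

public class WhichList {
    static List<Integer> defaultList(List<Integer> input) {
        return input.stream().collect(Collectors.toList());
    }

    // toCollection takes a Supplier, so the caller decides the concrete type
    static LinkedList<Integer> linkedList(List<Integer> input) {
        return input.stream().collect(Collectors.toCollection(LinkedList::new));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        // Implementation detail, not a guarantee: java.util.ArrayList on OpenJDK
        System.out.println(defaultList(input).getClass().getName());
        System.out.println(linkedList(input).getClass().getName()); // java.util.LinkedList
    }
}
```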

3. Analyzing Collector

collect(Collector<? super T, A, R> collector) takes a Collector, which is defined as follows:

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
    // ... (rest omitted)
}

Collector is an interface with five abstract methods: supplier, accumulator, combiner, finisher and characteristics. You create an implementation class, pass it to collect(), and the elements are collected according to your requirements.

  1. Supplier<A> supplier(): returns a Supplier, the functional interface whose single method is T get();
  • As the producer, it provides the Collector with a container A to hold the elements of the Stream.
  • Supplier<Set<String>> s1 = HashSet::new;
  • Supplier<List<String>> s2 = ArrayList::new;

Analogy:

  • StringBuilder::new creates a StringBuilder to collect CharSequence values

This completes the initial construction of the container.

  • Consider: does a parallel stream create multiple containers?

  2. BiConsumer<A, T> accumulator()

  • It is a consumer, specifically a binary consumer that takes two arguments (the container A and an element T) and consumes them. Every time a new element arrives from the stream, it must be added to the container created by supplier.

For example:

  • List::add -> adds an element to a List
  • StringBuilder::append -> appends a CharSequence to a StringBuilder

This defines the relationship between the elements in the stream and the container.

  • Consider: how does this work with parallel streams?

  3. Function<A, R> finisher(): produces the final result.
  • A is the container provided by supplier, and R is the type that is finally returned. A and R may well be the same type.

For example:

  • Function.identity() returns t -> t;
  • Converting List -> Set or List -> Map is also fine; it depends on how you want to turn container A into the result.
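When A and R differ, finisher does the conversion. A minimal sketch using the public factory Collector.of (the class and field names are illustrative): the container A is a StringBuilder, and finisher turns it into the final String R.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

public class FinisherDemo {
    // T = String (stream elements), A = StringBuilder (container), R = String (result)
    static final Collector<String, StringBuilder, String> CONCAT = Collector.of(
            StringBuilder::new,                                    // supplier: create container A
            StringBuilder::append,                                 // accumulator: add T to A
            (left, right) -> { left.append(right); return left; }, // combiner: merge two As
            StringBuilder::toString);                              // finisher: convert A to R

    static String concat(List<String> parts) {
        return parts.stream().collect(CONCAT);
    }

    public static void main(String[] args) {
        System.out.println(concat(Arrays.asList("he", "llo"))); // hello
    }
}
```

Because A (StringBuilder) is not R (String), this collector must not declare IDENTITY_FINISH, and the five-argument Collector.of does not add it.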

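  4. Set<Characteristics> characteristics()

characteristics() returns a set of hints taken from the Collector.Characteristics enum:

  • UNORDERED: the collection does not promise to preserve the encounter order of the elements.
  • CONCURRENT: accumulator may be called concurrently from multiple threads on the same container A.
  • IDENTITY_FINISH: finisher() is the identity function, so it can be skipped and A is returned directly as R.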
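To see which hints the built-in collectors declare, a small sketch can print them (the class name is illustrative). These sets are implementation details of the JDK rather than documented guarantees; the values in the comments are what OpenJDK returns.

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class BuiltInCharacteristics {
    static Set<Collector.Characteristics> characteristicsOf(Collector<?, ?, ?> collector) {
        return collector.characteristics();
    }

    public static void main(String[] args) {
        System.out.println(characteristicsOf(Collectors.toList()));  // [IDENTITY_FINISH]
        System.out.println(characteristicsOf(Collectors.toSet()));   // [UNORDERED, IDENTITY_FINISH]
        System.out.println(characteristicsOf(Collectors.joining())); // []
        System.out.println(characteristicsOf(
                Collectors.toConcurrentMap(s -> s, s -> s)));         // [CONCURRENT, UNORDERED, IDENTITY_FINISH]
    }
}
```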


  5. BinaryOperator<A> combiner(): provides a combiner, which is used by parallel streams;
  • BinaryOperator<T> is a binary operator: it takes two arguments of the same type and returns a result of that type, T apply(T t, T u). Strictly speaking, it is a BiFunction<T, T, T>.

  • Here's how I use it:

  • (List left, List right) -> { left.addAll(right); return left; } -> addAll merges the two collections into one

  • (StringBuilder r1, StringBuilder r2) -> { r1.append(r2); return r1; } -> concatenates the two StringBuilders and returns one of them

  • parallel() or parallelStream() lets the stream be processed concurrently. By default the work runs on the common ForkJoinPool, whose parallelism is based on the number of logical processors (which includes hyper-threads).

  • If a parallel stream is used, each thread (more precisely, each subtask the stream is split into) calls supplier once to create its own container A, and runs accumulator() to add the stream's elements to that container.

  • If a serial stream is used, only one container A is created, and accumulator runs on a single thread.

  • If parallelStream() is used and the Collector declares CONCURRENT (together with UNORDERED, or on an unordered stream), only one container A is created and all threads accumulate into it at the same time. The container must then be thread-safe: with an ArrayList, one thread may be iterating over the list while another adds to it, which can throw a ConcurrentModificationException; with a plain HashSet or HashMap, concurrent insertions can overwrite or lose entries; and a StringBuilder would have to be replaced by a StringBuffer. (Note that combiner is not executed in this case, since there is only one container A.)

  • If parallelStream() is used and the Collector does not declare CONCURRENT, the threads are isolated: each one creates its own container A, accumulates into it, and combiner then merges the containers.

  • If IDENTITY_FINISH is declared, finisher is not executed at all: the framework assumes A == R and simply returns A.
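The rules above can be observed with an instrumented collector. This is a sketch, not JDK code: CountingCollector and its counters are hypothetical helpers that count how often the supplier, combiner and finisher run. The CONCURRENT variant uses ConcurrentHashMap.newKeySet() as its thread-safe shared container; the exact counts in the parallel, isolated case depend on how the stream is split, so only the other cases are fixed.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical instrumented collector: counts supplier/combiner/finisher calls
public class CountingCollector implements Collector<Integer, Set<Integer>, Set<Integer>> {
    final AtomicInteger suppliers = new AtomicInteger();
    final AtomicInteger combines = new AtomicInteger();
    final AtomicInteger finishes = new AtomicInteger();
    private final Set<Characteristics> characteristics;

    CountingCollector(Characteristics... cs) {
        Set<Characteristics> set = EnumSet.noneOf(Characteristics.class);
        Collections.addAll(set, cs);
        this.characteristics = Collections.unmodifiableSet(set);
    }

    @Override public Supplier<Set<Integer>> supplier() {
        return () -> {
            suppliers.incrementAndGet();
            // A shared container must be thread-safe; a per-subtask one need not be
            if (characteristics.contains(Characteristics.CONCURRENT)) return ConcurrentHashMap.newKeySet();
            return new HashSet<>();
        };
    }
    @Override public BiConsumer<Set<Integer>, Integer> accumulator() { return Set::add; }
    @Override public BinaryOperator<Set<Integer>> combiner() {
        return (left, right) -> { combines.incrementAndGet(); left.addAll(right); return left; };
    }
    @Override public Function<Set<Integer>, Set<Integer>> finisher() {
        return set -> { finishes.incrementAndGet(); return set; };
    }
    @Override public Set<Characteristics> characteristics() { return characteristics; }

    static CountingCollector run(boolean parallel, Characteristics... cs) {
        CountingCollector c = new CountingCollector(cs);
        IntStream source = IntStream.range(0, 10_000);
        Set<Integer> result = (parallel ? source.parallel() : source).boxed().collect(c);
        if (result.size() != 10_000) throw new IllegalStateException("lost elements");
        return c;
    }

    static void print(String label, CountingCollector c) {
        System.out.println(label + ": suppliers=" + c.suppliers + ", combines=" + c.combines
                + ", finishes=" + c.finishes);
    }

    public static void main(String[] args) {
        print("serial", run(false));                                         // 1, 0, 1
        print("parallel, isolated", run(true, Characteristics.UNORDERED));   // varies
        print("parallel, CONCURRENT",
                run(true, Characteristics.CONCURRENT, Characteristics.UNORDERED)); // 1, 0, 1
        print("serial, IDENTITY_FINISH", run(false, Characteristics.IDENTITY_FINISH)); // 1, 0, 0
    }
}
```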

4. Collector implementation

import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.stream.Collector.Characteristics.*;

public class Collector_custom {
    public static void main(String[] args) {
        Set<Integer> collect = Arrays.asList(1, 2, 3, 3, 4, 5, 6).stream().collect(new MyCollector<Integer>());
        System.out.println(collect);
    }

    public static class MyCollector<T> implements Collector<T, Set<T>, Set<T>> {

        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("MyCollector.supplier");
            return HashSet::new; // we provide a HashSet as container A
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("MyCollector.accumulator");
            return Set::add; // how an element T of the stream goes into the Set
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("MyCollector.combiner");
            // For a parallel stream with several containers: merge them into one
            return (st1, st2) -> {
                st1.addAll(st2);
                return st1;
            };
        }

        @Override
        public Function<Set<T>, Set<T>> finisher() {
            System.out.println("MyCollector.finisher");
            return Function.identity(); // the final result is the container itself, a Set<T>
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("MyCollector.characteristics");
            // IDENTITY_FINISH: finisher() must still be implemented, but collect() never calls it.
            // UNORDERED: we do not care about order, since we use a HashSet anyway.
            return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, UNORDERED));
        }
    }
}

Java 8 also ships its own implementation, CollectorImpl (a nested class in Collectors). It differs a little from ours: instead of hard-coding the five parts, it receives them as constructor arguments supplied by the caller. This is functional programming!

// Excerpt from java.util.stream.Collectors (JDK 8)
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        // Takes all 5 parts as constructor arguments
        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        // Without a finisher: castingIdentity() simply casts A to R
        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }

        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }

        @Override
        public Function<A, R> finisher() {
            return finisher;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }
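CollectorImpl is package-private, so user code cannot call its constructor directly. The public way to get the same effect is the static factory Collector.of, which takes the same functions as arguments. A sketch of MyCollector rewritten that way (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;

public class MyCollectorViaOf {
    // Same behaviour as MyCollector, built from functions instead of a class.
    // The 4-argument Collector.of adds IDENTITY_FINISH automatically.
    static <T> Collector<T, Set<T>, Set<T>> toHashSet() {
        return Collector.of(
                HashSet::new,                                    // supplier
                Set::add,                                        // accumulator
                (st1, st2) -> { st1.addAll(st2); return st1; }, // combiner
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Set<Integer> collect = Arrays.asList(1, 2, 3, 3, 4, 5, 6).stream().collect(toHashSet());
        System.out.println(collect);                       // [1, 2, 3, 4, 5, 6]
        System.out.println(toHashSet().characteristics()); // [UNORDERED, IDENTITY_FINISH]
    }
}
```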

5. Summary of parallel streams

  • With stream(), a serial stream, supplier creates one container A and accumulator is called once for each of the N elements. combiner is never used, and whether finisher runs depends on whether IDENTITY_FINISH is declared.
  • With parallelStream(), if the Collector declares CONCURRENT (and UNORDERED, or the stream is unordered), all threads operate on the same container.

This can lead to thread-safety problems unless the container itself is thread-safe. If CONCURRENT is not declared, each thread creates its own container and accumulates into it, and the containers are then merged by combiner, which is safe.
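To make the non-CONCURRENT path concrete, here is a single-threaded sketch that imitates it by hand: split the input into two halves, give each half its own container from supplier, accumulate, merge with combiner, then apply finisher. The two-way split and the helper name collectInTwoHalves are illustrative only; the real framework splits via Spliterator and runs the parts on ForkJoin threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ManualParallelCollect {
    // Hypothetical helper: mimics how a non-CONCURRENT collector is used in parallel
    static <T, A, R> R collectInTwoHalves(List<T> input, Collector<? super T, A, R> collector) {
        int mid = input.size() / 2;
        BiConsumer<A, ? super T> accumulator = collector.accumulator();

        A left = collector.supplier().get();  // container for "thread" 1
        for (T t : input.subList(0, mid)) accumulator.accept(left, t);

        A right = collector.supplier().get(); // container for "thread" 2
        for (T t : input.subList(mid, input.size())) accumulator.accept(right, t);

        A merged = collector.combiner().apply(left, right); // left half stays first
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(collectInTwoHalves(numbers, Collectors.toList())); // [1, 2, 3, 4, 5]
        List<String> words = Arrays.asList("a", "b", "c");
        System.out.println(collectInTwoHalves(words, Collectors.joining("-"))); // a-b-c
    }
}
```

Because combiner is given the left container first, encounter order is preserved, which is why an ordered collector such as toList() still returns elements in order from a parallel stream.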

6. The implementation of toList() is very simple

public static <T> Collector<T, ?, List<T>> toList() {
    // supplier: ArrayList::new, accumulator: List::add,
    // combiner: (left, right) -> { left.addAll(right); return left; }
    // CH_ID is a predefined Set<Characteristics> containing only IDENTITY_FINISH
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}

7. A brief look at the collect source code

1. When finisher() is executed

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            // Parallel + CONCURRENT: one shared container, combiner is never called
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            // Serial, or parallel without CONCURRENT
            container = evaluate(ReduceOps.makeRef(collector));
        }
        // This is what IDENTITY_FINISH does: if it is declared, return the container as-is;
        // otherwise call finisher()
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

2. When the other methods are executed -> ReduceOps.makeRef(collector)

public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
        // supplier(), accumulator() and combiner() are fetched once, here
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();                     // create container A
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);               // called once per element
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state); // merge results of parallel subtasks
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                // UNORDERED is translated into the NOT_ORDERED stream flag
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }
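The division of labour in ReducingSink can be reproduced in a few lines. The following is a simplified, single-threaded imitation, not the JDK class: SimpleSink and simpleCollect are hypothetical names. begin() calls supplier, accept() calls accumulator, combine() calls combiner, and the last step mirrors the IDENTITY_FINISH check in collect().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class SimpleSinkDemo {
    // Hypothetical, simplified stand-in for ReduceOps' ReducingSink
    static final class SimpleSink<T, A> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, ? super T> accumulator;
        private final BinaryOperator<A> combiner;
        A state;

        SimpleSink(Collector<? super T, A, ?> collector) {
            // Like makeRef: fetch the functions once, up front
            this.supplier = collector.supplier();
            this.accumulator = collector.accumulator();
            this.combiner = collector.combiner();
        }

        void begin() { state = supplier.get(); }           // one container per sink
        void accept(T t) { accumulator.accept(state, t); } // once per element
        void combine(SimpleSink<T, A> other) { state = combiner.apply(state, other.state); }
    }

    @SuppressWarnings("unchecked")
    static <T, A, R> R simpleCollect(List<T> input, Collector<? super T, A, R> collector) {
        SimpleSink<T, A> sink = new SimpleSink<T, A>(collector);
        sink.begin();
        input.forEach(sink::accept);
        // Same decision as collect(): skip finisher when IDENTITY_FINISH is declared
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                ? (R) sink.state
                : collector.finisher().apply(sink.state);
    }

    public static void main(String[] args) {
        System.out.println(simpleCollect(Arrays.asList(3, 1, 2), Collectors.toList()));     // [3, 1, 2]
        System.out.println(simpleCollect(Arrays.asList("x", "y"), Collectors.joining("+"))); // x+y
    }
}
```

Only the serial path is imitated here, so combine() is never called; in the real parallel path, each subtask gets its own sink and the sinks are merged pairwise through combine().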

8. Conclusion

Once you are familiar with Collector, reading any method in Collectors becomes easy. Next time I will cover grouping and partitioning.