This article is mainly based on the blog post "Android learning notes RxJava: graphic transformation operators". The original post is well illustrated, and the pictures in these study notes all come from it; thanks to the author for sharing. These notes cover four transformation operators, map(), flatMap(), concatMap() and buffer(), show how to use each of them, and briefly trace the source of the map() method.

Study notes environment:

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'

Here is the text of the notes:

RxJava transform operator learning

Overview

The main functions of the transform operator are:

  • Process (transform) individual events in the event sequence, or the whole sequence, so that they become different events or a different event sequence

The specific schematic diagram is as follows:

[Figure: schematic of the transformation operators. The picture comes from the original blogger's article; thanks to the original blogger.]

map()

The map() operator passes each event emitted by the Observable through a specified function, turning it into an event of any type. The map() operator works as follows:

[Figure: how map() works, from the original blogger's article]

The code is shown as follows:

        // Requires io.reactivex.Observable and io.reactivex.functions.Function
        Observable.just(1, 2, 3, 4, 5)
            .map(object : Function<Int, String> {
                override fun apply(t: Int): String {
                    return "this is $t"
                }
            })
            .subscribe {
                Logs.e("TransformWithMap:$it")
            }

In the code above, we first create an Observable with just(), which emits the events one after another, and then process each emitted item with map(). map() accepts a parameter of type Function<T, R>. This is an interface; we implement it with an anonymous class and override its apply(T): R method.

The output is as follows:

2021-03-26 14:16:37.755 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: TransformWithMap:this is 1
2021-03-26 14:16:37.755 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: TransformWithMap:this is 2
2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: TransformWithMap:this is 3
2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: TransformWithMap:this is 4
2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: TransformWithMap:this is 5

As the output shows, every item is emitted with the prefix "this is".

Next, let's trace the source. From the earlier notes we already know how the Observable.create(ObservableOnSubscribe).subscribe(Observer) flow works. Calling Observable.just() certainly gives us an Observable, so we start tracing directly from map().

// map() method in Observable
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

As you can see, unless an assembly hook has been registered, RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)) is equivalent to new ObservableMap<T, R>(this, mapper). That is, calling map() gives us an ObservableMap object, as follows:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    // ... other source code ...
}

// Related field and constructor in the superclass AbstractObservableWithUpstream
    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

In the code above, ObservableMap saves both the Observable created by Observable.just() and the Function we passed in.

After the code above has run, subscribe(Observer) is called. As we learned earlier, this call ends up in Observable.subscribeActual(); here that means ObservableMap.subscribeActual(), whose source is as follows:

// ObservableMap
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

Here source.subscribe(new MapObserver<T, U>(t, function)) is called, where source is the Observable returned by the earlier Observable.just() call. Its concrete type is ObservableFromArray, and ObservableFromArray.subscribe() in turn ends up in ObservableFromArray.subscribeActual(), whose source is as follows:

// ObservableFromArray
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

As you can see, a FromArrayDisposable is first created from the observer. This observer is the MapObserver created in ObservableMap.subscribeActual(). With that in mind, let's look at the FromArrayDisposable.run() method:

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
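The isDisposed() checks in run() can be illustrated with a minimal Java sketch that does not use RxJava. ArrayEmitter and demo are hypothetical names invented for this illustration; the class only imitates the shape of the loop, under the assumption that disposal is a simple flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for FromArrayDisposable: not RxJava code, only the same loop shape.
final class ArrayEmitter<T> {
    private final T[] array;
    private volatile boolean disposed;

    ArrayEmitter(T[] array) {
        this.array = array;
    }

    void dispose() {
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }

    // Emits every element to onNext, then runs onComplete, unless disposed on the way.
    void run(Consumer<T> onNext, Runnable onComplete) {
        for (int i = 0; i < array.length && !isDisposed(); i++) {
            onNext.accept(array[i]);
        }
        if (!isDisposed()) {
            onComplete.run();
        }
    }

    // Runs the emitter and disposes it once `limit` events have been recorded.
    static List<String> demo(int limit) {
        List<String> events = new ArrayList<>();
        ArrayEmitter<Integer> emitter = new ArrayEmitter<Integer>(new Integer[] {1, 2, 3, 4, 5});
        emitter.run(value -> {
            events.add("onNext " + value);
            if (events.size() == limit) {
                emitter.dispose();
            }
        }, () -> events.add("onComplete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(2));   // disposed after two items: no further onNext, no onComplete
        System.out.println(demo(10));  // never disposed: all five items, then onComplete
    }
}
```

Once the flag is set, the loop stops before the next element and the completion callback is skipped, which is the behavior the two isDisposed() checks in run() are there to provide.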

The loop in run() walks through the arguments we passed to just() and calls downstream.onNext(value) for each one. This downstream is the MapObserver that was passed in when the FromArrayDisposable was created. Here is the onNext() method in MapObserver:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

In this method, the sourceMode != NONE check is generally false, so execution continues to mapper.apply(t), which is the apply() method of the Function we passed in. It applies our custom transformation to the current element of the array loop, and the result is then emitted through downstream.onNext(v).
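To make the failure path of onNext() concrete, here is a minimal Java sketch that does not use RxJava. MiniMapObserver and demo are hypothetical names; the class is a simplified model that only records what would be sent downstream, and it assumes that a null result or an exception from the mapper ends the sequence with an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical, simplified model of a mapping observer; not the RxJava class itself.
final class MiniMapObserver<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private final List<String> downstream; // records what would be sent downstream
    private boolean done;

    MiniMapObserver(Function<? super T, ? extends R> mapper, List<String> downstream) {
        this.mapper = mapper;
        this.downstream = downstream;
    }

    void onNext(T value) {
        if (done) {
            return; // ignore events after a terminal event
        }
        R mapped;
        try {
            mapped = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            done = true;
            downstream.add("onError " + ex.getClass().getSimpleName());
            return;
        }
        downstream.add("onNext " + mapped);
    }

    // Feeds 1, 2, 3 through a mapper that returns null for 2.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniMapObserver<Integer, String> observer =
                new MiniMapObserver<Integer, String>(t -> t == 2 ? null : "this is " + t, log);
        for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onNext this is 1, onError NullPointerException]
    }
}
```

The third value is dropped because the done flag is already set. This mirrors the first check in the real onNext(); the sketch leaves out everything related to sourceMode and to disposing the upstream.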

This logic is a bit convoluted; in a nutshell:

  1. Observable.just() creates an ObservableFromArray, which holds the arguments of just() as an array. We then call map(), which creates an ObservableMap; it uses two fields to hold the ObservableFromArray from the previous step and the Function we passed in.

  2. We then call subscribe(Observer) on the ObservableMap. It first creates a MapObserver, which holds our Observer and the Function from step 1, and then calls subscribe(MapObserver) on the ObservableFromArray stored in the ObservableMap, passing in that MapObserver.

  3. Inside subscribe(MapObserver) of ObservableFromArray, a FromArrayDisposable(MapObserver, array) is created from the MapObserver and the array built from the arguments of just(). Its run() method then iterates over the array and, for each value, calls onNext() on the MapObserver, so the arguments of just() reach MapObserver.onNext().

  4. Finally, MapObserver.onNext() passes each value to Function.apply(value), which runs the apply() method we overrode in step 1 to produce the final data. It then calls onNext() on the Observer we passed to ObservableMap.subscribe(Observer) in step 2, delivering the data to our Observer.
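The four steps above can be sketched as a miniature chain in plain Java. This is not the RxJava implementation: Source, ArraySource, MapSource and demo are hypothetical names, and disposal, error handling and completion are deliberately left out so that only the wrapping order stays visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of the just() -> map() -> subscribe() chain; all names are illustrative.
final class MapChainSketch {
    // Something that can be subscribed to; plays the role of Observable.
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Step 1: holds the arguments as an array (role of ObservableFromArray).
    static final class ArraySource<T> implements Source<T> {
        private final T[] array;

        ArraySource(T[] array) {
            this.array = array;
        }

        @Override
        public void subscribe(Consumer<T> observer) {
            for (T value : array) { // step 3: iterate and push each value downstream
                observer.accept(value);
            }
        }
    }

    // Step 1: holds the upstream source and the function (role of ObservableMap).
    static final class MapSource<T, R> implements Source<R> {
        private final Source<T> source;
        private final Function<? super T, ? extends R> function;

        MapSource(Source<T> source, Function<? super T, ? extends R> function) {
            this.source = source;
            this.function = function;
        }

        @Override
        public void subscribe(Consumer<R> observer) {
            // Step 2: wrap the real observer (role of MapObserver) and subscribe upstream.
            // Step 4: the wrapper applies the function, then forwards the result.
            Consumer<T> mapObserver = value -> observer.accept(function.apply(value));
            source.subscribe(mapObserver);
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Source<Integer> just = new ArraySource<Integer>(new Integer[] {1, 2, 3, 4, 5});
        Source<String> mapped = new MapSource<Integer, String>(just, t -> "this is " + t);
        mapped.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [this is 1, this is 2, this is 3, this is 4, this is 5]
    }
}
```

Nothing is emitted until subscribe() is called on the outermost source; the call then travels upstream to the array source, and the values travel back downstream through the wrapper.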

flatMap()

This operator splits the sequence of events emitted by the Observable, transforms each event separately, merges the results into a new sequence of events, and finally emits that sequence. However, flatMap() does not guarantee the order of the events.

The principle of this method is as follows:

  • Create an Observable object for each event in the event sequence
  • Put each new event converted from an original event into the corresponding Observable
  • Merge all the newly created Observables into one new, overall Observable
  • The new, overall Observable sends the merged event sequence to the Observer

[Figure: how flatMap() works, from the original blogger's article]

The relevant code is shown as follows:

        Observable.create<Int> {
            it.onNext(1)
            it.onNext(2)
            it.onNext(3)
        }
            .flatMap(object : Function<Int, Observable<String>> {
                override fun apply(t: Int): Observable<String> {
                    val array = mutableListOf<String>()
                    for (i in 0 until 3) {
                        array.add("$t--flatMap-->  ${t * 10 + i}")
                    }
                    return Observable.fromIterable(array)
                }
            })
            .subscribe {
                Logs.e("transformWithFlatMap onNext: $it")
            }

The code above runs as follows:

2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap-->  10
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap-->  11
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap-->  12
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap-->  20
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap-->  21
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap-->  22
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap-->  30
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap-->  31
2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap-->  32

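I tried several ways of creating the Observable, and the output always came out in order. That is because every inner Observable here emits synchronously on the same thread. The official documentation clearly states that the transformed items may be interleaved, which can happen once the inner Observables emit asynchronously, so do not rely on the order.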
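Why merged output can interleave is easier to see with a small simulation in plain Java. It does not use RxJava or real threads: FlatMapOrderSketch, Timed, inner and merged are hypothetical names, and the "arrival time" of each item is an assumed integer rather than anything measured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of merge ordering; it does not use RxJava or real threads.
final class FlatMapOrderSketch {
    // One emitted item together with the simulated time at which it arrives.
    static final class Timed {
        final int time;
        final String value;

        Timed(int time, String value) {
            this.time = time;
            this.value = value;
        }
    }

    // Inner sequence for source item t: three items, one every `period` time units.
    static List<Timed> inner(int t, int period) {
        List<Timed> items = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            items.add(new Timed(i * period, t + "--flatMap--> " + (t * 10 + i)));
        }
        return items;
    }

    // Merge: deliver the items of all inner sequences in arrival-time order.
    // periods[k] is the emission period of the inner sequence for source item k + 1.
    static List<String> merged(int[] periods) {
        List<Timed> all = new ArrayList<>();
        for (int t = 1; t <= periods.length; t++) {
            all.addAll(inner(t, periods[t - 1]));
        }
        // The sort is stable, so items arriving at the same time keep source order.
        all.sort(Comparator.comparingInt((Timed item) -> item.time));
        List<String> out = new ArrayList<>();
        for (Timed item : all) {
            out.add(item.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merged(new int[] {0, 0, 0})); // everything arrives at once: source order
        System.out.println(merged(new int[] {3, 1, 2})); // different speeds: items interleave
    }
}
```

With all periods set to 0 the result is in source order, like the log above. With periods 3, 1 and 2 the items of the three inner sequences interleave, which is the situation the documentation warns about.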

concatMap()

This operator is similar to flatMap(), except that the order of the new event sequence produced by splitting and re-merging is the same as the order in which the original Observable emitted its events.

[Figure: how concatMap() works, from the original blogger's article]

The relevant code is shown as follows:

        Observable.just(1, 2, 3)
            .concatMap(object : Function<Int, Observable<String>> {
                override fun apply(t: Int): Observable<String> {
                    val list = mutableListOf<String>()
                    for (i in t * 10 until t * 10 + 3) {
                        list.add("$t --concatMap--> $i")
                    }
                    return Observable.fromIterable(list)
                }
            }).subscribe {
                Logs.e("transformWithConcatMap onNext: $it")
            }

The running results are as follows:

2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 10
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 11
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 12
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 20
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 21
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 22
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 30
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 31
2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 32
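The ordering guarantee of concatMap() comes from consuming the inner sequences strictly one after another. The following list-based Java sketch models only that aspect. ConcatMapSketch and its concatMap helper are hypothetical names, lists stand in for Observables, and time is not modeled at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical list-based model of concatenating mapped sequences; not RxJava code.
final class ConcatMapSketch {
    static <T, R> List<R> concatMap(List<T> source, Function<? super T, List<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            // The next inner sequence is not touched until this one is fully drained.
            for (R mapped : mapper.apply(item)) {
                out.add(mapped);
            }
        }
        return out;
    }

    // Rebuilds the example from the notes: three items per source value.
    static List<String> demo() {
        return concatMap(Arrays.asList(1, 2, 3), t -> {
            List<String> list = new ArrayList<>();
            for (int i = t * 10; i < t * 10 + 3; i++) {
                list.add(t + " --concatMap--> " + i);
            }
            return list;
        });
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Because the outer loop never moves on before the inner loop has finished, the output order always follows the source order, whatever the mapper produces.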

buffer()

This operator periodically takes a certain number of events from those the Observable emits, puts them into a buffer, and finally emits the buffer as a whole.

[Figure: how buffer() works, from the original blogger's article]

The code is shown as follows:

        Observable.just(1, 2, 3, 4, 5)
            .buffer(3, 2)
            .subscribe {
                Logs.e("transformWithBuffer onNext: $it")
            }

Parameter Description:

  • The first parameter, 3, means that three items are taken each time; if fewer than three items remain, whatever remains is taken
  • The second parameter, 2, is the step size: the first buffer starts at position 0, the second at position 0 + 2, the third at position 0 + 2 + 2, and so on.

The result of this method is as follows:

2021-03-26 16:50:53.794 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [1, 2, 3]
2021-03-26 16:50:53.795 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [3, 4, 5]
2021-03-26 16:50:53.795 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [5]

As you can see, the result is the same as described above.
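The count and step rule described above can be reproduced on a plain list. The following Java sketch is only a model of that rule, not the RxJava implementation; BufferSketch and its buffer helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based model of buffer(count, skip); not RxJava code.
final class BufferSketch {
    static <T> List<List<T>> buffer(List<T> items, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        // A new buffer starts every `skip` positions and holds at most `count` items.
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            buffers.add(new ArrayList<>(items.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(items, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5]]
        System.out.println(buffer(items, 2, 3)); // [[1, 2], [4, 5]]
    }
}
```

When the step is smaller than the count, neighbouring buffers overlap, as in the log above. When it is larger, the items between two buffers are dropped.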

Another way to use it

As you can see from the overloaded function of buffer(), there are other uses for buffer, as follows:

        Observable.intervalRange(10, 100, 0, 1, TimeUnit.SECONDS)
            .buffer(20, 2, TimeUnit.SECONDS)
            .subscribe {
                Logs.e("transformWithBuffer onNext: $it")
            }

Parameter Description:

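  • The first parameter, 20, is the time span of each buffer: a buffer collects the data emitted by the Observable for 20s and is then emitted, so the first output appears after 20s
  • The second parameter, 2, is the interval at which a new buffer is started: after the first output, a buffer covering the previous 20s is emitted every 2s
  • The third parameter is the unit of time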

In the code above, we create an Observable with intervalRange(). This Observable emits one item every 1s, starting from 10. buffer() collects the items into buffers, and we print the contents of each buffer. The output is as follows:

2021-03-26 17:03:12.562 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
2021-03-26 17:03:14.556 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
2021-03-26 17:03:16.556 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33]
2021-03-26 17:03:18.561 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36]
2021-03-26 17:03:20.560 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38]
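The overlapping time windows can be approximated with a small simulation in plain Java. It does not use RxJava, schedulers or real time: TimedBufferSketch and simulate are hypothetical names, and the sketch assumes that the item start + k arrives exactly at k * period seconds and that each window is half-open, [open, open + timespan). Real timing has jitter, so items close to a window edge can land in the neighbouring buffer, as the log above shows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of time-based buffering with idealized integer timestamps.
final class TimedBufferSketch {
    // Returns the first `buffersWanted` buffers for a source that emits
    // start, start + 1, ... (`count` items in total), one item every `period` seconds.
    static List<List<Long>> simulate(long start, long count, long period,
                                     long timespan, long timeskip, int buffersWanted) {
        List<List<Long>> result = new ArrayList<>();
        for (int b = 0; b < buffersWanted; b++) {
            long open = b * timeskip;     // a new window opens every `timeskip` seconds
            long close = open + timespan; // and stays open for `timespan` seconds
            List<Long> buffer = new ArrayList<>();
            for (long k = 0; k < count; k++) {
                long time = k * period;
                if (time >= open && time < close) {
                    buffer.add(start + k);
                }
            }
            result.add(buffer);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same parameters as the example: items from 10, one per second, 20s windows every 2s.
        for (List<Long> buffer : simulate(10, 100, 1, 20, 2, 3)) {
            System.out.println(buffer);
        }
    }
}
```

Under these assumptions the first three buffers start at 10, 12 and 14 and each holds 20 items. The starting values match the log; the sizes can differ by one near the window edges in a real run.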