RxJava2: usage and source code reading

What is RxJava? According to RxJava's description on GitHub: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

In short, RxJava is a library that runs on the JVM and implements asynchronous operations based on the observer pattern.

What RxJava does: it keeps logically complex asynchronous code readable.

RxJava GitHub address

RxAndroid is used together with RxJava on Android. RxAndroid provides AndroidSchedulers.mainThread(), which lets Android developers easily post tasks to the Android main thread, for example to update the UI.

RxAndroid GitHub address

Usage

1. Observable

  • Observable: the observed object, which emits data
  • Observer: the observer, which receives the data sent by an Observable

A. Thread switching in RxJava:

//

Observable.create(new ObservableOnSubscribe<String>() {

    @Override

    public void subscribe(ObservableEmitter<String> e) throws Exception {

        // 1. Perform the time-consuming work on the asynchronous (IO) thread

        // 2. When it finishes, call onNext to trigger the callback and notify the observer

        e.onNext("1");

        e.onComplete();

    }

}).subscribeOn(Schedulers.io())

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Observer<String>() {

            @Override

            public void onSubscribe(Disposable d) {

                // Called at the moment of subscription, on the subscribing thread

            }



            @Override

            public void onNext(String value) {

                // Main thread execution method

            }



            @Override

            public void onError(Throwable e) {

                // Main thread execution method

            }



            @Override

            public void onComplete() {

                // Main thread execution method

            }

        });


B. Using operators in RxJava

Observable.create(new ObservableOnSubscribe<String>() {

    @Override

    public void subscribe(ObservableEmitter<String> e) throws Exception {

        // Runs on the IO thread

        // Request network data

        e.onNext("123456");

    }

}).map(new Function<String, Integer>() {

    @Override

    public Integer apply(String s) {

        // Runs on the IO thread

        // Network data parsing

        //

        // throw new RequestFailException(" failed to get network request ");

        return 123;

    }

}).doOnNext(new Consumer<Integer>() {    // Save the login result UserInfo

    @Override

    public void accept(@NonNull Integer bean) throws Exception {

        // Runs on the IO thread

        // Save network data



    }

}).subscribeOn(Schedulers.io())   // IO thread

.observeOn(AndroidSchedulers.mainThread())  // Main thread

.subscribe(new Consumer<Integer>() {

    @Override

    public void accept(@NonNull Integer bean) throws Exception {

        // Update the UI

    }

}, new Consumer<Throwable>() {

    @Override

    public void accept(@NonNull Throwable throwable) throws Exception {

        // Error display error page

    }

});


2. Flowable

Flowable was introduced to deal with backpressure. Flowable is the observed object and is used together with a Subscriber.

//

Flowable.create(new FlowableOnSubscribe<Integer>() {

    @Override

    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

        // 1. Perform the time-consuming work on the asynchronous (IO) thread

        // 2. When it finishes, call onNext to trigger the callback and notify the observer

        emitter.onNext(0);

        emitter.onComplete();

    }

    // If the consumer cannot keep up, a MissingBackpressureException is thrown

}, BackpressureStrategy.ERROR)

        .subscribeOn(Schedulers.io())

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Subscriber<Integer>() {

            @Override

            public void onSubscribe(Subscription s) {

                // Called at subscription time, on the subscribing thread

                // request(n) tells the producer how many events the consumer can consume

                // Here the consumer declares a capacity of Long.MAX_VALUE

                s.request(Long.MAX_VALUE);

            }



            @Override

            public void onNext(Integer integer) {

                // Main thread execution method

            }



            @Override

            public void onError(Throwable t) {

                // Main thread execution method

            }



            @Override

            public void onComplete() {

                // Main thread execution method

            }

        });


A. Backpressure

Backpressure is the problem caused by producers producing faster than consumers can consume.

In RxJava, the observed object (the producer) may send events so quickly that the observer cannot respond to them in time.

For example:

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        // The producer in the asynchronous thread has unlimited production capacity

        while (true) {

            e.onNext(1);

        }

    }

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer<Integer>() {

    @Override

    public void accept(Integer integer) throws Exception {

        // On the main thread the consumer is too slow, so events pile up without bound and eventually cause an OOM

        Thread.sleep(2000);

        System.out.println(integer);

    }

});


The producer on the asynchronous thread has unlimited production capacity, while the consumer on the main thread cannot keep up, so events accumulate without bound and eventually cause an OOM.

This phenomenon has a name: backpressure.
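The accumulation described above can be illustrated with a small plain-Java simulation. This is only a sketch and does not use RxJava: the class BacklogDemo and the method backlogAfter are hypothetical names, and the unbounded ArrayDeque stands in for whatever queue sits between producer and consumer.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BacklogDemo {
    // Simulates `ticks` time steps: the producer enqueues `produced` events per tick,
    // the consumer dequeues at most `consumed` events per tick.
    // Returns the number of events still waiting in the queue at the end.
    static int backlogAfter(int ticks, int produced, int consumed) {
        Queue<Integer> buffer = new ArrayDeque<>(); // unbounded queue between both sides
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < produced; i++) {
                buffer.add(i);
            }
            for (int i = 0; i < consumed && !buffer.isEmpty(); i++) {
                buffer.poll();
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(backlogAfter(100, 10, 1)); // 900: the backlog grows by 9 per tick
        System.out.println(backlogAfter(100, 1, 10)); // 0: the consumer keeps up
    }
}
```

With a producer that never stops, the first case grows until memory runs out, which is the OOM described above.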

B. Subscription.request(long n)

The Subscription.request(long n) method tells the producer how many events the consumer is able to consume.

  • When request(long n) is called, the producer sends the corresponding number of events for the consumer to consume;
  • If request is not called explicitly, the consumer's capacity is 0.
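The request(n) contract can be tried out without RxJava by using the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Subscriber and Subscription used by Flowable. The sketch below is not Flowable's implementation: RequestDemo, range and collect are hypothetical names, and the publisher is deliberately synchronous and assumes count >= 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RequestDemo {
    // Hypothetical synchronous publisher of 1..count (count >= 1) that emits only what was requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean done;

            @Override
            public void request(long n) {
                // Deliver at most n further items, then complete once the range is exhausted.
                for (long i = 0; i < n && !done; i++) {
                    subscriber.onNext(next++);
                    if (next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with the given initial demand and returns the items that were delivered.
    static List<Integer> collect(int count, long initialDemand) {
        List<Integer> received = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                if (initialDemand > 0) {
                    s.request(initialDemand);
                }
            }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(5, 0));              // []  (no request, nothing delivered)
        System.out.println(collect(5, 3));              // [1, 2, 3]
        System.out.println(collect(5, Long.MAX_VALUE)); // [1, 2, 3, 4, 5]
    }
}
```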

In asynchronous calls, RxJava uses a cache pool to hold events that the consumer cannot process yet. The default size of the cache pool is 128, meaning that at most 128 events can be cached. Whether the number passed to request() is larger or smaller than 128, up to 128 events are initially stored in the cache pool; fewer are stored if the producer has fewer events to send.

  • BackpressureStrategy.ERROR: if the producer gets more than 128 events ahead of the consumer, the cache pool overflows and a MissingBackpressureException is thrown;
  • BackpressureStrategy.BUFFER: replaces RxJava's default 128-event cache pool with a larger (effectively unbounded) one, so the producer keeps producing no matter how many events the consumer requests through request(). This consumes more memory and can lead to OOM unless we understand the consumer's capacity well enough to keep the situation under control, so BUFFER should be used with caution.
  • BackpressureStrategy.DROP: discards events the consumer cannot handle. The consumer passes its demand n through request(), the producer delivers n events to the consumer, and anything beyond that is thrown away.
  • BackpressureStrategy.LATEST: behaves like DROP. The consumer passes its demand n through request(), the producer delivers n events to the consumer, and anything beyond that is thrown away. The only difference is that LATEST always lets the consumer receive the last event produced by the producer.
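The difference between the four strategies is easiest to see side by side. The following is a simplified single-threaded model, not RxJava's code: it assumes the producer emits directly to the consumer with no 128-slot cache pool in between, and StrategyDemo, Strategy and run are hypothetical names. The producer emits 1..total while the consumer has requested only `demand` items; afterwards the consumer requests one more.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class StrategyDemo {
    enum Strategy { ERROR, BUFFER, DROP, LATEST }

    // Returns everything the consumer observes, in order.
    static List<String> run(Strategy strategy, int total, int demand) {
        List<String> seen = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>(); // used by BUFFER only
        Integer latest = null;                      // single slot used by LATEST only
        int requested = demand;
        boolean failed = false;

        for (int item = 1; item <= total && !failed; item++) {
            if (requested > 0) {          // outstanding demand: deliver directly
                requested--;
                seen.add(String.valueOf(item));
                continue;
            }
            switch (strategy) {           // no demand left: the strategy decides
                case ERROR:
                    seen.add("MissingBackpressureException");
                    failed = true;
                    break;
                case BUFFER:
                    buffer.add(item);     // keep everything for later
                    break;
                case DROP:
                    break;                // silently discard
                case LATEST:
                    latest = item;        // overwrite the single slot
                    break;
            }
        }

        // The consumer now requests one more item.
        if (!failed) {
            if (strategy == Strategy.BUFFER && !buffer.isEmpty()) {
                seen.add(String.valueOf(buffer.poll()));
            }
            if (strategy == Strategy.LATEST && latest != null) {
                seen.add(String.valueOf(latest));
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(Strategy.ERROR, 5, 2));  // [1, 2, MissingBackpressureException]
        System.out.println(run(Strategy.BUFFER, 5, 2)); // [1, 2, 3]
        System.out.println(run(Strategy.DROP, 5, 2));   // [1, 2]
        System.out.println(run(Strategy.LATEST, 5, 2)); // [1, 2, 5]
    }
}
```

Note how DROP has nothing left to deliver for the late request, while LATEST still hands over the last event.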

Source code reading – Simple example (1)

Note: the current source code version is RxJava 2.1.9

Start with a simple example that involves neither operators nor thread switching:

// Create an observer

Observer<String> observer = new Observer<String>() {

    @Override

    public void onSubscribe(@NonNull Disposable d) {

        Log.d(TAG, "onSubscribe");

    }



    @Override

    public void onNext(String o) {



    }



    @Override

    public void onError(@NonNull Throwable e) {

        Log.d(TAG, "onError data is :" + e.toString());

    }



    @Override

    public void onComplete() {

        Log.d(TAG, "onComplete");

    }

};



// Create the observed

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

    @Override

    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

});

// Subscribe

observable.subscribe(observer);


A. ObservableOnSubscribe.java

First, look at the ObservableOnSubscribe.java class:

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;

}


ObservableOnSubscribe is an interface with a single subscribe method, whose parameter is an ObservableEmitter. Next, look at the ObservableEmitter class.

ObservableEmitter.java

ObservableEmitter is also an interface:

public interface ObservableEmitter<T> extends Emitter<T> {



    void setDisposable(@Nullable Disposable d);



    void setCancellable(@Nullable Cancellable c);



    boolean isDisposed();



    @NonNull

    ObservableEmitter<T> serialize();



    @Experimental

    boolean tryOnError(@NonNull Throwable t);

}


ObservableEmitter extends Emitter; the extra methods were introduced in RxJava 2.0 and add capabilities such as cancelling midway. Now look at Emitter:

public interface Emitter<T> {



    void onNext(@NonNull T value);



    void onError(@NonNull Throwable error);



    void onComplete();

}


These are the three familiar methods. They correspond to the calls in the following code:

new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

}


Next, look at Observable.create(ObservableOnSubscribe<T> source), which receives this ObservableOnSubscribe.

B. Observable.create(ObservableOnSubscribe source)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {

    ObjectHelper.requireNonNull(source, "source is null");

    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

}

  • Ignore RxJavaPlugins for now;
  • The ObservableOnSubscribe passed in is used to create an ObservableCreate; ObservableCreate is in fact an implementation class of Observable.

In our example, Observable.create(ObservableOnSubscribe<T> source) is therefore effectively the following code:


//

ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        // Executed on the subscribing thread (this example has no thread switching)

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

});

  • From this we know that when the ObservableOnSubscribe.subscribe method is executed, the user sends data out (to the observer) by calling ObservableEmitter.onNext.

Now let’s look at the ObservableCreate class

public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;



    public ObservableCreate(ObservableOnSubscribe<T> source) {

        this.source = source;

    }



    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        CreateEmitter<T> parent = new CreateEmitter<T>(observer);

        observer.onSubscribe(parent);



        try {

            source.subscribe(parent);

        } catch (Throwable ex) {

            Exceptions.throwIfFatal(ex);

            parent.onError(ex);

        }

    }

    // omit some code...

}

  • ObservableOnSubscribe.subscribe is executed by the source.subscribe(parent) call in ObservableCreate.subscribeActual; inside subscribe, the user calls ObservableEmitter.onNext to send the data out;
  • Before that, subscribeActual calls observer.onSubscribe(parent), so when a subscription occurs, the observer's onSubscribe method is executed on the subscribing thread;
  • CreateEmitter wraps the Observer passed to ObservableCreate.subscribeActual(Observer observer);
  • Once the task is cancelled, CreateEmitter no longer calls back the observer it wraps; the observer's onNext method is called by CreateEmitter.onNext.
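The points above can be reproduced in a few lines of plain Java. This is a stripped-down model for illustration, not the RxJava source: the nested types in MiniCreateDemo are hypothetical stand-ins for Observer, Disposable, ObservableOnSubscribe, CreateEmitter and ObservableCreate, with error handling and thread safety left out.

```java
import java.util.ArrayList;
import java.util.List;

class MiniCreateDemo {
    interface Disposable { void dispose(); boolean isDisposed(); }
    interface Emitter<T> { void onNext(T value); void onComplete(); }
    interface Observer<T> { void onSubscribe(Disposable d); void onNext(T value); void onComplete(); }
    interface OnSubscribe<T> { void subscribe(Emitter<T> emitter); }

    // Stand-in for CreateEmitter: wraps the observer and stops forwarding once disposed.
    static final class CreateEmitter<T> implements Emitter<T>, Disposable {
        private final Observer<T> observer;
        private boolean disposed;
        CreateEmitter(Observer<T> observer) { this.observer = observer; }
        @Override public void onNext(T value) { if (!disposed) observer.onNext(value); }
        @Override public void onComplete() {
            if (!disposed) { disposed = true; observer.onComplete(); }
        }
        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }

    // Stand-in for ObservableCreate: subscribe() performs the same steps as subscribeActual.
    static final class MiniObservable<T> {
        private final OnSubscribe<T> source;
        MiniObservable(OnSubscribe<T> source) { this.source = source; }
        void subscribe(Observer<T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<>(observer); // 1. wrap the observer
            observer.onSubscribe(parent);                             // 2. notify on the calling thread
            source.subscribe(parent);                                 // 3. run the user's emitting code
        }
    }

    // Emits "hello" and "world"; the observer disposes after receiving `limit` items.
    static List<String> run(int limit) {
        List<String> log = new ArrayList<>();
        MiniObservable<String> observable = new MiniObservable<String>(e -> {
            e.onNext("hello");
            e.onNext("world");
            e.onComplete();
        });
        observable.subscribe(new Observer<String>() {
            private Disposable disposable;
            private int count;
            @Override public void onSubscribe(Disposable d) { disposable = d; log.add("onSubscribe"); }
            @Override public void onNext(String value) {
                log.add(value);
                if (++count == limit) disposable.dispose();
            }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [onSubscribe, hello, world, onComplete]
        System.out.println(run(1)); // [onSubscribe, hello]
    }
}
```

In the second call the observer disposes after "hello", so the emitter swallows "world" and the completion signal.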

Observable.create(ObservableOnSubscribe<T> source) ultimately returns an ObservableCreate object. Next, look at observable.subscribe(observer).

C. observable.subscribe(observer);

  • observable.subscribe(observer); is the moment the subscription takes place.
  • Here observable.subscribe(observer); is actually ObservableCreate.subscribe(observer);

Look at the subscribe(Observer) method of Observable:

Observable.subscribe(Observer observer)

public final void subscribe(Observer<? super T> observer) {

    ObjectHelper.requireNonNull(observer, "observer is null");

    try {

        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // Subscribe to Observable, which executes the subscribeActual method

        subscribeActual(observer);

    } catch (NullPointerException e) { // NOPMD

        throw e;

    } catch (Throwable e) {

        Exceptions.throwIfFatal(e);

        RxJavaPlugins.onError(e);

        //

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

        npe.initCause(e);

        throw npe;

    }

}

  • Calling observable.subscribe(observer) actually calls observable.subscribeActual(observer).
  • observable is a reference to an ObservableCreate, so the call here is ObservableCreate.subscribeActual(observer).

Let's go back to the subscribeActual method of the ObservableCreate class:

ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;



    public ObservableCreate(ObservableOnSubscribe<T> source) {

        this.source = source;

    }

    // subscribeActual is called the moment the subscription occurs, that is, when observable.subscribe(observer) is called

    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        // CreateEmitter wraps the observer and can stop the callbacks to its onNext, onError, etc.

        CreateEmitter<T> parent = new CreateEmitter<T>(observer);

        // When a subscription occurs, the observer's onSubscribe(Disposable d) method is executed

        observer.onSubscribe(parent);



        try {

            source.subscribe(parent);

        } catch (Throwable ex) {

            Exceptions.throwIfFatal(ex);

            parent.onError(ex);

        }

    }

    // omit some code...

}

  • subscribeActual is called the moment the subscription occurs, that is, when observable.subscribe(observer) is called;
  • observer.onSubscribe(parent); when a subscription occurs, the observer's onSubscribe method is called back on the subscribing thread;
  • In subscribeActual, the Observer passed in is wrapped in a CreateEmitter; if the task is cancelled, CreateEmitter stops the callbacks to the observer's onNext, onError and other methods;

First, the observer.onSubscribe(parent) call:

observer.onSubscribe(parent); When a subscription occurs, the observer's onSubscribe(Disposable d) method is executed, which brings us back to the following code:

// Create an observer

Observer observer = new Observer<String>() {

    @Override

    public void onSubscribe(@NonNull Disposable d) {

        Log.d(TAG, "onSubscribe");

    }

    // ... onNext, onError, onComplete omitted

};

  • The argument passed here is new CreateEmitter(observer), which implements the Disposable interface; if the task is cancelled, the onNext, onError and onComplete methods of the observer passed in are no longer called back.

Next, the source.subscribe(parent) call:

source.subscribe(parent); is ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));

The code finally returns to the subscribe method of ObservableOnSubscribe:

new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

}

  • Inside subscribe, the user calls the onNext, onComplete and onError methods of CreateEmitter to send data to the observer wrapped in the CreateEmitter.

At this point, the code trail for this simple example, which involves neither operators nor thread switching, ends.

Source code reading – Thread switching (2)

Note: the current source code version is RxJava 2.1.9

Start with this simple example of thread switching:

// Create an observer

Observer observer = new Observer<String>() {

    @Override

    public void onSubscribe(@NonNull Disposable d) {

        // Called at the moment of subscription, on the subscribing thread

    }



    @Override

    public void onNext(String o) {

        // Execute in the main thread of Android

    }



    @Override

    public void onError(@NonNull Throwable e) {

        // Execute in the main thread of Android

    }



    @Override

    public void onComplete() {

        // Execute in the main thread of Android

    }

};



// Create the observed

Observable observable = Observable.create(new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        // execute in the IO thread

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

});

// The observed object emits on the IO thread

observable = observable.subscribeOn(Schedulers.io());

// The observer is notified on the Android main thread

observable = observable.observeOn(AndroidSchedulers.mainThread());

// Subscribe

observable.subscribe(observer);


Here’s my summary of the entire code execution flow for RxJava2:

[Figure: RxJava2 code execution flow]

A. Observable.create(ObservableOnSubscribe source)

As in the simple example (1), Observable.create(ObservableOnSubscribe<T> source) is effectively the following code:



//

ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        // execute in the IO thread

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

});

  • ObservableCreate contains a subscribeActual(observer) method, which executes the onSubscribe method of the observer passed in and indirectly calls the observer's onNext and onComplete methods;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;



    public ObservableCreate(ObservableOnSubscribe<T> source) {

        this.source = source;

    }



    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        CreateEmitter<T> parent = new CreateEmitter<T>(observer);

        observer.onSubscribe(parent);



        try {

            source.subscribe(parent);

        } catch (Throwable ex) {

            Exceptions.throwIfFatal(ex);

            parent.onError(ex);

        }

    }

    // omit some code...

}

  • subscribeActual first calls observer.onSubscribe(parent); so when a subscription occurs, the observer's onSubscribe method is executed on the subscribing thread;
  • subscribeActual then calls source.subscribe(parent); inside subscribe, the user calls CreateEmitter.onNext to send the data out;
  • CreateEmitter wraps the Observer passed to ObservableCreate.subscribeActual(Observer observer);
  • Once the task is cancelled, CreateEmitter no longer calls back the observer it wraps; the observer's onNext method is called by CreateEmitter.onNext.

The ObservableEmitter passed to subscribe is a reference to a CreateEmitter, which further wraps the Observer. If the task is cancelled, CreateEmitter.onNext no longer calls back the Observer's onNext method.

Next, look at the code behind observable.subscribeOn(Schedulers.io()).

B. observable.subscribeOn(Schedulers.io())

Look at the subscribeOn(Scheduler) method of the Observable class:

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    // Generate an ObservableSubscribeOn object

    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));

}

  • Continue to ignore RxJavaPlugins;
  • In the end, an ObservableSubscribeOn object is returned.

Here, observable = observableCreate.subscribeOn(Schedulers.io()) is actually:

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io());

  • So observable.subscribeOn(Schedulers.io()) returns a reference to an ObservableSubscribeOn.

Look at ObservableSubscribeOn below:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;



    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {

        super(source);

        this.scheduler = scheduler;

    }



    @Override

    public void subscribeActual(final Observer<? super T> s) {

        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);



        s.onSubscribe(parent);



        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

    }

    // ... some code omitted

}


Take a look at the subscribeActual method in ObservableSubscribeOn:

  • subscribeActual first executes the onSubscribe method of the Observer passed in;
  • subscribeActual then schedules a SubscribeTask on the thread of the scheduler (here the IO thread). The task calls the subscribe method of observableCreate with the SubscribeOnObserver as the argument, that is, the IO thread executes observableCreate.subscribe(new SubscribeOnObserver(observer));

As a result, no matter which thread ObservableSubscribeOn.subscribeActual(observer) is called on, observableCreate.subscribe(new SubscribeOnObserver(observer)) is executed on the IO thread, so the observed object's e.onNext("hello"); and e.onComplete(); also execute on the IO thread.
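The core idea of subscribeOn, deferring the upstream subscribe call to another thread, can be sketched with a plain ExecutorService. This is an illustrative model rather than RxJava's implementation: Source, subscribeOn and emittingThread are hypothetical names, a single-thread executor named "fake-io" stands in for Schedulers.io(), and disposal is ignored.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeOnDemo {
    // Hypothetical stand-in for an upstream Observable: subscribing runs its emitting code.
    interface Source<T> { void subscribe(Consumer<T> onNext); }

    // Stand-in for ObservableSubscribeOn: the upstream subscribe call is run by the executor,
    // so the emitting code executes on the executor's thread regardless of the caller's thread.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return onNext -> executor.execute(() -> upstream.subscribe(onNext));
    }

    // Returns the name of the thread on which the source's emitting code ran.
    static String emittingThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        String[] threadName = new String[1];
        Source<String> source = onNext -> {
            threadName[0] = Thread.currentThread().getName();
            onNext.accept("hello");
        };
        subscribeOn(source, io).subscribe(value -> { });
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS); // wait for the scheduled subscribe to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // fake-io
    }
}
```

Because the downstream callback is invoked from inside the emitting code, it also runs on the "fake-io" thread unless something further downstream moves it elsewhere.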

C. observable.observeOn(AndroidSchedulers.mainThread())

Look at the observeOn(Scheduler) method of the Observable class:

public final Observable<T> observeOn(Scheduler scheduler) {

    return observeOn(scheduler, false, bufferSize());

}

//

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    ObjectHelper.verifyPositive(bufferSize, "bufferSize");

    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));

}


Here you can see that observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread()) is actually:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);


Therefore, observable.observeOn(AndroidSchedulers.mainThread()) returns a reference to an ObservableObserveOn.

Look at ObservableObserveOn below:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;

    final boolean delayError;

    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {

        super(source);

        this.scheduler = scheduler;

        this.delayError = delayError;

        this.bufferSize = bufferSize;

    }



    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        if (scheduler instanceof TrampolineScheduler) {

            source.subscribe(observer);

        } else {

            Scheduler.Worker w = scheduler.createWorker();



            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

        }

    }

    // ... some code omitted

}


Take a look at the subscribeActual method in ObservableObserveOn:

  • The source.subscribe(...) call in the else branch of subscribeActual is actually observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
  • The role of ObserveOnObserver: when its onNext method is executed, it posts the call to the observer's onNext method to the Android main thread;
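The wrapping idea behind ObserveOnObserver can also be sketched in plain Java. Again this is a simplified model and not the RxJava source: the real class queues items and drains them on a Scheduler.Worker, whereas here each value is simply re-posted to a single-thread executor named "fake-main" that stands in for the Android main thread. ObserveOnDemo, observeOn and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ObserveOnDemo {
    // Stand-in for ObserveOnObserver: wraps the downstream observer and re-posts
    // every onNext call to the target executor.
    static <T> Consumer<T> observeOn(Consumer<T> downstream, ExecutorService executor) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    // Emits two values from the calling thread and returns "<thread>:<value>" as seen downstream.
    static List<String> run() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> observer =
                value -> seen.add(Thread.currentThread().getName() + ":" + value);

        Consumer<String> wrapped = observeOn(observer, main);
        wrapped.accept("hello"); // called on the current thread, delivered on "fake-main"
        wrapped.accept("world");

        main.shutdown();
        try {
            main.awaitTermination(5, TimeUnit.SECONDS); // wait until both values are delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [fake-main:hello, fake-main:world]
    }
}
```

The single-thread executor preserves the emission order, which is also what a worker bound to one thread guarantees.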

D. observable.subscribe(observer)

  • We know that Observable's subscribe(Observer observer) method actually calls Observable's subscribeActual(Observer observer) method;
  • And here, observable is actually a reference to an ObservableObserveOn;

Therefore, observable.subscribe(observer) actually executes observableObserveOn.subscribeActual(observer).

At this point, our small thread switching example (2) is equivalent to the following code:

// Create an observer

Observer observer = new Observer<String>() {

    @Override

    public void onSubscribe(@NonNull Disposable d) {

        // Called at the moment of subscription, on the subscribing thread

    }



    @Override

    public void onNext(String o) {

        // Execute in the main thread of Android

    }



    @Override

    public void onError(@NonNull Throwable e) {

        // Execute in the main thread of Android

    }



    @Override

    public void onComplete() {

        // Execute in the main thread of Android

    }

};

//

ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {

    @Override

    public void subscribe(@NonNull ObservableEmitter e) throws Exception {

        // execute in the IO thread

        e.onNext("hello");

        e.onNext("world");

        e.onComplete();

    }

});

//

ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io());

//

ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);

//

observableObserveOn.subscribeActual(observer);


Now look at observableObserveOn.subscribeActual(observer):

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;

    final boolean delayError;

    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {

        // source is observableSubscribeOn

        super(source);

        // scheduler is AndroidSchedulers.mainThread()

        this.scheduler = scheduler;

        // false

        this.delayError = delayError;

        // 128

        this.bufferSize = bufferSize;

    }



    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        // AndroidSchedulers.mainThread() is a HandlerScheduler, so execution goes to the else branch

        if (scheduler instanceof TrampolineScheduler) {

            source.subscribe(observer);

        }

        // The code goes to the else section

         else {

            Scheduler.Worker w = scheduler.createWorker();

            // source is observableSubscribeOn

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

        }

    }

    // ... some code omitted

}

  • In subscribeActual, AndroidSchedulers.mainThread() is a HandlerScheduler, so the if branch is skipped and execution goes directly to the else branch.
  • In subscribeActual, the observer is wrapped in an ObserveOnObserver, and observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)) is called.
  • And observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)) is actually:

ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize);

// 1. Subscribing thread: execute onSubscribe, which in turn executes the observer's onSubscribe method

observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));

// 2. IO thread: execute subscribe; inside it the user's onNext, onError and onComplete calls reach the SubscribeOnObserver, which forwards them to the ObserveOnObserver

observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver));

  • The user's onNext call reaches SubscribeOnObserver.onNext, which sends the data out;
  • SubscribeOnObserver.onNext calls observeOnObserver.onNext;
  • observeOnObserver.onNext uses the HandlerScheduler to execute observer.onNext, observer.onError, observer.onComplete and the other methods on the Android main thread.

E. The overall flow chart

Finally, a summary of the entire execution process of RxJava2:

[Figure: overall RxJava2 execution flow]
