RRxjava thread switching principle source code analysis

preface

Rxjava is a very popular reactive asynchronous framework that allows you to play around with threads in a series of chained calls, avoiding the dreaded callback hell.

So how does this awesome framework work?

Actually in many technical platform, has about rxjava source code parsing, but I see a lot of, still feel most speak more obscure, most of the analysis of time too deep source, didn’t see the forest for the trees, finish see understand a little bit, but the but again feel what is wrong, can not have a clear understanding of rxjava. Therefore, I intend to try to shield rXJava implementation details as much as possible, and instead try to show the implementation process of RXJava, so that readers can have a general understanding of the implementation principle of RXJava, and then combined with other technical blogs and RXJava source code, to understand the purpose of RXJava thread switching principle.

This article is intended for readers who are comfortable with RXJava, but for those who have never used it, please refer to Link 1 below and come back.

Rxjava call flow

First, let’s look at a simple implementation of RXJava:

        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());

        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {}@Override
            public void onNext(Integer integer) {}@Override
            public void onError(Throwable e) {}@Override
            public void onComplete(a) {}}; integerObservable.subscribe(observer);Copy the code

The above code can be broken down into several stages:

  1. Observable.create() returns an ObservableCreate object
  2. SubscribeOn () returns an ObservableSubscribeOn object
  3. ObserveOn () returns an ObservableObserveOn object
  4. integerObservable.subscribe(observer)

The previous procedures 1~3 are the process of generating observed observables. As you can see from the source code, this process is the process of packaging ObservableCreate layer by layer and generating a new Observable. In the process of packaging, the wrapped Observable is stored as a member variable:

Ok, now that I’ve explained this, I think you understand how these key objects are created and how they are referenced. And then after procedure 4 calls, rXJava chain calls the car is going to start, so get in the car

I’ll show you a sequence diagram, so it’ll be a little bit clearer

The ObservableObserveOn ObservableObserveOn object is created by wrapping it, as explained in procedure 1 and 2 above. The wrapping is not a fancy mooncake wrapper, but an internal thread switch for the ObservableObserveOn observable. The secret to this thread switch lies in the subscribeActual() method

Next, I will explain process 3 to 9 step by step

The process of 3

First, the source code for procedure 3 looks like this

What an trampoline cheduler is, I won’t go into detail here, but I’m going to assume that the method takes a second statement, source-subscribe (). Remember, the member variable source of ObservableObserveOn is actually ObservableSubscribeOn, so we wrapped the observer layer and then we went to procedure 4 (note, Subscribe () calls the internal method subscribeActual(), unless otherwise specified).

The process of 4

As you can see in Procedure 4, the observer will be wrapped as a runnable and the thread will be switched via the scheduler that subscribeOn(schedulers.io ()) receives during the build process. Just keep in mind that there’s a thread switch going on, and we’ll talk about how that happens later, so we don’t lose track of the main body for the sake of detail.

Next, we execute SubscrbeTask on the IO thread. Let’s look at the definition of this task

You can see that in the run() method, the source.subscribe() method is called again, remember? The source of the ObservableSubscribeOn member is the ObservableCreate, which means that we have come back to the ObservableCreate object and, surprisingly, we have implemented the thread switch. Subscribe () is executed in the IO thread.

The process of 5 ~ 7

The source of the ObservableCreate member variable is ObservableOnSubscribe, so source.subscribe() will be executed here:

It then internally calls e.onnext (), which calls CreateEmitter’s onNext() method

The obsever. OnNext () method is called internally. What is this observer? In fact, careful readers should notice that in the above process, every time subscribeActual() method is called, the original observer is wrapped one layer and then transmitted continuously. Therefore, we can also describe the packaging process similar to that of observable in this picture. The structure of the Observer:

Therefore, it can be seen that CreateEmitter’s onNext() method is actually the onNext() method that executes the SubscribeOnObserver

Process of 8 ~ 9

As you can see, onNext() calls the onNext() method of the member variable actual inside the SubscribeOnObserver. So what does onNext() do in the ObserveOnObserver

Does that make sense? ObserveOnObserver is a class that implements the Runnable interface, and its onNext() method adds itself to the worker. The worker nature is set up by our initial observeOn (AndroidSchedulers. MainThread ()), which is the main thread! That is, ultimately the drainNormal() is called in the main thread and the actual. OnNext () method is called inside it; this actual is the innermost observer

At this point, rXJava thread switching is finally finished, we will be relatively clear? If you don’t already know, you can debug it again through breakpoints and it will be much clearer.

Next, let’s talk about the specific process of thread switching, which was not mentioned above

Details of thread switching

Let’s take the child thread of the above example, namely IoScheduler, as an example to explain the details of thread switching. The implementation principle of switching to the main thread is similar, and RXJava switches to the main thread through handler. Readers can refer to the details by themselves.

Ok, let’s go back to process 4

The scheduler.scheduleDirect() method is called in this step, internally

CreateWorker () is an abstract method. In our case, ioScheduler.createWorker () is called and returns an EventLoopWorker object that maintains a thread pool with a core number of 1. Finally, the thread is called internally through worker.schedule() to perform this task, thus achieving thread switch. This thread pool creation process is quite deep and involves multiple classes, so I will not post them one by one. You can follow it from the following picture:

Extra knowledge

IoSchedule maintains a CacheWorkPool object that caches worker objects to avoid repeated creation of thread pools. Ioschedule.release () is called at the end of the chain call, observer.oncomplete (). The cacheWorkPool has a thread running every 60 seconds to check if the cached worker has expired and remove it from the list, thereby reusing and releasing the thread pool.

Question of time
  1. What happens when you call subscribeOn() multiple times and set different Schedules, and subscribe() is executed on what thread?

    Something like the following:

The answer is:

Each call subscribeOn () will generate a ObservableSubscribeOn object, then the order of the next upgrade call ObservableSubscribeOn. SubscribeActual (), until to the IO thread, Since the observable object of the package is observableCreate, the next flow is the execution process of processes 5~9 mentioned above. That is, subscribeOn() is executed multiple times, but subscribe() is finally executed in the first subscribeOn() specified thread, in this case the IO thread.

  1. What if the flatMap() operator was inserted on top of this?

The answer is that flatMap() is still executed in the IO thread

Why is that? Let’s dive into the source code

After first calling the flatMap() method, an internal series of overloads will eventually generate the ObservableFlatMap object

As you can see, the process is similar in that the last Observable is wrapped in a layer. Then let’s look at the subscribeActual() and onNext() methods of ObservableFlatMap

It can be seen that subscribeActual() does not perform conversion, and the real conversion is done in onNext() method. Then we can know from the conclusion of Question 1 that onNext() is run in IO thread. Therefore, The flatMap() operator is still executed on the onNext() thread.

Little practice

In which thread do flatMap() and SUBSCRIBE () execute?

Observable.just(1)
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        Log.d(TAG, "flatMapThread = " + Thread.currentThread().getName());
                        return Observable.just(integer);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "subscribeThread = "+ Thread.currentThread().getName()); }});Copy the code

You can think about it, and then run to see the results and what you want to be right ~

Code word is not easy, if you readers master after reading feel helpful to you, then give me a little encouragement ~


Reference links:

  1. RxJava for Android developers
  2. Explain RxJava’s message subscription and thread switching principles