This article is sponsored by Yugangshuo writing platform

Original author: April Grapes

Copyright: this article belongs to the WeChat public account Yugangshuo and may not be reproduced in any form without permission.

1. Introduction

This article analyzes the source code behind RxJava’s message subscription and thread switching. It does not cover how to use RxJava in detail.

The analysis is based on RxJava 2.1.14.

2. RxJava overview

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

The above is from RxJava’s GitHub introduction.

Simply put, RxJava is an asynchronous library that uses the observer pattern.

3. Observer pattern

As mentioned above, RxJava extends the observer pattern. So what is the observer pattern? Let’s look at that first.

Take a WeChat public account as an example. The account keeps producing new content. Readers who are interested in it subscribe, and whenever the account publishes something new, it is pushed to them. When we receive new content, we open it if it interests us; if it is an advertisement, we might just ignore it. This is a typical observer pattern from everyday life.

In this example, the WeChat public account is the Observable, which keeps generating content (events), and we readers are the Observers. By subscribing, we receive the content (events) pushed by the account (the Observable) and react differently depending on the content (event).
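To make the analogy concrete, here is a minimal plain-Java sketch of the observer pattern. It does not use RxJava; PublicAccount, Reader and demo() are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverPatternSketch {
    /** Hypothetical observer: a reader who reacts to pushed content. */
    interface Reader {
        void onNewContent(String content);
    }

    /** Hypothetical observable: an account that pushes content to its subscribers. */
    static class PublicAccount {
        private final List<Reader> readers = new ArrayList<>();

        void subscribe(Reader reader) {
            readers.add(reader);
        }

        void publish(String content) {
            for (Reader reader : readers) {
                reader.onNewContent(content);
            }
        }
    }

    /** Publishes two items to one reader and returns what the reader chose to open. */
    static List<String> demo() {
        List<String> opened = new ArrayList<>();
        PublicAccount account = new PublicAccount();
        // The reader ignores advertisements and opens everything else.
        account.subscribe(content -> {
            if (!content.startsWith("Ad:")) {
                opened.add(content);
            }
        });
        account.publish("Article 1");
        account.publish("Ad: buy now");
        return opened;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Article 1]
    }
}
```

The account knows nothing about its readers beyond the Reader interface, which is the decoupling the observer pattern is meant to provide.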

3.1 RxJava Role Description

There are four roles in RxJava’s extended observer pattern:

| Role | Function |
| --- | --- |
| Observable (the observed) | Generates events |
| Observer | Responds to events and processes them |
| Event | The message carrier between the Observable and the Observer |
| Subscribe | Connects the Observable to the Observer |

3.2 RxJava event types

Events in RxJava fall into three types: Next events, Complete events, and Error events. The details are as follows:

| Event type | Meaning | Description |
| --- | --- | --- |
| Next | Regular event | The Observable can send any number of Next events, and the Observer can receive any number of Next events |
| Complete | Completion event | After sending a Complete event, the Observable can continue to send events, but once the Observer has received Complete it accepts no further events |
| Error | Error event | After the Observable sends an Error event, no further events are sent; once the Observer has received an Error event, it accepts no further events |

4. RxJava message subscription

Before looking at how RxJava message subscription works, let’s go over the basic steps for using it. For ease of explanation, the example below does not use chained calls and instead goes step by step (in everyday code, chained calls are still recommended because they are more concise). The steps are:

  1. Create the Observable, which defines the events to send.
  2. Create the Observer, which receives events and responds to them.
  3. Connect the two by having the Observer subscribe to the Observable.

4.1 RxJava message subscription example

Here we follow the steps above to implement this example, as follows:

        // Step 1. Create an Observable and define the events to send.
        Observable<String> observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter)
            throws Exception {
                emitter.onNext("Article 1");
                emitter.onNext("Article 2");
                emitter.onNext("Article 3");
                emitter.onComplete();
            }
        });

        // Step 2. Create an Observer that accepts events and responds to them.
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        // Step 3. Connect the two by subscribing the Observer to the Observable.
        observable.subscribe(observer);

The output is as follows:

onSubscribe
onNext : Article 1
onNext : Article 2
onNext : Article 3
onComplete

4.2 Source code Analysis

Let’s look at the source code for the message subscription process in two parts: creating the Observable, and subscribing to it.

4.2.1 Creating the Observable

Let’s start with how the Observable is created. In the example above, we created it directly with Observable.create(). Let’s step into this method to see what it does.

4.2.1.1 create() of Observable
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

As you can see, create() does very little: it creates an ObservableCreate object, passing in our custom ObservableOnSubscribe, and then calls RxJavaPlugins.onAssembly().

Let’s start with the ObservableCreate class:

4.2.1.2 ObservableCreate class
public final class ObservableCreate<T> extends Observable<T> { // Inherits from Observable
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // Store the ObservableOnSubscribe object we created.
        this.source = source;
    }
    // Other members omitted
}

As you can see, ObservableCreate inherits from Observable and stores the ObservableOnSubscribe object.

Now take a look at the RxJavaPlugins.onAssembly() method:

4.2.1.3 onAssembly() of RxJavaPlugins
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        // Omit irrelevant code
        return source;
    }

Simply return the ObservableCreate created above.

4.2.1.4 Simple summary

So Observable.create() wraps our custom ObservableOnSubscribe object in an ObservableCreate object and returns it. Note that this technique of wrapping an object in a new one is used frequently in RxJava, and we will meet it many times in the analysis that follows.
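The wrapping step can be sketched in plain Java as follows. This is a simplified model, not RxJava’s code: MiniObservable, MiniCreate, OnSubscribe and assemblyHook are hypothetical names, and the hook is only an assumption about what a pass-through assembly step could look like when nothing is installed.

```java
import java.util.function.UnaryOperator;

class AssemblySketch {
    /** Hypothetical stand-in for ObservableOnSubscribe. */
    interface OnSubscribe {
        void subscribe(StringBuilder sink);
    }

    /** Hypothetical stand-in for the abstract Observable base class. */
    abstract static class MiniObservable {
        abstract void subscribeActual(StringBuilder sink);
    }

    /** Wrapper that only stores the callback, in the spirit of ObservableCreate. */
    static final class MiniCreate extends MiniObservable {
        final OnSubscribe source;

        MiniCreate(OnSubscribe source) {
            this.source = source;
        }

        @Override
        void subscribeActual(StringBuilder sink) {
            source.subscribe(sink);
        }
    }

    /** Optional global hook (an assumption of this sketch); null means no hook is installed. */
    static UnaryOperator<MiniObservable> assemblyHook = null;

    static MiniObservable onAssembly(MiniObservable source) {
        UnaryOperator<MiniObservable> hook = assemblyHook;
        return hook == null ? source : hook.apply(source);
    }

    static MiniObservable create(OnSubscribe source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return onAssembly(new MiniCreate(source));
    }

    public static void main(String[] args) {
        OnSubscribe callback = sink -> sink.append("Article 1");
        MiniObservable observable = create(callback);
        System.out.println(observable instanceof MiniCreate);             // true
        System.out.println(((MiniCreate) observable).source == callback); // true
    }
}
```

Nothing is emitted at this stage: create() only stores the callback, which is why the article treats creation and subscription as two separate phases.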

4.2.1.5 sequence diagram

The sequence diagram for Observable.create() looks like this:

4.2.2 Subscription Process

Now let’s look at the code for the subscription process. Again, step into Observable.subscribe():

4.2.2.1 subscribe() of Observable
    public final void subscribe(Observer<? super T> observer) {
            // Omit irrelevant code
           
            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);
           
            // Omit irrelevant code
    }

As you can see, the core is just two statements. Let’s look at them one at a time:

4.2.2.2 onSubscribe() of RxJavaPlugins
    public static <T> Observer<? super T> onSubscribe(
    @NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        // Omit irrelevant code
       
        return observer;
    }

As in the previous code, the original observer is returned. Take a look at the subscribeActual() method.

4.2.2.3 subscribeActual() of the Observable class
    protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual() is an abstract method of the Observable class, so where is the actual implementation? Recall that creating the Observable ultimately returned an ObservableCreate object, which is a subclass of Observable. Let’s step into it:

4.2.2.4 subscribeActual() of the ObservableCreate class
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Triggers our custom Observer's onSubscribe(Disposable) method
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

As you can see, subscribeActual() first creates a CreateEmitter object and passes in our custom Observer as an argument. This is the same wrapping technique again.

CreateEmitter implements the ObservableEmitter and Disposable interfaces:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        // Omit the code
    }

Next comes the call to observer.onSubscribe(parent), that is, the Observer’s onSubscribe() method, which tells the Observer that it has successfully subscribed to the Observable.

subscribeActual() then calls source.subscribe(parent), where source is the ObservableOnSubscribe object, so this calls the subscribe() method of ObservableOnSubscribe. We defined subscribe() as follows:

        Observable<String> observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter)
            throws Exception {
                emitter.onNext("Article 1");
                emitter.onNext("Article 2");
                emitter.onNext("Article 3");
                emitter.onComplete();
            }
        });

The ObservableEmitter passed in here is the CreateEmitter created above. So the three onNext() calls and the one onComplete() call in subscribe() are executed one after another, and they land in CreateEmitter, which implements the ObservableEmitter interface. Let’s look at the onNext() and onComplete() implementations in the CreateEmitter class:

4.2.2.5 onNext(), onComplete(), etc. of the CreateEmitter class
        // Omit the other code
       
        @Override
        public void onNext(T t) {
            // Omit irrelevant code
            if (!isDisposed()) {
                // Call the observer's onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    // Call the observer's onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

As you can see, the calls end up in the Observer’s onNext() and onComplete() methods. At this point, a complete message subscription flow is done. Also note the isDisposed() check, which controls whether messages are still delivered, i.e. it can cut off message delivery. We will come back to this later.

4.2.2.6 Summary

When the Observer subscribes to the Observable, a CreateEmitter is created. The emitter sends the events generated by the Observable to the Observer, and the Observer responds to them. Note that the Observable only starts sending events after it has been subscribed to.
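The flow summarized above can be reproduced with a small plain-Java model. This is only a sketch under simplifying assumptions (no Disposable, no error handling); MiniObservable, MiniObserver, MiniEmitter and MiniOnSubscribe are hypothetical names, not RxJava types.

```java
import java.util.ArrayList;
import java.util.List;

class SubscribeFlowSketch {
    interface MiniObserver {
        void onSubscribe();
        void onNext(String value);
        void onComplete();
    }

    interface MiniEmitter {
        void onNext(String value);
        void onComplete();
    }

    interface MiniOnSubscribe {
        void subscribe(MiniEmitter emitter);
    }

    static final class MiniObservable {
        private final MiniOnSubscribe source;

        MiniObservable(MiniOnSubscribe source) {
            this.source = source;
        }

        void subscribe(final MiniObserver observer) {
            // Wrap the observer in an emitter that forwards events to it.
            MiniEmitter emitter = new MiniEmitter() {
                @Override public void onNext(String value) { observer.onNext(value); }
                @Override public void onComplete() { observer.onComplete(); }
            };
            observer.onSubscribe();     // 1. tell the observer it is now subscribed
            source.subscribe(emitter);  // 2. only now does the emitting code run
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        MiniObservable observable = new MiniObservable(emitter -> {
            log.add("source runs");
            emitter.onNext("Article 1");
            emitter.onComplete();
        });
        log.add("created");
        observable.subscribe(new MiniObserver() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [created, onSubscribe, source runs, onNext Article 1, onComplete]
        System.out.println(demo());
    }
}
```

The log shows "created" before "source runs", which mirrors the point above: the emitting code does not run at creation time, only once subscribe() is called.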

Here is a diagram of the flow of events:

4.2.2.7 Timing Flow chart

Let’s take a look at the timing flow of the subscription process:

4.3 Cutting off messages

Cutting off messages was mentioned earlier. Let’s see how to use it:

4.3.1 Cutting off a message

        Observable<String> observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter)
            throws Exception {
                emitter.onNext("Article 1");
                emitter.onNext("Article 2");
                emitter.onNext("Article 3");
                emitter.onComplete();
            }
        });

        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "Disconnect the observer from the observed.");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribe(observer);

The output is:

onSubscribe : null
onNext : Article 1
Disconnect the observer from the observed.

As you can see, cutting off message delivery is as simple as calling dispose() on the Disposable. After dispose() is called, the Observable can keep sending messages, but the Observer no longer receives them. Also note that the Disposable printed in onSubscribe shows up as the string “null”; the reference itself is not null. This is presumably because CreateEmitter extends AtomicReference, whose toString() prints the value it currently holds, and that value is still null at this point.

4.3.2 Source analysis of cutting off messages

Let’s look at how dispose() is implemented. Disposable is an interface; you can think of a Disposable as a connector that is broken once dispose() is called. Here the concrete implementation is the CreateEmitter class mentioned earlier. Let’s look at CreateEmitter’s dispose() method:

4.3.2.1 dispose() of CreateEmitter
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

It simply calls DisposableHelper.dispose(this).

4.3.2.2 DisposableHelper class
public enum DisposableHelper implements Disposable {

    DISPOSED
    ;
   
    // Omit other code

    public static boolean isDisposed(Disposable d) {
        // Determine if the reference to a variable of type Disposable is equal to DISPOSED
        // That is, determine whether the connector is interrupted
        return d == DISPOSED;
    }
   
    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            // Set field to DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

As you can see, DisposableHelper is an enum with a single value, DISPOSED. The dispose() method sets the atomic reference field to DISPOSED, marking it as cut off. isDisposed() can then tell whether the connector has been cut off.
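The sentinel technique described above can be tried out in isolation. The following plain-Java sketch mirrors the shape of the dispose() logic using a hypothetical Handle interface in place of Disposable; it is an illustration, not RxJava’s implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

class DisposeSketch {
    /** Hypothetical stand-in for Disposable. */
    interface Handle {
        void dispose();
    }

    /** Sentinel meaning "already disposed"; always compared by identity. */
    static final Handle DISPOSED = new Handle() {
        @Override public void dispose() { /* nothing left to do */ }
    };

    static boolean isDisposed(Handle h) {
        return h == DISPOSED;
    }

    /** Swaps the sentinel in; returns true only for the call that actually performed the disposal. */
    static boolean dispose(AtomicReference<Handle> field) {
        Handle current = field.get();
        if (current != DISPOSED) {
            current = field.getAndSet(DISPOSED);
            if (current != DISPOSED) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        final int[] upstreamDisposed = {0};
        AtomicReference<Handle> field = new AtomicReference<Handle>(() -> upstreamDisposed[0]++);
        System.out.println(dispose(field));          // true
        System.out.println(dispose(field));          // false
        System.out.println(isDisposed(field.get())); // true
        System.out.println(upstreamDisposed[0]);     // 1
    }
}
```

Because getAndSet() is atomic, only one caller can observe the previous non-sentinel value, so the wrapped handle is disposed at most once even if dispose() is called repeatedly.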

4.3.2.3 Methods in CreateEmitter class

Back to the method in the CreateEmitter class:

        @Override
        public void onNext(T t) {
            // Omit irrelevant code
           
            if (!isDisposed()) {
                // onNext() is only forwarded if dispose() has not been called
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                // Reached if dispose() has already been called; this eventually crashes
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            // Omit irrelevant code
            if (!isDisposed()) {
                try {
                    // onError() is only forwarded if dispose() has not been called
                    observer.onError(t);
                } finally {
                    // Call dispose() after onError()
                    dispose();
                }
                // Not yet disposed: return true
                return true;
            }
            // Already disposed: return false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    // onComplete() is only forwarded if dispose() has not been called
                    observer.onComplete();
                } finally {
                    // Call dispose() after onComplete()
                    dispose();
                }
            }
        }

As you can see from the above code:

  1. As long as dispose() has not been called, observer.onNext() is invoked.
  2. onError() and onComplete() are mutually exclusive. Only one of them reaches the Observer, because whichever is called first also calls dispose().
  3. If onError() is called first and onComplete() afterwards, onComplete() is not delivered. The other way round, the app crashes by default, because onError() hands the exception to RxJavaPlugins.onError(t). In fact, any call to onError() after dispose() ends up there.
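The three rules above can be checked with a small plain-Java model of a guarded emitter. GuardedEmitter is a hypothetical class for illustration; in particular, the undeliverable list is only a stand-in for handing the error to a global handler and does not reproduce what RxJavaPlugins.onError() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminalEventSketch {
    static final class GuardedEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        final List<String> delivered = new ArrayList<>();
        final List<String> undeliverable = new ArrayList<>();

        void onNext(String value) {
            if (!disposed.get()) {
                delivered.add("next:" + value);
            }
        }

        void onComplete() {
            if (!disposed.get()) {
                try {
                    delivered.add("complete");
                } finally {
                    disposed.set(true); // a terminal event cuts the connection
                }
            }
        }

        boolean tryOnError(Throwable t) {
            if (!disposed.get()) {
                try {
                    delivered.add("error:" + t.getMessage());
                } finally {
                    disposed.set(true);
                }
                return true;
            }
            return false;
        }

        void onError(Throwable t) {
            if (!tryOnError(t)) {
                // Stand-in for routing the error to a global handler.
                undeliverable.add(t.getMessage());
            }
        }
    }

    static GuardedEmitter demo() {
        GuardedEmitter emitter = new GuardedEmitter();
        emitter.onNext("a");
        emitter.onComplete();
        emitter.onNext("b");                          // dropped: already disposed
        emitter.onError(new RuntimeException("late")); // cannot be delivered any more
        return emitter;
    }

    public static void main(String[] args) {
        GuardedEmitter emitter = demo();
        System.out.println(emitter.delivered);     // prints [next:a, complete]
        System.out.println(emitter.undeliverable); // prints [late]
    }
}
```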

5. RxJava thread switching

The example and analysis above all ran on a single thread, with no thread switching involved. In practice, however, we usually fetch data on a worker thread and then update the UI on the main thread, which requires switching threads. RxJava lets us write this thread switching concisely.

5.1 Thread Switching Example

How to use thread switching in RxJava is not covered in detail here. Let’s go straight to an example that prints the thread each RxJava role runs on.

        new Thread() {
            @Override
            public void run() {
                Log.d(TAG, "Thread run() is running on :" + Thread.currentThread().getName());
                Observable
                        .create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                Log.d(TAG, "Observable Subscribe () on thread :" + Thread.currentThread().getName());
                                emitter.onNext("Article 1");
                                emitter.onNext("Article 2");
                                emitter.onComplete();
                            }
                        })
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "Observer onSubscribe() thread :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "Observer onNext() on thread :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "Observer onError() thread :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG, "Observer onComplete() on thread :" + Thread.currentThread().getName());
                            }
                        });
            }
        }.start();

The output is:

Thread run() is running on :Thread-2
Observer onSubscribe() thread :Thread-2
Observable Subscribe () on thread :RxCachedThreadScheduler-1
Observer onNext() on thread :main
Observer onNext() on thread :main
Observer onComplete() on thread :main

As you can see from the above example:

  1. The Observer’s onSubscribe() method runs on the current thread.
  2. The Observable’s subscribe() method runs on the thread specified by subscribeOn().
  3. The Observer’s onNext(), onComplete() and similar methods run on the thread specified by observeOn().

5.2 Source code Analysis

Let’s analyze the source code for thread switching, which is divided into two parts: subscribeOn() and observeOn().

5.2.1 subscribeOn() source code analysis

First look at the subscribeOn() used in our example:

    .subscribeOn(Schedulers.io())

subscribeOn() takes a Scheduler object as its argument. Scheduler is a scheduling class that can also run tasks with a delay or periodically.

5.2.1.1 Scheduler types

The Schedulers class gives us access to the various Scheduler subclasses. The following schedulers are available:

| Scheduler type | How to obtain | Meaning | Usage scenarios |
| --- | --- | --- | --- |
| IoScheduler | Schedulers.io() | I/O thread | I/O-intensive operations such as reading and writing SD card files, querying databases, and accessing the network |
| NewThreadScheduler | Schedulers.newThread() | Creates a new thread | Time-consuming operations, etc. |
| SingleScheduler | Schedulers.single() | Single shared thread | When only one single thread is needed |
| ComputationScheduler | Schedulers.computation() | CPU computation thread | CPU-intensive work such as image compression and sampling, or XML and JSON parsing |
| TrampolineScheduler | Schedulers.trampoline() | The current thread | When a task needs to run immediately on the current thread |
| HandlerScheduler | AndroidSchedulers.mainThread() | Android main thread | Updating the UI, etc. |

5.2.1.2 io() of the Schedulers class

Let’s take a look at the code of Schedulers.io(). The other Scheduler subclasses work similarly, so they are not covered here.


    @NonNull
    static final Scheduler IO;
   
    @NonNull
    public static Scheduler io() {
        //1. Return a Scheduler object named IO
        return RxJavaPlugins.onIoScheduler(IO);
    }
   
    static {
        // Omit irrelevant code
       
        //2.IO objects are instantiated in static code blocks, where an IOTask() is created.
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }
   
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //3. IOTask returns IoHolder.DEFAULT
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        //4. IoHolder creates a new IoScheduler object
        static final Scheduler DEFAULT = new IoScheduler();
    }

As you can see, Schedulers.io() uses a static inner class to create a singleton IoScheduler object, which inherits from Scheduler. This IoScheduler will be used later, so keep it in mind.
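The static inner class trick is the general Java holder idiom: a nested class is only initialized when it is first accessed, so its static field is created once and on demand. Here is a standalone sketch with hypothetical names (ExpensiveScheduler, Holder), independent of RxJava:

```java
class HolderSketch {
    static int created = 0;

    /** Hypothetical expensive object, standing in for a scheduler. */
    static final class ExpensiveScheduler {
        ExpensiveScheduler() {
            created++;
        }
    }

    /** The JVM initializes this class, and therefore DEFAULT, on first access only. */
    static final class Holder {
        static final ExpensiveScheduler DEFAULT = new ExpensiveScheduler();
    }

    static ExpensiveScheduler io() {
        return Holder.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(created);      // 0: nothing has touched Holder yet
        System.out.println(io() == io()); // true: both calls return the same instance
        System.out.println(created);      // 1: constructed exactly once
    }
}
```

Class initialization is thread-safe in Java, so this idiom gives a singleton without explicit locking.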

5.2.1.3 subscribeOn() of Observable

Then, let’s look at the subscribeOn() code:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        // Omit irrelevant code
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

As you can see, the current Observable (concretely, the ObservableCreate) is wrapped in a new ObservableSubscribeOn object.

As before, RxJavaPlugins.onAssembly() simply returns the ObservableSubscribeOn object as is, so we skip it here. Let’s look at the constructor of ObservableSubscribeOn:

5.2.1.4 Constructor of the ObservableSubscribeOn class
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

The constructor just stores source and scheduler, which are used later.

That is all subscribeOn() does. It does not seem to do much: it wraps the old Observable in a new Observable and returns the new object.

5.2.1.5 subscribeActual() of the ObservableSubscribeOn class

Now let’s go back to the subscription process. Why? Because events are only sent once subscription happens. Although thread switching is now involved, the subscription process is the same as in the previous section, so we only look at what differs. Recall that subscribeActual() of Observable is abstract, so we need to look at the subclass implementation. In the previous section, the concrete implementation was in ObservableCreate. But since we called subscribeOn(), the ObservableCreate object is now wrapped in a new ObservableSubscribeOn object. So let’s look at subscribeActual() in the ObservableSubscribeOn class:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

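subscribeActual() likewise wraps our custom Observer in a new SubscribeOnObserver object.

Next, the Observer’s onSubscribe() method is called. No thread switching has happened up to this point, so the Observer’s onSubscribe() runs on the current thread.

The last line is the important one: it creates a SubscribeTask and passes it to scheduler.scheduleDirect(). Let’s look at the SubscribeTask class first:

5.2.1.6 SubscribeTask class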
    //SubscribeTask is the inner class of ObservableSubscribeOn
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // Here source is the ObservableCreate
            source.subscribe(parent);
        }
    }

A simple class that implements the Runnable interface and calls source.subscribe(parent) in run().

5.2.1.7 scheduleDirect() of Scheduler

Now look at the scheduler.scheduleDirect() method:

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

Look down:

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

        //createWorker() is an abstract method in the Scheduler class, so its implementation is in its subclass
        // So the createWorker() should be implemented in the IoScheduler.
        // Runnable can be executed in Worker
        final Worker w = createWorker();
       
        // The actual decoratedRun is still the run object, which is the SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
       
        // Wrap the Runnable and Worker into a DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);
       
        //Worker executes the task
        w.schedule(task, delay, unit);

        return task;
    }

Let’s look at how the Worker is created and how it runs the task.

5.2.1.8 createWorker() and schedule() of IoScheduler
    final AtomicReference<CachedWorkerPool> pool;
   
    public Worker createWorker() {
        // Create a new EventLoopWorker and pass a Worker cache pool to it
        return new EventLoopWorker(pool.get());
    }
   
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();
       
        // The constructor
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            // Retrieve a Worker from the cache Worker pool
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            // Omit irrelevant code
           
            // The Runnable is handed over to the threadWorker
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

Note that each Scheduler subclass has its own Worker implementation, because a Scheduler ultimately hands the work to a Worker to carry out the scheduling.

Let’s look at how the Worker cache pool behaves:

5.2.1.9 get() of CachedWorkerPool
    static final class CachedWorkerPool implements Runnable {
        // Omit other code

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                // If the cache pool is not empty, take a ThreadWorker from it
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // If the cache pool is empty, create a ThreadWorker and return it
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
    }
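The reuse-or-create logic of get() can be sketched on its own. In the plain-Java model below, CachedPool and Worker are hypothetical names, and release() is an assumption added so the example can show reuse; the article does not show how workers are returned to the real pool.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerPoolSketch {
    /** Hypothetical worker, identified by the order in which it was created. */
    static final class Worker {
        final int id;

        Worker(int id) {
            this.id = id;
        }
    }

    static final class CachedPool {
        private final ConcurrentLinkedQueue<Worker> idle = new ConcurrentLinkedQueue<>();
        private final AtomicInteger created = new AtomicInteger();

        /** Reuses an idle worker if one is cached, otherwise creates a new one. */
        Worker get() {
            Worker cached = idle.poll();
            return cached != null ? cached : new Worker(created.incrementAndGet());
        }

        /** Assumption of this sketch: finished workers are handed back for reuse. */
        void release(Worker worker) {
            idle.offer(worker);
        }
    }

    public static void main(String[] args) {
        CachedPool pool = new CachedPool();
        Worker a = pool.get();      // pool is empty: creates worker 1
        pool.release(a);
        Worker b = pool.get();      // reuses worker 1
        Worker c = pool.get();      // pool is empty again: creates worker 2
        System.out.println(a == b); // true
        System.out.println(c.id);   // 2
    }
}
```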
5.2.1.10 scheduleActual() of NewThreadWorker

Next, let’s look at threadWorker.scheduleActual(). The ThreadWorker class does not implement scheduleActual() itself, but its parent class NewThreadWorker does.

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // The constructor creates a ScheduledExecutorService object that uses thread pools
        executor = SchedulerPoolFactory.create(threadFactory);
    }
   
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // The decoratedRun here is actually a run object
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // Wrap the decoratedRun as a new object ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        // Omit irrelevant code
       
        if (delayTime <= 0) {
            // The thread pool immediately executes ScheduledRunnable
            f = executor.submit((Callable<Object>)sr);
        } else {
            // ScheduledRunnable is delayed in the thread pool
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
           
        // Omit irrelevant code

        return sr;
    }
}

The executor runs the task on a thread pool, so the run() method of SubscribeTask executes on a pool thread, which means the Observable’s subscribe() method is called on the IO thread. This matches the output of the example above:

Observable Subscribe () on thread :RxCachedThreadScheduler-1

5.2.1.11 Summary
  1. The Observer’s onSubscribe() method runs on the current thread, because no thread switching has happened before it is called.
  2. If subscribeOn() is set with a given thread, the Observable’s subscribe() method runs on that thread.
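Both points of the summary can be observed in a plain-Java model that uses an ExecutorService in place of a Scheduler. This is a sketch under simplifying assumptions, not RxJava’s implementation; MiniSource, MiniObserver and the thread names sketch-io and sketch-caller are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SubscribeOnSketch {
    interface MiniObserver {
        void onSubscribe();
        void onNext(String value);
    }

    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    /** Wraps upstream so that subscribing to it happens on the given executor. */
    static MiniSource subscribeOn(final MiniSource upstream, final ExecutorService scheduler) {
        return observer -> {
            observer.onSubscribe();                                // still on the caller's thread
            scheduler.execute(() -> upstream.subscribe(observer)); // plays the role of SubscribeTask
        };
    }

    static List<String> demo() {
        final List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        MiniSource source = observer -> {
            log.add("subscribe on " + Thread.currentThread().getName());
            observer.onNext("Article 1");
        };
        Thread caller = new Thread(() -> subscribeOn(source, io).subscribe(new MiniObserver() {
            @Override public void onSubscribe() { log.add("onSubscribe on " + Thread.currentThread().getName()); }
            @Override public void onNext(String value) { log.add("onNext on " + Thread.currentThread().getName()); }
        }), "sketch-caller");
        caller.start();
        try {
            caller.join();  // the task has been handed to the executor by now
            io.shutdown();  // already submitted tasks still run
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [onSubscribe on sketch-caller, subscribe on sketch-io, onNext on sketch-io]
        System.out.println(demo());
    }
}
```

Since there is no observeOn() equivalent in this sketch, onNext() also runs on the sketch-io thread, i.e. wherever the source emits.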
5.2.1.12 sequence diagram

The overall sequence diagram for thread switching with subscribeOn():

5.2.1.13 subscribeOn() Is Set Multiple times

If we set the subscribeOn() multiple times, which thread of execution is it? Let’s do an example

        // Omit the code before and after, look at the key parts
        .subscribeOn(Schedulers.io())/ / for the first time
        .subscribeOn(Schedulers.newThread())/ / the second time
        .subscribeOn(AndroidSchedulers.mainThread())/ / the third time
Copy the code

The output is as follows:

Observable subscribe(a)The thread is RxCachedThreadScheduler-1Copy the code

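That is, only the first subscribeOn() takes effect. Why is that? As we know, every call to subscribeOn() wraps the previous Observable in a new ObservableSubscribeOn, so after three calls the nesting looks like this:

(Figure: nested ObservableSubscribeOn layers, one per subscribeOn() call, with the one created by subscribeOn(Schedulers.io()) innermost)

Subscription travels from the outermost layer inward, and each layer switches to its own scheduler before subscribing to the layer it wraps. The innermost layer, created by the first subscribeOn(Schedulers.io()), is the last one to switch, so the original Observable's subscribe() always ends up on the IO thread.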
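
The effect of this nesting can be reproduced with plain JDK executors. This is a hedged sketch rather than RxJava code: `Source`, `subscribeOn`, `pool` and the thread names `first`, `second` and `third` are hypothetical names chosen for the illustration. Each wrapper hops to its own thread before subscribing to the layer it wraps, so the innermost wrapper decides where the original source runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class NestedSubscribeOnDemo {

    // Hypothetical, stripped-down observable: subscribing runs some code
    // and reports a value to the callback.
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    // Imitates one subscribeOn() layer: subscribe to the wrapped source on the executor.
    static Source subscribeOn(Source upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Single-thread pool with a recognizable (illustrative) thread name.
    static ExecutorService pool(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String run() throws Exception {
        ExecutorService first = pool("first");
        ExecutorService second = pool("second");
        ExecutorService third = pool("third");
        try {
            // The original source reports the thread its subscribe() body runs on.
            Source original = observer -> observer.accept(Thread.currentThread().getName());

            // Same shape as calling subscribeOn() three times in a chain:
            // the first call is the innermost wrapper, the third the outermost.
            Source wrapped = subscribeOn(subscribeOn(subscribeOn(original, first), second), third);

            CompletableFuture<String> result = new CompletableFuture<>();
            wrapped.subscribe(result::complete);
            return result.get(); // thread on which the original source was subscribed
        } finally {
            first.shutdown();
            second.shutdown();
            third.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("original source subscribed on: " + run());
    }
}
```

The subscription hops third, then second, then first, so the original source is subscribed on the thread named `first`, mirroring why only the first subscribeOn() appears to take effect.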

5.2.2 observeOn()

Now let’s look at observeOn(). First, recall the setting in our example:

    // Run the downstream callbacks on the Android main thread
    .observeOn(AndroidSchedulers.mainThread())

5.2.2.1 Observable observeOn()
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        // Omit irrelevant code
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Note that this wraps the existing ObservableSubscribeOn object: just like the subscribeOn() call, it adds one more layer around it, so the structure now looks like this:

RxJavaPlugins.onAssembly() again returns the object as is.

Let’s look at the constructor for ObservableObserveOn.

5.2.2.2 Constructor of the ObservableObserveOn class
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

It’s just a bunch of variable assignments.

5.2.2.3 ObservableObserveOn subscribeActual()

Much like subscribeOn(), let’s look directly at the subscribeActual() method of ObservableObserveOn.

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Check whether the scheduler is a TrampolineScheduler (which runs work on the current thread)
        if (scheduler instanceof TrampolineScheduler) {
            // No thread switch is needed, so call subscribe() directly
            // In this example, that would be the subscribe() method of ObservableSubscribeOn
            source.subscribe(observer);
        } else {
            // Create the Worker
            // In this example the scheduler is AndroidSchedulers.mainThread()
            Scheduler.Worker w = scheduler.createWorker();
            // Wrap the observer and the Worker into an ObserveOnObserver object
            // Note: source.subscribe() itself does not go through the Worker, so it still runs on the thread set earlier
            // In this example, source.subscribe() is executed on the IO thread
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

Again, the observer is wrapped in a layer, as shown in the figure below:

source.subscribe() then emits the events one by one. We will only look at the onNext() method of ObserveOnObserver and skip onComplete() and the rest.

5.2.2.4 ObserveOnObserver onNext()
        @Override
        public void onNext(T t) {
            // Omit irrelevant code
            if (sourceMode != QueueDisposable.ASYNC) {
                // Store the event in the queue
                queue.offer(t);
            }
            schedule();
        }

It stores the event in the queue and then calls schedule().

5.2.2.5 ObserveOnObserver schedule()
        void schedule() {
            if (getAndIncrement() == 0) {
                // ObserveOnObserver also implements the Runnable interface, so it hands itself to the worker to be scheduled
                worker.schedule(this);
            }
        }
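
The `getAndIncrement() == 0` check is a common guard that keeps a drain task from being submitted twice while one is already pending or running. Below is a minimal, single-threaded sketch of that idea under stated assumptions: the class `ScheduleGuardDemo`, the field `wip` and the `submitted` list (which stands in for the worker and merely records tasks) are names invented here, and the drain body is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleGuardDemo implements Runnable {

    // Counts schedule() calls that have not yet been accounted for by a drain.
    final AtomicInteger wip = new AtomicInteger();

    // Stand-in for the worker: it only records submitted tasks,
    // so the demo stays single-threaded and deterministic.
    final List<Runnable> submitted = new ArrayList<>();

    // Number of drain passes, kept only for observation.
    int drainPasses = 0;

    void schedule() {
        // Only the call that moves the counter from 0 to 1 hands the task to the worker.
        if (wip.getAndIncrement() == 0) {
            submitted.add(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            drainPasses++; // a real drain would empty the queue here
            // Subtract the calls covered so far; a non-zero result means
            // schedule() was called again in the meantime, so loop once more.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        ScheduleGuardDemo demo = new ScheduleGuardDemo();
        demo.schedule();
        demo.schedule();
        demo.schedule();
        System.out.println("tasks submitted: " + demo.submitted.size());
        demo.submitted.get(0).run();
        System.out.println("counter after drain: " + demo.wip.get());
    }
}
```

Three calls to schedule() result in a single submitted task; once that task has run, the counter is back at zero and the next schedule() call submits again.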

We will not analyze Android's main-thread scheduler here; it is implemented by posting messages through a Handler. Because ObserveOnObserver implements the Runnable interface, its run() method ends up being called on the main thread. Let’s look at the run() method of ObserveOnObserver:

5.2.2.6 ObserveOnObserver run()
        @Override
        public void run() {
            // outputFused is false by default
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

Here’s the drainNormal() method.

5.2.2.7 ObserveOnObserver drainNormal()
        void drainNormal() {
            int missed = 1;
            // Queue that stores the events
            final SimpleQueue<T> q = queue;
            // actual is the downstream Observer that this ObserveOnObserver wraps;
            // in this example, it is our custom Observer
            final Observer<? super T> a = actual;

            // Omit irrelevant code

            // Fetch the event from the queue
            v = q.poll();

            // ...

            // Call onNext() of the wrapped (downstream) Observer
            // In this example, that is the onNext() of our custom Observer
            a.onNext(v);

            // ...
        }

As for SubscribeOnObserver.onNext(), which the event passes through before it reaches ObserveOnObserver, it contains no thread-switching logic either; it simply calls onNext() on the layer it wraps. The thread switch happens only in ObserveOnObserver, whose drainNormal() then calls the onNext() method of our custom Observer. Thus, the onNext() method of the Observer is called on the thread specified in observeOn(), in this case the Android main thread.
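
Putting onNext(), schedule(), run() and drainNormal() together, the thread hop can be sketched with JDK classes only. This is an illustration under assumptions, not RxJava's code: `ObserveOnSketch`, the `Consumer` used as the downstream observer, and the thread names `fake-io` and `fake-main` are all invented for this example, and error and completion handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class MiniObserveOnDemo {

    // Hypothetical stand-in for ObserveOnObserver: queue the value on the
    // producer's thread, deliver it downstream on the worker's thread.
    static final class ObserveOnSketch<T> implements Runnable {
        private final Consumer<T> downstream;
        private final ExecutorService worker;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger();

        ObserveOnSketch(Consumer<T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        void onNext(T value) {
            queue.offer(value);
            // Submit a drain task only if none is pending or running.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v); // runs on the worker's thread
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static List<String> run() throws InterruptedException {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        List<String> seen = new ArrayList<>(); // written only by the worker thread
        CountDownLatch done = new CountDownLatch(3);

        ObserveOnSketch<Integer> observer = new ObserveOnSketch<>(v -> {
            seen.add(v + "@" + Thread.currentThread().getName());
            done.countDown();
        }, main);

        // The producer plays the role of the upstream emitting on the IO thread.
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i);
            }
        }, "fake-io");
        producer.start();

        producer.join(); // make sure no onNext() can race with shutdown()
        done.await();    // wait until all three values were delivered
        main.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Every value is emitted on `fake-io` but received on `fake-main`, which is the same division of labor the walkthrough describes for the IO thread and the Android main thread.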

5.2.2.8 Brief summary
  1. If observeOn(thread) is set, the Observer's onNext(), onComplete() and other callbacks run on the specified thread.
  2. The thread set by subscribeOn() does not affect observeOn().
5.2.2.9 Sequence diagram

Finally, here is the sequence diagram for observeOn():

6. Other

My knowledge is limited, so if you find any mistakes, please point them out and let’s discuss ~ April Grapes’s blog
