Preface

The previous article showed how subscribeOn moves upstream work onto a specified worker thread in RxJava. This article looks at how observeOn moves downstream work onto a specified thread.

RxJava – subscribeOn

Here you can briefly review the subscribeOn principle using UML diagrams.

Calling subscribeOn does the following:

  • Creates an ObservableSubscribeOn object, which is itself an Observable. It also extends AbstractObservableWithUpstream (which implements HasUpstreamObservableSource), so it is an Observable with an upstream.
  • In the subscribeActual method of ObservableSubscribeOn:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

The real subscribe operation is placed in a Runnable called SubscribeTask, which the scheduler runs, so the upstream work executes on the scheduler's thread.

  • Schedulers.newThread() or Schedulers.io() uses the factory method pattern to create a thread of the specified type. When that thread runs the task, the actual subscribe method is executed, so the upstream operations run on that specific thread.
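The review above can be sketched in plain Java without RxJava. This is only an illustration of the idea, not RxJava's implementation: `Source`, `subscribeOn`, and the thread name `worker-1` are hypothetical names invented for the sketch, and an `ExecutorService` stands in for the Scheduler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class SubscribeOnSketch {

    /** Hypothetical minimal source: subscribe() emits on whatever thread calls it. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Wraps a source so that its subscribe() call is submitted to the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService scheduler) {
        // The submitted lambda plays the role of SubscribeTask.
        return observer -> scheduler.execute(() -> upstream.subscribe(observer));
    }

    static List<String> demo() {
        ExecutorService scheduler =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source =
                observer -> observer.accept("emitted on " + Thread.currentThread().getName());

        subscribeOn(source, scheduler).subscribe(item -> {
            log.add(item);
            done.countDown();
        });

        try {
            done.await(); // wait until the worker thread has emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the subscribe call itself is what gets handed to the executor, everything the source does inside subscribe runs on the executor's thread, which is the effect described in the review.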

RxJava – observeOn

After a brief review of subscribeOn, let’s look at how observeOn works.

In fact, after understanding the subscribeOn principle, it is much simpler to look at observeOn. There are many similarities in the naming and implementation ideas of classes, which can be understood by comparison.

The RxJava code is cleverly written; it rewards repeated reading and there is a lot to learn from it. To avoid missing the forest for the trees, let's explore observeOn with the following questions in mind.

  1. Android uses Handler to pass messages between threads. Is it used here, and if so, how?
  2. What does AndroidSchedulers.mainThread() do?
  3. How are downstream tasks guaranteed to run on the given thread?

Example


    // sb and TAG are assumed to be fields of the enclosing class (not shown).
    private void multiThread() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This msg from work thread :" + Thread.currentThread().getName());
                sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: s= " + s);
                    }
                });
    }

We use this code as an example to see how observeOn works. Here observeOn(AndroidSchedulers.mainThread()) switches the downstream thread to the familiar Android UI thread, which guarantees that everything we do downstream runs on the UI thread. As with subscribeOn, let's start from this code and see what happens behind the scenes.

From the previous article we know that AndroidSchedulers.mainThread() must create some kind of Scheduler. To make the later discussion easier, this time we start with how the scheduler is created and then look at the implementation of observeOn().

Note that AndroidSchedulers is not part of RxJava itself. It belongs to RxAndroid, a scheduler implementation designed specifically to make RxJava convenient to use on Android, and its source code is cleverly designed. Remember to add the dependency to your Gradle file before using it.

AndroidSchedulers.mainThread()

Let's look at how the familiar AndroidSchedulers.mainThread() Scheduler is created.

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}

In short, calling AndroidSchedulers.mainThread() returns, by default, a HandlerScheduler instance, and that instance is built around the familiar Handler. This brings us to HandlerScheduler.

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

This class is very simple, but very cleverly designed.

  • First, HandlerScheduler is a Scheduler that receives the main thread's Handler through its constructor. Its createWorker() method uses this Handler to create a HandlerWorker, which is itself a Worker. The worker's schedule method wraps the task in a ScheduledRunnable and sends it through the handler's sendMessageDelayed method. Because the handler is bound to the main thread's Looper, the downstream task is handed over from the worker thread to the UI thread.

  • ScheduledRunnable implements not only Runnable but also Disposable, an interface we have seen many times.

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                // Error reporting omitted here; see the full listing above.
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

This way, the run method executes the task on the handler's thread, and once dispose() is called on the Disposable, handler.removeCallbacks(this) removes the pending task from the handler's message queue, so a task that has not started yet is never dispatched.
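To make the post-then-dispose behaviour concrete without the Android framework, here is a minimal plain-Java sketch. `MiniHandler` is a hypothetical stand-in for Handler plus Looper (one thread draining a queue), and `ScheduledTask` only mirrors the shape of ScheduledRunnable; neither is RxAndroid code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class MiniHandlerSketch {

    /** Hypothetical stand-in for Handler + Looper: one thread draining a task queue. */
    static final class MiniHandler {
        private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private final Thread looper;

        MiniHandler(String threadName) {
            looper = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // dispatch tasks one by one on this thread
                    }
                } catch (InterruptedException e) {
                    // quit() was called: leave the loop
                }
            }, threadName);
            looper.setDaemon(true);
            looper.start();
        }

        void post(Runnable task) { queue.add(task); }
        void removeCallbacks(Runnable task) { queue.remove(task); }
        void quit() { looper.interrupt(); }
    }

    /** Same shape as ScheduledRunnable: runs a delegate and can be disposed. */
    static final class ScheduledTask implements Runnable {
        private final MiniHandler handler;
        private final Runnable delegate;
        private volatile boolean disposed;

        ScheduledTask(MiniHandler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override public void run() { delegate.run(); }

        void dispose() {
            disposed = true;
            handler.removeCallbacks(this); // drop the task if it is still pending
        }

        boolean isDisposed() { return disposed; }
    }

    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        MiniHandler handler = new MiniHandler("mini-main");
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        // Task A blocks the looper so that B and C are still pending when B is disposed.
        handler.post(() -> {
            awaitQuietly(gate);
            log.add("A@" + Thread.currentThread().getName());
        });
        ScheduledTask b = new ScheduledTask(handler,
                () -> log.add("B@" + Thread.currentThread().getName()));
        handler.post(b);
        handler.post(new ScheduledTask(handler, () -> {
            log.add("C@" + Thread.currentThread().getName());
            finished.countDown();
        }));

        b.dispose();       // removes B from the queue before it is dispatched
        gate.countDown();  // let the looper continue
        awaitQuietly(finished);
        handler.quit();
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Tasks A and C run on the looper thread, while the disposed task B is removed from the queue before the looper reaches it.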

observeOn

Let’s look at the observeOn method in Observable

Observable.java — observeOn


    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

The implementation closely mirrors subscribeOn, with two extra parameters: delayError and bufferSize. bufferSize() returns a library-wide default, which is 128 (in RxJava 2 it can be changed through the rx2.buffer-size system property). Based on our previous experience, we can simplify the observeOn process as follows:

new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)

That is, the observeOn operator returns an ObservableObserveOn object, which, as you might expect, is also an Observable. Let's see what ObservableObserveOn looks like; what we care about most is its subscribeActual method.

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

Like ObservableSubscribeOn, it extends AbstractObservableWithUpstream, so it is also an Observable with an upstream. The constructor is trivial. We focus on subscribeActual: since the Scheduler here is AndroidSchedulers.mainThread(), which is not a TrampolineScheduler, the else branch applies.


            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

The Worker is created by scheduler.createWorker(). Combined with the earlier analysis of AndroidSchedulers.mainThread(), this worker is a HandlerWorker holding a reference to the main thread's Handler.

We then create an ObserveOnObserver with this worker. Here is the declaration of that class.

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable { ... }

This class plays several roles: it is first of all an Observer, it is also a Runnable, and it extends BasicIntQueueDisposable, which gives it an atomic counter, queue operations, and the Disposable contract.

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

Focus on this line. source is a member variable of the parent class (AbstractObservableWithUpstream), namely the upstream. So what is the upstream of the current ObservableObserveOn? It is the ObservableSubscribeOn discussed in the previous article.

So, when subscribe() is called here, the calls proceed in the following order:

Observable.subscribe -> Observable.subscribeActual -> ObservableObserveOn.subscribeActual -> ObservableSubscribeOn.subscribeActual -> ObservableCreate.subscribeActual

Each of these methods takes an observer as its parameter. By the time the final subscribeActual(Observer<? super T> observer) is reached, its observer wraps (holds references to) all the observers created further downstream.
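The wrapping order can be illustrated with a small plain-Java sketch. It deliberately leaves out all threading and only records who is called when; `Observer`, `Source`, and `operator` here are hypothetical simplified types, not the RxJava classes of the same name.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainSketch {

    /** Hypothetical single-method observer. */
    interface Observer<T> {
        void onNext(T value);
    }

    /** Hypothetical source with an RxJava-like subscribe method. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /**
     * Hypothetical pass-through operator: on subscribe it wraps the downstream
     * observer in a new observer and subscribes that wrapper to its upstream.
     */
    static <T> Source<T> operator(String name, Source<T> upstream, List<String> log) {
        return downstream -> {
            log.add("subscribe " + name);
            upstream.subscribe(value -> {
                log.add("onNext " + name);
                downstream.onNext(value); // forward to the observer this wrapper holds
            });
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        Source<String> create = observer -> {
            log.add("subscribe create");
            observer.onNext("hello");
        };

        // Same order as in the example: create -> subscribeOn -> observeOn -> subscribe
        Source<String> chain =
                operator("observeOn", operator("subscribeOn", create, log), log);

        chain.subscribe(value -> log.add("received " + value));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Subscription travels from the last operator up to the source, and each step wraps the observer it was given; events then travel back down through those wrappers in the opposite order.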

Let's look at ObservableCreate's subscribeActual:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

As you can see, observer.onSubscribe is triggered first. Now let's look at ObservableSubscribeOn.subscribeActual:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

Okay, so we’re back to square one:

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

Back to the original Observer: ObserveOnObserver

This ObserveOnObserver holds the observer we originally created, that is, the observer built from our Consumer object.

Let’s take a look at the ObserveOnObserver

  • The constructor
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

Here actual is the downstream observer.

  • onSubscribe
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                // In this example the upstream Disposable is a plain one, so the
                // QueueDisposable branch is not discussed for now.

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }

From ObservableCreate's subscribeActual we know that when subscribe is called, the observer's onSubscribe method runs first. The implementation here is simple: it creates a queue and then calls the downstream observer's onSubscribe method.

  • onNext
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

onNext calls the schedule() method.

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

As mentioned above, worker holds the main thread's Handler. When schedule is called, the task is handed to the main thread through the handler's sendMessageDelayed method.

So what is this here? As noted earlier, ObserveOnObserver is also a Runnable, so what the worker runs is ObserveOnObserver's own run method.

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

The outputFused flag is false by default; the case where it is true is outside the scope of this discussion.

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

This is essentially a drain loop: events are polled from the queue created in onSubscribe and passed to the downstream observer's onNext method until the queue is empty. If polling throws, or once the stream has terminated and the queue has been emptied, worker.dispose() is called, which removes this worker's pending messages from the Handler.
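The combination of schedule() and the drain loop is a general queue-drain pattern, and it can be sketched in plain Java. The sketch below is a simplified illustration under stated assumptions, not RxJava's code: `QueueDrainer` is a hypothetical class, it keeps its counter in an AtomicInteger field (ObserveOnObserver inherits the counter instead), an Executor stands in for the Worker, and termination and error handling are left out.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DrainSketch {

    /** Hypothetical, much-simplified analogue of ObserveOnObserver. */
    static final class QueueDrainer<T> implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // pending schedule() calls
        private final Executor worker;
        private final Consumer<T> downstream;

        QueueDrainer(Executor worker, Consumer<T> downstream) {
            this.worker = worker;
            this.downstream = downstream;
        }

        /** Called on the producer thread. */
        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        void schedule() {
            // Only the caller that moves the counter from 0 to 1 starts a drain.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        /** Runs on the worker thread and delivers queued items downstream. */
        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    downstream.accept(value);
                }
                // Subtract the calls already handled; a non-zero result means more
                // onNext calls arrived while draining, so loop again.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static List<String> demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "drain-thread"));
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allDelivered = new CountDownLatch(5);

        QueueDrainer<Integer> drainer = new QueueDrainer<>(worker, v -> {
            received.add(v + "@" + Thread.currentThread().getName());
            allDelivered.countDown();
        });

        for (int i = 1; i <= 5; i++) {
            drainer.onNext(i); // produced on the calling thread
        }

        try {
            allDelivered.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter guarantees that at most one drain is scheduled at a time, so items are delivered in order on the worker thread even when the producer keeps emitting while a drain is in progress.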

Finally, take a look at onComplete; onError works in much the same way.

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

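The processing here is also simple: done is set to true and schedule() is called, so the drain loop delivers the completion to the downstream observer on the scheduler's thread. This completes the downstream event flow.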

Conclusion

Well, after a long and somewhat inexplicable delay, this second part on RxJava thread switching is finally complete.