1. Start with a simple Single, then analyze and summarize step by step

Here is a simple task built around the map operator, based on RxJava3:

        Single.just(1)
                .subscribeOn(Schedulers.io())
                .map(object:Function<Int,String>{
                    override fun apply(t: Int?): String {
                        Thread.sleep(2000)
                        return t.toString()
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object:SingleObserver<String>{
                    override fun onSubscribe(d: Disposable?) {}
                    override fun onSuccess(t: String?) {}
                    override fun onError(e: Throwable?) {}
                })

The number is converted to a String on an I/O thread, and the String value is received on the main (UI) thread. I have deliberately not simplified the anonymous objects into lambdas, so that the types stay easy to read.

2. Why does this code read so much like a “stream” in the first place?

All four methods from just to observeOn return a Single, which is a little like the Builder pattern. Is there another way to write it? Yes, for example with nested callbacks (callback hell).

It is worth mentioning that RxJava is a wrapper library, in practice often used as a threading tool, that applies the idea of reactive programming (built on a producer/consumer model) and offers a pleasant API.

3. How to switch threads

Since I have already worked through the analysis, I will not record the detours. We read the source code directly from the bottom of the chain upward and describe the logic as plainly as possible; after all, knowledge you cannot explain clearly is “false knowledge”.

  • 1 The subscribe method, which takes a SingleObserver that receives the callback notifications.

Inside, it calls subscribeActual, an abstract method of Single that is implemented by the Single each of these operators returns.

    public final void subscribe(@NonNull SingleObserver<? super T> observer) {
        ...
        try {
            subscribeActual(observer);
        }
        ...
    }
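
The delegation from subscribe to subscribeActual can be sketched in plain Java. This is only a miniature under stated assumptions: SubscribeTemplateSketch, MiniSingle, MiniJust and the one-method Observer are hypothetical names, not RxJava types, and the error handling of the real subscribe is left out.

```java
import java.util.Objects;

// Hypothetical miniature types; only the shape mirrors Single.subscribe.
final class SubscribeTemplateSketch {

    interface Observer<T> {
        void onSuccess(T value);
    }

    abstract static class MiniSingle<T> {
        // Public entry point shared by every operator: validate, then delegate.
        public final void subscribe(Observer<? super T> observer) {
            Objects.requireNonNull(observer, "observer is null");
            subscribeActual(observer);
        }

        // Each concrete source or operator supplies its own subscription logic.
        protected abstract void subscribeActual(Observer<? super T> observer);
    }

    // Simplest possible source: hands the stored value straight to the observer.
    static final class MiniJust<T> extends MiniSingle<T> {
        final T value;

        MiniJust(T value) {
            this.value = value;
        }

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            observer.onSuccess(value);
        }
    }

    // Subscribes to a MiniJust and returns what the observer received.
    static int received(int input) {
        int[] box = new int[1];
        new MiniJust<>(input).subscribe(v -> box[0] = v);
        return box[0];
    }

    public static void main(String[] args) {
        System.out.println(received(1));
    }
}
```

Because subscribe is final and subscribeActual is abstract, every operator only has to answer one question: what happens when somebody subscribes to me.
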
  • 2 Next, look at observeOn, the method that created the Single we just subscribed to. Note that the mainThread scheduler is passed in; this is important to remember.
    public final Single<T> observeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
    }


You can see that a SingleObserveOn is created and returned, with this and scheduler passed in. Let me tap the blackboard again: what is this, and what is scheduler? this is simply the object observeOn was called on, which is the object returned by the previous method, map. scheduler is mainThread. Let’s see what mainThread is:

public final class AndroidSchedulers {

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }

    private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    ...

This creates a HandlerScheduler that wraps a Handler built on the main Looper. We do not need to read further into it yet, because nothing has been executed at this point; knowing its type is enough to find it again later.

  • 3 Back to where we were: look at SingleObserveOn, the object that observeOn() passes this and scheduler to.

The part of SingleObserveOn to focus on is of course the subscribeActual method, which, as analyzed above, is executed whenever a Single is subscribed to. Here is SingleObserveOn’s subscribeActual method:

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }
    ...

The key point is that source is the upstream Single, the “this” from the blackboard above. An internal observer is created and subscribed to the upstream so that it receives the upstream callbacks. And which SingleObserver does subscribeActual in SingleObserveOn receive? It is the one passed to subscribe(object:SingleObserver{...}), the final observer the developer cares about. So the downstream observer is handed to the internal observer.
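
The wrapping described here (subscription travels up, values travel down) can be traced with a small plain-Java sketch. Everything in it is hypothetical: SubscriptionOrderSketch, Source, Observer and passThrough are illustration names, and the operator does nothing except log and forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature types used only to trace the call order.
final class SubscriptionOrderSketch {

    interface Observer<T> {
        void onSuccess(T value);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    // A pass-through operator: it keeps the upstream as `source` and, when
    // subscribed to, wraps the downstream observer in an internal observer.
    static <T> Source<T> passThrough(String name, Source<T> source, List<String> log) {
        return downstream -> {
            log.add("subscribe " + name);
            source.subscribe(value -> {
                log.add("value " + name);
                downstream.onSuccess(value);
            });
        };
    }

    // Builds just -> first -> second and records the order of events.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Source<Integer> just = observer -> observer.onSuccess(1);
        Source<Integer> first = passThrough("first", just, log);
        Source<Integer> second = passThrough("second", first, log);
        second.subscribe(value -> log.add("final observer got " + value));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The log shows the last operator being subscribed first and the first operator seeing the value first, which is why the source is read from the bottom of the chain upward.
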

  • 4 Since a new observer is created internally and subscribed to the upstream, this internal observer must run after the upstream calls back. That is, it executes its own logic once it receives the value passed down from upstream. So we can leave the upstream aside for now and first see how it calls back its own downstream. All this upstream and downstream talk is rather convoluted.

Long code warning!

static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }
        ...
}

Tap the blackboard again: the logic is still inside observeOn! ObserveOnSingleObserver takes two constructor parameters. actual is the downstream observer and is assigned to downstream. scheduler is the outer class’s scheduler, which is mainThread, that is, the HandlerScheduler. ObserveOnSingleObserver implements the Runnable interface, and its run method calls onError or onSuccess on the downstream observer. So who executes this run method? Since this inner class takes over the upstream callback and passes it downstream when run executes, something must happen when the upstream calls back:

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

Both callbacks call scheduler.scheduleDirect(this). Here is how HandlerScheduler implements it:

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        ...
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

The code above is trimmed for readability. It creates an internal ScheduledRunnable that wraps the external Runnable and calls the external run method inside its own run method. The message is then sent through the handler built on the main Looper. With that, the last two methods in the sample code have been analyzed.

Conclusion: the thread specified in observeOn determines the thread on which the next subscriber receives its callback. Internally, the switch is done by using the main-thread handler to pass the value received from the upstream callback on to the downstream. In other words, observeOn affects the downstream receiving thread.
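
The same mechanism can be sketched without Android by letting a single-thread executor stand in for the main-thread handler. This is a minimal sketch, not the library code: ObserveOnSketch, Source, Observer and the thread name "fake-main" are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the observeOn idea; an Executor replaces the Scheduler.
final class ObserveOnSketch {

    interface Observer<T> {
        void onSuccess(T value);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    // Stores the upstream value, then re-delivers it from a task the executor runs.
    static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
        final Observer<? super T> downstream;
        final Executor executor;
        T value;

        ObserveOnObserver(Observer<? super T> downstream, Executor executor) {
            this.downstream = downstream;
            this.executor = executor;
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;      // called on the upstream thread
            executor.execute(this);  // hop to the target thread
        }

        @Override
        public void run() {
            downstream.onSuccess(value); // runs on the executor's thread
        }
    }

    static <T> Source<T> observeOn(Source<T> source, Executor executor) {
        return downstream -> source.subscribe(new ObserveOnObserver<T>(downstream, executor));
    }

    // Returns the name of the thread on which the final observer was called.
    static String deliveryThreadName() {
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        try {
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            Source<Integer> upstream = observer -> observer.onSuccess(1); // emits on the caller's thread
            observeOn(upstream, fakeMain)
                    .subscribe(v -> seenOn.complete(Thread.currentThread().getName()));
            return seenOn.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            fakeMain.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadName());
    }
}
```

The upstream emits on whatever thread calls subscribe, yet the final observer is invoked on the executor's thread, which is the effect the conclusion above describes.
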

Question: so the thread of the final observer is decided by the observeOn right next to it? As the analysis so far suggests, it is not quite that simple.

Let’s put that aside for a moment and move on to map.

  • 5 How the map operator relates to its upstream internally

The map method internally creates a SingleMap, which takes the upstream this and a Function object called mapper. This is easy to understand: mapper is just the function created by the caller, implementing apply.

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
    ...

As before, subscribeActual creates an internal observer, subscribes it to the upstream, and saves the downstream observer and the mapper function. Focus on its onSuccess method:

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            }
            ...
            t.onSuccess(v);
        }

This is clear: it receives the upstream value, calls mapper.apply to transform it, and passes the result on by calling the downstream observer’s onSuccess. This is why the type received by onSuccess after the map operator must match the return type of the apply method. There is also no thread switch, so the mapper runs on whatever thread the upstream callback arrives on. Summary: in this example, the operator’s thread of execution depends on the thread specified upstream of it.
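
A minimal plain-Java sketch of the same idea follows. MapSketch, Source and Observer are hypothetical miniature types; the point it illustrates is only that the mapper runs synchronously inside the upstream callback, so no thread hop takes place.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical miniature of the map idea: transform inside the upstream callback.
final class MapSketch {

    interface Observer<T> {
        void onSuccess(T value);
        void onError(Throwable e);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    static <T, R> Source<R> map(Source<T> source, Function<? super T, ? extends R> mapper) {
        return downstream -> source.subscribe(new Observer<T>() {
            @Override
            public void onSuccess(T value) {
                R mapped;
                try {
                    mapped = Objects.requireNonNull(mapper.apply(value),
                            "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    downstream.onError(ex);
                    return;
                }
                downstream.onSuccess(mapped); // same thread as the upstream callback
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }
        });
    }

    // Returns {mapped value, name of the thread the mapper ran on}.
    static String[] run() {
        String[] out = new String[2];
        Source<Integer> just = observer -> observer.onSuccess(1);
        Source<String> mapped = map(just, n -> {
            out[1] = Thread.currentThread().getName();
            return n.toString();
        });
        mapped.subscribe(new Observer<String>() {
            @Override
            public void onSuccess(String value) {
                out[0] = value;
            }

            @Override
            public void onError(Throwable e) {
                throw new IllegalStateException(e);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        String[] out = run();
        System.out.println(out[0] + " mapped on " + out[1]);
    }
}
```

Since nothing here schedules work elsewhere, the mapper's thread is simply the thread that called subscribe, matching the summary that map inherits its thread from upstream.
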

  • 6 Finally, look at the subscribeOn method

Single.just(1) needs no detailed notes because it is a very simple creation operator: its subscribeActual passes the value 1 to the downstream onSuccess.

subscribeOn internally creates a SingleSubscribeOn object that takes the upstream this and the IO scheduler. With the experience gained above, go straight to its subscribeActual:

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

scheduleDirect again, but this time it is the IO scheduler’s implementation. SubscribeOnObserver also implements the Runnable interface: its run method subscribes itself to the upstream, and its onSuccess passes the value straight to the downstream.

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        final SingleObserver<? super T> downstream;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
        ...
    }

Which thread does the upstream run on? The upstream does not specify a thread itself, so the thread that subscribes to it is the thread it executes on, and therefore also the thread on which the downstream onSuccess is received. That is the thread on which the run method executes.
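
As a rough sketch of this in plain Java, an executor can stand in for the IO scheduler. SubscribeOnSketch, Source, Observer and the thread name "fake-io" are hypothetical; disposal handling is omitted entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the subscribeOn idea; an Executor replaces the Scheduler.
final class SubscribeOnSketch {

    interface Observer<T> {
        void onSuccess(T value);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    // A Runnable whose run() performs the upstream subscription on whichever
    // thread executes it; onSuccess just forwards the value downstream.
    static final class SubscribeOnObserver<T> implements Observer<T>, Runnable {
        final Observer<? super T> downstream;
        final Source<T> source;

        SubscribeOnObserver(Observer<? super T> downstream, Source<T> source) {
            this.downstream = downstream;
            this.source = source;
        }

        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

    static <T> Source<T> subscribeOn(Source<T> source, Executor executor) {
        return downstream -> executor.execute(new SubscribeOnObserver<T>(downstream, source));
    }

    // Returns the name of the thread on which the upstream produced its value.
    static String upstreamThreadName() {
        ExecutorService fakeIo = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            // The source has no thread of its own: it emits wherever it is subscribed.
            Source<String> upstream = observer -> observer.onSuccess(Thread.currentThread().getName());
            subscribeOn(upstream, fakeIo).subscribe(seen::complete);
            return seen.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            fakeIo.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(upstreamThreadName());
    }
}
```

The source reports the executor's thread even though it never mentions a thread itself, because the subscription that triggers it happens inside run.
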

  • 7 The scheduleDirect method of the IO scheduler

IoScheduler does not define scheduleDirect itself, so we go up to its parent class, Scheduler:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

Well, that’s easy. Keep going!

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

In the end, w.schedule executes the task. Do not forget that createWorker is implemented by IoScheduler and schedule by the worker it creates, so turn back and look:

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    static final class EventLoopWorker extends Scheduler.Worker {
        ...
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

This finally calls threadWorker.scheduleActual. Keep chasing! To save you the guessing, go straight to NewThreadWorker:

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        ...
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
        ...
        return sr;
    }

Ah, executor.submit and executor.schedule: these are simply the Java thread pool API. There is no need to post the code here. As far as I can tell, the executor behind each worker is a scheduled thread pool created with a single core thread, and the IO scheduler creates such workers on demand and reuses idle ones. At this point, the upstream task runs on a background thread, and by default the downstream also receives its callback on that background thread.
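
The tail of this call chain can be sketched with the standard library alone. This is a simplified illustration under assumptions: ScheduleDirectSketch and the thread name "fake-io-worker" are made up, and the worker, DisposeTask and plugin hooks of the real code are left out. It only shows the no-delay overload delegating to the delayed one, which then picks submit or schedule.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the scheduleDirect call chain.
final class ScheduleDirectSketch {

    // Convenience overload: "run now" is just a zero delay.
    static Future<?> scheduleDirect(ScheduledExecutorService executor, Runnable run) {
        return scheduleDirect(executor, run, 0L, TimeUnit.NANOSECONDS);
    }

    // No delay: submit immediately. Otherwise: schedule for later.
    static Future<?> scheduleDirect(ScheduledExecutorService executor, Runnable run,
                                    long delay, TimeUnit unit) {
        if (delay <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delay, unit);
    }

    // Runs a task through scheduleDirect and returns the thread it ran on.
    static String workerThreadName() {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "fake-io-worker"));
        try {
            String[] name = new String[1];
            scheduleDirect(executor, () -> name[0] = Thread.currentThread().getName())
                    .get(2, TimeUnit.SECONDS);
            return name[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());
    }
}
```
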

Summary: with that, the analysis of observeOn and subscribeOn is finished. From the source code we can see:

  • observeOn switches threads after receiving the upstream notification and then notifies the downstream, so it determines the thread on which the downstream receives the notification. If it is used several times, each one switches the thread for the downstream below it.
  • subscribeOn switches to a thread first and then subscribes, so it specifies the thread on which the subscription, and therefore the upstream, executes. Because subscription travels from downstream up to upstream, when subscribeOn is used several times the upstream ends up on the thread specified by the first subscribeOn, the one closest to the source. This gives the impression that only the first setting takes effect, but in fact every layer performs its thread switch, which is wasteful and pointless.
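
The point about repeated subscribeOn can be illustrated with a small plain-Java sketch. The types and the executor names "first" and "second" are hypothetical; each layer logs the thread on which it performs its upstream subscription.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature showing two stacked subscribeOn layers.
final class RepeatedSubscribeOnSketch {

    interface Observer<T> {
        void onSuccess(T value);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    // Hops to the executor, records the hop, then subscribes to the upstream there.
    static <T> Source<T> subscribeOn(Source<T> source, Executor executor, List<String> hops) {
        return downstream -> executor.execute(() -> {
            hops.add("subscribe on " + Thread.currentThread().getName());
            source.subscribe(downstream);
        });
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // source -> subscribeOn(first) -> subscribeOn(second) -> observer
    static List<String> run() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        List<String> hops = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<String> sourceThread = new CompletableFuture<>();
        try {
            Source<String> source = o -> o.onSuccess(Thread.currentThread().getName());
            Source<String> chain = subscribeOn(subscribeOn(source, first, hops), second, hops);
            chain.subscribe(sourceThread::complete);
            hops.add("source ran on " + sourceThread.get(2, TimeUnit.SECONDS));
            return new ArrayList<>(hops);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both hops show up in the log, but the source ends up on the executor given to the subscribeOn nearest to it, so the outer switch was paid for and then immediately overridden.
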

Satisfying~ Wait, one question above is still unanswered, so what is there to be satisfied about? It is a quarter past one. Time for a break; we will pick it up tomorrow.

Update, added the next day!

4. Which scheduler’s thread does the finally subscribed observer run on?

Which call decides the thread on which the observer passed to subscribe(observer) receives its callbacks? It is the thread specified by the observeOn closest to it. If there is none, it is the thread specified by subscribeOn; if that is missing too, it is the original calling thread. Here is test code based on the map operator:

        Single.just(1)
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(object:Function<Int,Int>{
                    override fun apply(t: Int): Int {
                        println("${Thread.currentThread().name}")
                        return t+1
                    }
                })
                .map(object:Function<Int,Int>{
                    override fun apply(t: Int): Int {
                        println("${Thread.currentThread().name}")
                        return t+1
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object:SingleObserver<Int>{
                    override fun onSubscribe(d: Disposable?) {
                        println("${Thread.currentThread().name}")
                    }
                    override fun onSuccess(t: Int?) {
                        println("${Thread.currentThread().name}")
                    }
                    override fun onError(e: Throwable?) {}
                })
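
If the conclusions above hold, the threads in this test can be predicted by replaying the same operator order with miniature stand-ins. The sketch below is not RxJava: FinalObserverThreadSketch, its operators, and the executors "fake-io" and "fake-main" are assumptions for illustration. It also assumes, as in the subscribeActual shown earlier, that subscribeOn calls the downstream onSubscribe synchronously before scheduling anything.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical miniature operators replaying the operator order of the test.
final class FinalObserverThreadSketch {

    interface Observer<T> { void onSubscribe(); void onSuccess(T value); }
    interface Source<T> { void subscribe(Observer<? super T> observer); }

    static <T> Source<T> just(T value) {
        return o -> { o.onSubscribe(); o.onSuccess(value); };
    }

    static <T> Source<T> observeOn(Source<T> source, Executor executor) {
        return down -> source.subscribe(new Observer<T>() {
            @Override public void onSubscribe() { down.onSubscribe(); }
            @Override public void onSuccess(T v) { executor.execute(() -> down.onSuccess(v)); }
        });
    }

    static <T> Source<T> subscribeOn(Source<T> source, Executor executor) {
        return down -> {
            down.onSubscribe(); // announced synchronously on the subscribing thread
            executor.execute(() -> source.subscribe(new Observer<T>() {
                @Override public void onSubscribe() { } // already announced downstream
                @Override public void onSuccess(T v) { down.onSuccess(v); }
            }));
        };
    }

    static <T, R> Source<R> map(Source<T> source, Function<? super T, ? extends R> mapper) {
        return down -> source.subscribe(new Observer<T>() {
            @Override public void onSubscribe() { down.onSubscribe(); }
            @Override public void onSuccess(T v) { down.onSuccess(mapper.apply(v)); }
        });
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String thread() { return Thread.currentThread().getName(); }

    // Same operator order as the test: observeOn(io), subscribeOn(main), map, map, observeOn(main).
    static List<String> run() {
        ExecutorService io = named("fake-io");
        ExecutorService main = named("fake-main");
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Source<Integer> chain = just(1);
            chain = observeOn(chain, io);
            chain = subscribeOn(chain, main);
            chain = map(chain, n -> { log.add("map1 " + thread()); return n + 1; });
            chain = map(chain, n -> { log.add("map2 " + thread()); return n + 1; });
            chain = observeOn(chain, main);
            chain.subscribe(new Observer<Integer>() {
                @Override public void onSubscribe() { log.add("onSubscribe " + thread()); }
                @Override public void onSuccess(Integer v) {
                    log.add("onSuccess " + thread() + " value " + v);
                    done.countDown();
                }
            });
            done.await(2, TimeUnit.SECONDS);
            return log;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this replay onSubscribe is logged on the calling thread, both map functions on "fake-io" (the observeOn above them), and onSuccess on "fake-main" (the observeOn closest to the observer). The subscribeOn only decides where the source is subscribed, and its effect on the values is overridden by the observeOn that follows the source.
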

This article explored how streaming and thread scheduling work, using the simplest creation operator. Other, more complex operators are not covered here.


END