All the analysis in this article is based on RxJava2. The following RxJava refers to RxJava2 as you will know from reading this article:

  • RxJava’s watch binding and event sending process
  • RxJava watches for binding and thread switching during event sending

This library has been used for a long time in project development, from RxJava1.0 to RxJava2.0. Chain calls and silky thread switching are nice, but it’s easy to fall into the thread scheduling pit if you don’t get the hang of it. In this article, we will analyze RxJava’s subscription process, time sending process and thread scheduling

Subscriptions and event streams

Say first conclusion

  • Events are sent from top to bottom in the order in which the code is written
  • The subscription fromsubscribe()Start subscribing from the bottom up, which is the starting point of the entire event flow, and the entire operation will only take effect when the subscription starts
  • The event will not be sent until the subscription is complete

The illustration

To make it easier to understand the direction of the subscription flow, I call Observablesubscribe()The subscription is described as ObserverbeSubscribed()

Source code analysis

Observabe creation process

This corresponds to the black arrows in the figure, using the map() operation in the operator as an example:

   @CheckReturnValue
   @SchedulerSupport(SchedulerSupport.NONE)
   public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
       ObjectHelper.requireNonNull(mapper, "mapper is null");
       return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
   }
Copy the code

When the Map operator is called, RxJavaPliguns registers a new ObservableMap object. Look at the other operators and you’ll see that all ObservableMap objects are generated. Meanwhile, the upstream Observabe passes in the Source property assigned to the new Observable as the Source parameter. Layer down, you can continue to use operators on the newly generated Observable.

Subscription process:

The subscribe process begins when the subscribe () method of the last Observable is called. This process corresponds to the red arrows in the figure

   @SchedulerSupport(SchedulerSupport.NONE)
   @Override
   public final void subscribe(Observer<? super T> observer) {
       ObjectHelper.requireNonNull(observer, "observer is null");
       try {
           observer = RxJavaPlugins.onSubscribe(this, observer);

           ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

           subscribeActual(observer);
       } catch (NullPointerException e) { // NOPMD
           throw e;
       } catch (Throwable e) {
           Exceptions.throwIfFatal(e);
           // can't call onError because no way to know if a Disposable has been set or not
           // can't call onSubscribe because the call might have set a Subscription already
           RxJavaPlugins.onError(e);

           NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
           npe.initCause(e);
           thrownpe; }}Copy the code

Subscribe (Observer) actually calls the subscribeActual() method in each Observable implementation subclass:

   @Override
   public void subscribeActual(Observer<? super U> t) {
       source.subscribe(new MapObserver<T, U>(t, function));
   }
Copy the code

The subscribeActual() method is also simple, calling source to subscribe to a newly generated Observer object, Also, the new MapObserver will assign the Observer that was passed in when we called SUBSCRIBE () to the Downstream attribute. In this way, each level of subscription connects a superior Observable, an Observer generated at the same level, and an Observer passed in by the subordinate until the entire subscription process reaches the location where the Observable was originally created.

Event sending process:

ObservableCreate: ObservableCreate ObservableCreate: ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate

   @Override
   protected void subscribeActual(Observer<? super T> observer) {
       CreateEmitter<T> parent = new CreateEmitter<T>(observer);
       observer.onSubscribe(parent);

       try {
           source.subscribe(parent);
       } catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

Where does source.subscrebe(parent) come from when you subscribe?

   public ObservableCreate(ObservableOnSubscribe<T> source) {
       this.source = source;
   }
Copy the code
   Observable.create(object : ObservableOnSubscribe<String> {
          override fun subscribe(emitter: ObservableEmitter<String>) {
               emitter.onNext("data")}})Copy the code

From the code, we can see that this source is the ObservableOnSubscribe we passed when we created it, so Emitters. OnNext (“data”) is the starting point for sending events. Moving on to what Emitter’s onNext() does:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
Copy the code

Source. Subscribe (new MapObserver

(T, function)); This sends the event to the next level, and the next level Observer also sends the event to the next level on onNext(), until finally the Observer instance we subscribe() completes.
,>

Thread scheduling

The event subscription sending process has been pretty much figured out from the above article, but let’s focus on another thread scheduling issue.

scheduling

Thread transformation in RxJava is carried out by subscribeOn() and observeOn() operations. Where subscribeOn() changes the execution thread of the subscribing thread, that is, the thread where the event occurs. ObserveOn () changes the thread on which the event result observer callback is located, the thread on which the onNext() method is located.

subscribeOn()
observeOn()

Source code analysis

As mentioned earlier, RxJava generates a new Observable for each operator used. ObserveOn () and subscribeOn() are no exception. The core logic of thread scheduling is in ObservableSubscribeOn and ObservableObserveOn classes

SubscribeOn () process

  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> subscribeOn(Scheduler scheduler) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
       return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
   }
Copy the code

Calling subscribeOn() generates a new ObservableSubscribeOn and passes in the current Observable and incoming Scheduler as arguments. Subscribe () causes the entire Observable chain to call subscribe() bottom-up. The subscribe() method actually calls the subscribeActual() method that implements subclasses of the abstract Observable.

   @Override
   public void subscribeActual(final Observer<? super T> observer) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

       observer.onSubscribe(parent);

       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }
Copy the code

Scheduler. scheduleDirect(new SubscribeTask(parent)); ,SubscribeTask is called a superior Observable to subscribe to the generated SubscribeOnObserver.

   @NonNull
   public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       final Worker w = createWorker();

       final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

       DisposeTask task = new DisposeTask(decoratedRun, w);

       w.schedule(task, delay, unit);

       return task;
   }
Copy the code

The scheduleDirect method uses the incoming scheduler to create a Worker object on the specified thread to execute the SubscribeTask, thus switching the subscribing threads. Therefore, when multiple subscribeOn() are stacked, the final thread will still return to the thread specified by the last subscribeOn() executed (where the code appears for the first time).

ObserveOn () process

Calling the observeOn(Scheduler) method calls the internal method of the same name to generate a new ObservableObserveOn object, passing in the current Observable and the incoming Scheduler as arguments. ObservableSubscribeOn is different from ObservableSubscribeOn in that ObservableSubscribeOn calls its parent Observable to subscribe to itself directly on the current thread. ObservableObserveOn’s ObserveOnObserver schedules the thread that sends the result data.

       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }void schedule(a) {
           if (getAndIncrement() == 0) {
               worker.schedule(this); }}Copy the code

It can be found from the source code that the worker will eventually be used to send events downstream. This worker is the worker created by the thread specified in our observeOn() method. Since events are top-down, thread changes can be felt in downstream events with each switch.

Log analysis

It’s not easy to explain the thread changes when subscribeOn() and observeOn() are combined, so let’s look at the thread changes that occur when either operator is used alone.

Only the subscribeOn() scheduling thread is called

Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.io()) 
                .doOnSubscribe {
                    Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.newThread())
                .doOnSubscribe {
                    Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete(a){}override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })
Copy the code

Execution Result:

  • 1. Subscribe from bottom up (onSubscribe ->doOnSubscribe 2 ->doOnSubscribe 1)
  • 2, from the bottom up, each callsubscribeOnThe subscriber thread will change until the next callsubscribeOn
  • 3. Events are passed from top to bottom (Map 1 –> Map 2 –> Map 3 –> onNext) from the same thread as the last oneRxCachedThreadScheduler-1

Only the subscribeOn() scheduling thread is called

        Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
// .doOnSubscribe {
// Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
/ /}
// .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
// .doOnSubscribe {
// Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
/ /}
// .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete(a) {}override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })
Copy the code

Execution Result:

  • 1. Event sending is normal from top to bottom (Map 1 –> Map 2 –> Map 3 –> onNex)
  • 2, from the top down, each callobserveOnObserve that the callback threads will switch once (main –> rxNewThreadScheduler-1 –> rxNewThreadScheduler-2)

Mix scheduling threads

We open all the comments in the above code and get the following log:

We can see from the above three log printing:

The logs of the subscription chain are printed bottom-up, and then the observation results are printed top-down. SubscribeOn switches threads, which is not valid only when the thread is specified for the first time (that is, the last time from bottom to top) as some articles say. The first time is just our illusion, because the subscription is bottom-up and no matter how the previous thread switches the trace will be switched to the first thread that subscribeOn specifies (that is, the last thread that subscribeOn assigns bottom-up). When no thread switch operation is performed in the callback result, we can only perceive this thread switch (Map1 is the same as doOnSubscribe 1 thread). Each specified thread of observeOn causes the event stream to be switched to the corresponding thread. The complete event subscription and sending process is shown below, starting with our call subscribe() to associate the Observer with the observed object and ending with the onNext or onError of the Observer passed in Subscribe (), forming a counterclockwise n-shaped chain. In the observation chain on the right, the observation thread will be switched each time subscribeOn is executed. The event-sending chain on the left side sends events from the thread specified last time the watch chain was called, and each call to observeOn specifies a new event-sending thread.

The illustration

Referring to the above source code and log analysis, combined with this figure I believe that you will have a more three-dimensional understanding of RxJava field scheduling