All the analysis in this article is based on RxJava2. The following RxJava refers to RxJava2 as you will know from reading this article:
- RxJava’s watch binding and event sending process
- RxJava watches for binding and thread switching during event sending
This library has been used for a long time in project development, from RxJava1.0 to RxJava2.0. Chain calls and silky thread switching are nice, but it’s easy to fall into the thread scheduling pit if you don’t get the hang of it. In this article, we will analyze RxJava’s subscription process, time sending process and thread scheduling
Subscriptions and event streams
Say first conclusion
- Events are sent from top to bottom in the order in which the code is written
- The subscription from
subscribe()
Start subscribing from the bottom up, which is the starting point of the entire event flow, and the entire operation will only take effect when the subscription starts - The event will not be sent until the subscription is complete
The illustration
To make it easier to understand the direction of the subscription flow, I call Observablesubscribe()
The subscription is described as ObserverbeSubscribed()
Source code analysis
Observabe creation process
This corresponds to the black arrows in the figure, using the map() operation in the operator as an example:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } Copy the code
When the Map operator is called, RxJavaPliguns registers a new ObservableMap object. Look at the other operators and you’ll see that all ObservableMap objects are generated. Meanwhile, the upstream Observabe passes in the Source property assigned to the new Observable as the Source parameter. Layer down, you can continue to use operators on the newly generated Observable.
Subscription process:
The subscribe process begins when the subscribe () method of the last Observable is called. This process corresponds to the red arrows in the figure
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); thrownpe; }}Copy the code
Subscribe (Observer) actually calls the subscribeActual() method in each Observable implementation subclass:
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } Copy the code
The subscribeActual() method is also simple, calling source to subscribe to a newly generated Observer object, Also, the new MapObserver will assign the Observer that was passed in when we called SUBSCRIBE () to the Downstream attribute. In this way, each level of subscription connects a superior Observable, an Observer generated at the same level, and an Observer passed in by the subordinate until the entire subscription process reaches the location where the Observable was originally created.
Event sending process:
ObservableCreate: ObservableCreate ObservableCreate: ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
Where does source.subscrebe(parent) come from when you subscribe?
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } Copy the code
Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { emitter.onNext("data")}})Copy the code
From the code, we can see that this source is the ObservableOnSubscribe we passed when we created it, so Emitters. OnNext (“data”) is the starting point for sending events. Moving on to what Emitter’s onNext() does:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
Copy the code
Source. Subscribe (new MapObserver
(T, function)); This sends the event to the next level, and the next level Observer also sends the event to the next level on onNext(), until finally the Observer instance we subscribe() completes.
Thread scheduling
The event subscription sending process has been pretty much figured out from the above article, but let’s focus on another thread scheduling issue.
scheduling
Thread transformation in RxJava is carried out by subscribeOn() and observeOn() operations. Where subscribeOn() changes the execution thread of the subscribing thread, that is, the thread where the event occurs. ObserveOn () changes the thread on which the event result observer callback is located, the thread on which the onNext() method is located.
subscribeOn()
observeOn()
Source code analysis
As mentioned earlier, RxJava generates a new Observable for each operator used. ObserveOn () and subscribeOn() are no exception. The core logic of thread scheduling is in ObservableSubscribeOn and ObservableObserveOn classes
SubscribeOn () process
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } Copy the code
Calling subscribeOn() generates a new ObservableSubscribeOn and passes in the current Observable and incoming Scheduler as arguments. Subscribe () causes the entire Observable chain to call subscribe() bottom-up. The subscribe() method actually calls the subscribeActual() method that implements subclasses of the abstract Observable.
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } Copy the code
Scheduler. scheduleDirect(new SubscribeTask(parent)); ,SubscribeTask is called a superior Observable to subscribe to the generated SubscribeOnObserver.
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; } Copy the code
The scheduleDirect method uses the incoming scheduler to create a Worker object on the specified thread to execute the SubscribeTask, thus switching the subscribing threads. Therefore, when multiple subscribeOn() are stacked, the final thread will still return to the thread specified by the last subscribeOn() executed (where the code appears for the first time).
ObserveOn () process
Calling the observeOn(Scheduler) method calls the internal method of the same name to generate a new ObservableObserveOn object, passing in the current Observable and the incoming Scheduler as arguments. ObservableSubscribeOn is different from ObservableSubscribeOn in that ObservableSubscribeOn calls its parent Observable to subscribe to itself directly on the current thread. ObservableObserveOn’s ObserveOnObserver schedules the thread that sends the result data.
@Override public void onNext(T t) { if (done) { return; } if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }void schedule(a) { if (getAndIncrement() == 0) { worker.schedule(this); }}Copy the code
It can be found from the source code that the worker will eventually be used to send events downstream. This worker is the worker created by the thread specified in our observeOn() method. Since events are top-down, thread changes can be felt in downstream events with each switch.
Log analysis
It’s not easy to explain the thread changes when subscribeOn() and observeOn() are combined, so let’s look at the thread changes that occur when either operator is used alone.
Only the subscribeOn() scheduling thread is called
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.io())
.doOnSubscribe {
Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
}
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.newThread())
.doOnSubscribe {
Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
}
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete(a){}override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
Copy the code
Execution Result:
- 1. Subscribe from bottom up (onSubscribe ->doOnSubscribe 2 ->doOnSubscribe 1)
- 2, from the bottom up, each call
subscribeOn
The subscriber thread will change until the next callsubscribeOn
- 3. Events are passed from top to bottom (Map 1 –> Map 2 –> Map 3 –> onNext) from the same thread as the last one
RxCachedThreadScheduler-1
Only the subscribeOn() scheduling thread is called
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
/ /}
// .subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
/ /}
// .subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete(a) {}override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
Copy the code
Execution Result:
- 1. Event sending is normal from top to bottom (Map 1 –> Map 2 –> Map 3 –> onNex)
- 2, from the top down, each call
observeOn
Observe that the callback threads will switch once (main –> rxNewThreadScheduler-1 –> rxNewThreadScheduler-2)
Mix scheduling threads
We open all the comments in the above code and get the following log:
We can see from the above three log printing:
The logs of the subscription chain are printed bottom-up, and then the observation results are printed top-down. SubscribeOn switches threads, which is not valid only when the thread is specified for the first time (that is, the last time from bottom to top) as some articles say. The first time is just our illusion, because the subscription is bottom-up and no matter how the previous thread switches the trace will be switched to the first thread that subscribeOn specifies (that is, the last thread that subscribeOn assigns bottom-up). When no thread switch operation is performed in the callback result, we can only perceive this thread switch (Map1 is the same as doOnSubscribe 1 thread). Each specified thread of observeOn causes the event stream to be switched to the corresponding thread. The complete event subscription and sending process is shown below, starting with our call subscribe() to associate the Observer with the observed object and ending with the onNext or onError of the Observer passed in Subscribe (), forming a counterclockwise n-shaped chain. In the observation chain on the right, the observation thread will be switched each time subscribeOn is executed. The event-sending chain on the left side sends events from the thread specified last time the watch chain was called, and each call to observeOn specifies a new event-sending thread.
The illustration
Referring to the above source code and log analysis, combined with this figure I believe that you will have a more three-dimensional understanding of RxJava field scheduling