Before looking at how thread switching works internally, let's review how to switch threads in RxJava 2.

Here is the diagram from the official website:

There are two operators that determine how to switch threads:

  • subscribeOn: specifies the thread on which the upstream subscribe() occurs
  • observeOn: specifies the thread on which the Subscriber runs

In plain terms, subscribeOn determines which thread the event source emits from, and each observeOn call determines which thread everything downstream of it runs on.

Single.subscribeOn(): subscribing to the event stream

Single
    .just(1L)
    .subscribeOn(Schedulers.io())

Open the subscribeOn source code. Skipping the hook call, the actual implementation is in the operator class; here is subscribeActual from ObservableSubscribeOn:

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    observer.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

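Internally, it creates a SubscribeOnObserver that wraps the downstream observer and hands it to the downstream through onSubscribe.

It then calls the scheduler's scheduleDirect to switch threads. In ObservableSubscribeOn the scheduled Runnable is a SubscribeTask; in SingleSubscribeOn, SubscribeOnObserver implements Runnable itself: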

static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    @Override
    public void run() {
        source.subscribe(this);
    }
}

The subscription process is performed in the run method so that the entire upstream subscription occurs in the scheduler’s thread.
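The same idea can be modeled with the standard library alone. The following is a minimal sketch, not RxJava's code: MiniObserver, MiniSource, just, and subscribeOn are hypothetical names made up for illustration, and a single-thread ExecutorService stands in for a Scheduler. It shows that wrapping source.subscribe(...) in a scheduled task moves the upstream subscription, and with it the emission, onto the scheduler's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SubscribeOnSketch {
    /** Hypothetical stand-in for SingleObserver: receives exactly one value. */
    interface MiniObserver<T> { void onSuccess(T value); }

    /** Hypothetical stand-in for SingleSource. */
    interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }

    /** Like Single.just: emits synchronously on whichever thread subscribes. */
    static <T> MiniSource<T> just(T value) {
        return observer -> observer.onSuccess(value);
    }

    /** Models subscribeOn: the upstream subscription runs inside a scheduled task. */
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> source.subscribe(observer));
    }

    /** Subscribes from the calling thread and returns the thread name seen in onSuccess. */
    static String emissionThreadName() {
        // Assumption: one named worker thread plays the role of Schedulers.io().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            subscribeOn(just(1L), io)
                .subscribe(value -> seen.complete(Thread.currentThread().getName()));
            return seen.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("onSuccess ran on: " + emissionThreadName());
    }
}
```

Although subscribe is called from the main thread, onSuccess reports the worker thread's name, because the source was subscribed to from inside the scheduled task.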

As shown in the figure:

Operator thread subscription

Add another operator to look at the thread of execution.

In the figure, white lines represent the original thread from which RxJava is called, and purple lines represent the thread specified with subscribeOn.

Once subscribeOn specifies the subscription thread, everything runs on that thread: emitting the data upstream, processing it in the downstream operators, and delivering the result to the subscriber.

Thread switching with multiple subscribeOn calls

If the chain contains several subscribeOn calls, how do the threads switch?

The purple arrow represents the thread specified first in the code, the green arrow represents the thread specified later, and the white arrow represents the original thread from which RxJava was called.

The subscribeOn written last is the one closest to the real observer, so its thread switch happens first as the subscription travels upstream. When the subscription then reaches the SingleSubscribeOn closest to the source, that operator switches threads again, this time to its own Scheduler.

The source code shows the same thing: no matter how many times subscribeOn is specified, the one that decides the source's thread is the one written first, closest to the source.
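The order of the hops can be traced with a small standard-library model. This is only a sketch under assumptions: MiniObserver, MiniSource, named, and this three-argument subscribeOn are hypothetical helpers, and two single-thread executors named "first" and "second" stand in for the two Schedulers. Each wrapper logs the thread it hops to before subscribing upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class NestedSubscribeOnSketch {
    interface MiniObserver<T> { void onSuccess(T value); }
    interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }

    /** Hypothetical helper: a single worker thread with a readable name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Models subscribeOn and records which thread the subscription hops to. */
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService scheduler,
                                         List<String> log) {
        return observer -> scheduler.execute(() -> {
            log.add("hop to " + Thread.currentThread().getName());
            source.subscribe(observer);
        });
    }

    static List<String> trace() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> done = new CompletableFuture<>();
            MiniSource<Integer> source = observer -> {
                log.add("source emits on " + Thread.currentThread().getName());
                observer.onSuccess(1);
            };
            // Equivalent in shape to source.subscribeOn(first).subscribeOn(second).
            MiniSource<Integer> chain =
                subscribeOn(subscribeOn(source, first, log), second, log);
            chain.subscribe(value -> {
                log.add("observer receives on " + Thread.currentThread().getName());
                done.complete(null);
            });
            done.join();
            return new ArrayList<>(log);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The trace reads "hop to second", "hop to first", "source emits on first", "observer receives on first": the later wrapper switches first, but the wrapper next to the source has the final say.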

How dispose works in subscribeOn

public final class SingleSubscribeOn<T> extends Single<T> {
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
        Disposable f = scheduler.scheduleDirect(parent);
        parent.task.replace(f);
    }
}

// Inside SubscribeOnObserver:
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this, d);
}

@Override
public void dispose() {
    DisposableHelper.dispose(this);
    task.dispose();
}

When dispose() is called, SingleSubscribeOn disposes two things:

  1. The Disposable that the upstream passed in through onSubscribe
  2. The Disposable of the thread-switching task

This tells the upstream to stop sending data and cancels the thread switch itself.

Single.observeOn(): thread switching

RxJava has another commonly used thread-switching operator: observeOn.

Once observeOn specifies a thread, the onSuccess and onError callbacks of every observer downstream of it run on that thread.

observeOn is used as follows:

Single.just(1)
    .observeOn(AndroidSchedulers.mainThread())

In the source code, the implementation ends up in subscribeActual of the SingleObserveOn class:

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
    source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
}

The actual work is done by the ObserveOnSingleObserver wrapper:

static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void onError(Throwable e) {
        this.error = e;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }
}

This proxy observer differs from the one in subscribeOn. subscribeOn switches threads as soon as the subscription arrives, whereas observeOn switches threads inside onSuccess and onError. It therefore does not affect the thread on which the upstream processes data, only the thread on which the downstream receives the result.

The white arrow represents the thread on which the RxJava subscription is invoked, and the red arrow represents the thread that observeOn switches to.

The figure also makes it clear that observeOn affects only the thread on which later stages receive the result, not the upstream thread that processes the data.
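The contrast with subscribeOn can be sketched with the standard library as well. This is an illustrative model rather than RxJava's implementation: MiniObserver, MiniSource, and this observeOn are hypothetical names, and a single-thread executor named "sketch-main" stands in for the target Scheduler. The subscription passes straight through to the source, and only the delivery of the value is rescheduled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ObserveOnSketch {
    interface MiniObserver<T> { void onSuccess(T value); }
    interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }

    /** Models observeOn: subscribe immediately, but deliver the result on the scheduler. */
    static <T> MiniSource<T> observeOn(MiniSource<T> source, ExecutorService scheduler) {
        return observer ->
            source.subscribe(value -> scheduler.execute(() -> observer.onSuccess(value)));
    }

    /** Returns { thread the source emitted on, thread the observer received on }. */
    static String[] threads() {
        ExecutorService target =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        try {
            String[] seen = new String[2];
            CompletableFuture<Void> done = new CompletableFuture<>();
            MiniSource<Integer> source = observer -> {
                seen[0] = Thread.currentThread().getName(); // still the subscribing thread
                observer.onSuccess(1);
            };
            observeOn(source, target).subscribe(value -> {
                seen[1] = Thread.currentThread().getName(); // the scheduler's thread
                done.complete(null);
            });
            done.join();
            return seen;
        } finally {
            target.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = threads();
        System.out.println("source emitted on:    " + seen[0]);
        System.out.println("observer received on: " + seen[1]);
    }
}
```

The source still emits on the thread that called subscribe; only the observer's callback moves to the scheduler's thread.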

Combining observeOn with map

Single.just(1)
    .observeOn(AndroidSchedulers.mainThread())
    .map(object : Function<Int, Int> {
        override fun apply(t: Int): Int {
            return t
        }
    })

The threads actually used are shown in the figure below.

The white arrow indicates the thread on which the RxJava subscription is invoked, and the red arrow indicates the thread that observeOn switches to.

As you can see, once map is added downstream of observeOn, both the map callback and the user's result callback run on the thread specified in observeOn.

Combining observeOn with subscribeOn

Single.just(1)
    .observeOn(Schedulers.computation())
    .map(object : Function<Int, Int> {
        override fun apply(t: Int): Int {
            return t
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(object : SingleObserver<Int?> {
        // ... observer callbacks omitted
    })

In this demo, map runs on the computation scheduler, the user's observer runs on the Android main thread, and subscribeOn(Schedulers.io()) sets the thread for the data source.

The white arrow represents the thread on which the RxJava subscription is invoked, the purple arrow represents the thread on which the data source is expected to run, the blue arrow represents the thread on which the map operator's callback runs, and the red arrow represents the thread that the final observeOn switches to.

When observeOn and subscribeOn are combined, where subscribeOn is placed in the chain affects neither the thread the source actually subscribes on nor the threads chosen for the other operators. To run a particular operator on a different thread, add an observeOn immediately before it.

How dispose works in observeOn

@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.setOnce(this, d)) {
        downstream.onSubscribe(this);
    }
}

@Override
public void onSuccess(T value) {
    this.value = value;
    Disposable d = scheduler.scheduleDirect(this);
    DisposableHelper.replace(this, d);
}

@Override
public void onError(Throwable e) {
    this.error = e;
    Disposable d = scheduler.scheduleDirect(this);
    DisposableHelper.replace(this, d);
}

@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

dispose() in observeOn covers two cases:

  1. The upstream has not yet called onSuccess or onError: the upstream is cancelled.
  2. The upstream has already delivered to observeOn and the thread switch is pending: DisposableHelper.replace(this, d) has stored the scheduled task's Disposable, so disposing cancels the thread switch.
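The second case can be illustrated with a plain ExecutorService, where a Future plays the role of the Disposable returned by scheduleDirect. This is a rough sketch under assumptions, not RxJava's code: the class and method names are hypothetical, and a latch keeps the single worker busy so that the hand-off is still queued when it is cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DisposeSketch {
    /** Returns true if the downstream was still notified after the hand-off was cancelled. */
    static boolean deliveredAfterDispose() {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicBoolean delivered = new AtomicBoolean(false);
        try {
            // Occupy the only worker so the hand-off below stays in the queue.
            scheduler.submit(() -> { gate.await(); return null; });

            // "onSuccess arrived": schedule the hand-off and keep its handle,
            // which is the role DisposableHelper.replace(this, d) plays.
            Future<?> handoff = scheduler.submit(() -> delivered.set(true));

            // "dispose()": cancel the pending thread switch before it runs.
            handoff.cancel(false);

            gate.countDown();
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
            return delivered.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delivered after dispose: " + deliveredAfterDispose());
    }
}
```

Because the hand-off is cancelled while it is still queued, the worker skips it and the downstream callback never runs.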

That covers how subscribeOn and observeOn switch threads and how the calls flow.

To summarize:

  1. subscribeOn switches the thread on which the data source emits.
  2. The position of subscribeOn relative to other operators does not matter. When there are two subscribeOn operators, the one written first, closest to the source, wins.
  3. With no observeOn in the chain, the whole data flow runs on the thread specified by subscribeOn. With an observeOn, the part of the flow after that operator runs on the thread it specifies.

The internal implementation of Schedulers will be covered in more detail in a later article.