Let’s start with a little chestnut

Observable.just(1)
        .subscribeOn(Schedulers.newThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {}@Override
            public void onNext(Integer integer) {}@Override
            public void onError(Throwable e) {}@Override
            public void onComplete(a) {}});Copy the code

Schedulers. NewThread method

public final class Schedulers {...static final Scheduler NEW_THREAD;

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    static{... NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    public static Scheduler newThread(a) {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call(a) throws Exception {
            returnNewThreadHolder.DEFAULT; }}... }Copy the code

The schedulers. newThread method essentially returns a NewThreadScheduler object.

The subscribeOn method prototype is as follows

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code

SubscribeOn returns an ObservableSubscribeOn object.

ObservableSubscribeOn constructor

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler; }... }Copy the code

The argument to ObservableSubscribeOn, where the scheduler represents the NewThreadScheduler object we pass in

AbstractObservableWithUpstream

abstract class AbstractObservableWithUpstream<T.U> extends Observable<U> 
    implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source(a) {
        returnsource; }}Copy the code

The ObservableSubscribeOn argument, where source represents the ObservableJust object returned by the just method.

The subscribe method

public final void subscribe(Observer<? super T> observer) {... subscribeActual(observer); . }Copy the code

Here we call the subscribeActual of ObservableSubscribeOn, passing in our own observer

The ObservableSubscribeOn subscribeActual

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {...@Override
    public void subscribeActual(final Observer<? super T> observer) {
        / / 1
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        
        / / 2
        observer.onSubscribe(parent);

        / / 3
        parent.setDisposable(scheduler.scheduleDirect(newSubscribeTask(parent))); }... }Copy the code

Note 1 wraps our custom Observer as a SubscribeOnObserver object.

We call our own observer onSubscribe method at comment 2, and there is no thread switch.

In note 3, a SubscribeTask object is first created and Runnable interface is implemented, and then scheduling is carried out.

SubscribeTask

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run(a) {
        / / 4source.subscribe(parent); }}Copy the code

Note 4 shows that a thread switch has occurred, meaning that the subscription process takes place on the specified thread. The following article will analyze the source code for switching threads. Source is the ObservableJust object returned by the just method we talked about earlier. Parent is the ObservableSubscribeOn object that we created in the subscribeActual method.

The ObservableJust subscribeActual

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        / / 1
        observer.onSubscribe(sd);
        / / 2
        sd.run();
    }

    @Override
    public T call(a) {
        returnvalue; }}Copy the code

Note that since the subscription occurs on the specified thread, it is also executed on the specified thread

In Note 1, the Observer for the subscribeActual method is the SubscribeOnObserver object passed in when you subscribe.

ScalarDisposable’s run method

public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

    private static final long serialVersionUID = 3880992722410194083L;

    final Observer<? super T> observer;

    final T value;

    static final int START = 0;
    static final int FUSED = 1;
    static final int ON_NEXT = 2;
    static final int ON_COMPLETE = 3;

    public ScalarDisposable(Observer<? super T> observer, T value) {
        this.observer = observer;
        this.value = value; }...@Override
    public void run(a) {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            / / 1
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                
                / / 2observer.onComplete(); }}}}Copy the code

Note 1 onNext runs in the thread at subscription time

Note 2’s onComplete runs in the thread at subscription time

SubscribeOnObserver

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> 
    implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> downstream;

    final AtomicReference<Disposable> upstream;

    SubscribeOnObserver(Observer<? super T> downstream) {
        this.downstream = downstream;
        this.upstream = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this.upstream, d);
    }

    @Override
    public void onNext(T t) {
        / / 1
        downstream.onNext(t);
    }

    @Override
    public void onComplete(a) {
        / / 2downstream.onComplete(); }... }Copy the code

Notice that the Downstream in the SubscribeOnObserver is our own observer.

Note 1 onNext runs in the thread at subscription time

Note 2’s onComplete runs in the thread at subscription time

conclusion

The subscribeOn method controls the threads to which the upstream Observable subscribes.

SubscribeOn wraps the downstream Observers as SubscribeOnObserver objects, which run on the thread at the time of subscription.