Small example

// End-to-end example: create an Observable from a hand-written ObservableOnSubscribe,
// pick schedulers with subscribeOn/observeOn, and subscribe with a Consumer.
Observable.create(object: ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        // Emit a single String item downstream.
        emitter.onNext("arrom")
    }

}).subscribeOn(Schedulers.io()) // wraps the source in an ObservableSubscribeOn using the IO scheduler
    .observeOn(AndroidSchedulers.mainThread()) // wraps again in an ObservableObserveOn using the main-thread scheduler
    .subscribe(object :Consumer<String>{
        override fun accept(t: String) {
            // IDE-generated placeholder: Kotlin's TODO() throws NotImplementedError when reached.
            TODO("Not yet implemented")
        }
    })
Copy the code

Observable.create

/**
 * Wraps {@code source} in a new {@code ObservableCreate} and returns whatever the
 * {@code RxJavaPlugins.onAssembly} hook yields for it.
 *
 * @param source the subscribe callback to wrap; must not be null
 * @return the result of {@code RxJavaPlugins.onAssembly} applied to the new {@code ObservableCreate}
 * @throws NullPointerException if {@code source} is null
 */
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
Copy the code

Pass in an ObservableOnSubscribe and produce an ObservableCreate object

The ObservableOnSubscribe is the anonymous object we wrote ourselves

// The anonymous ObservableOnSubscribe from the example above: its subscribe
// callback emits the single value "arrom" through the emitter it is given.
object : ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        emitter.onNext("arrom")
    }
}
Copy the code

ObservableCreate is constructed as follows

/**
 * Stores the given subscribe callback in the {@code source} field.
 *
 * @param source the {@code ObservableOnSubscribe} to keep
 */
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
Copy the code

Execute the subscribeOn(Schedulers.io()) method

/**
 * Wraps this Observable and the given scheduler in a new {@code ObservableSubscribeOn}
 * and returns the result of the {@code RxJavaPlugins.onAssembly} hook for it.
 *
 * @param scheduler the scheduler handed to the new {@code ObservableSubscribeOn}; must not be null
 * @return the result of {@code RxJavaPlugins.onAssembly} applied to the new wrapper
 * @throws NullPointerException if {@code scheduler} is null
 */
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
Copy the code

Generates an ObservableSubscribeOn object

Execute the observeOn(AndroidSchedulers.mainThread()) method

/**
 * Wraps this Observable in a new {@code ObservableObserveOn} configured with the given
 * scheduler, {@code delayError} flag and buffer size, and returns the result of the
 * {@code RxJavaPlugins.onAssembly} hook for it.
 *
 * @param scheduler  the scheduler handed to the new {@code ObservableObserveOn}; must not be null
 * @param delayError forwarded unchanged to {@code ObservableObserveOn}
 * @param bufferSize forwarded to {@code ObservableObserveOn} after validation
 * @return the result of {@code RxJavaPlugins.onAssembly} applied to the new wrapper
 * @throws NullPointerException if {@code scheduler} is null
 */
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    // NOTE(review): presumably rejects a non-positive bufferSize — confirm against ObjectHelper.
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
Copy the code

Generate an ObservableObserveOn object

Execute the subscribe() method

/**
 * Convenience overload: delegates to the three-argument {@code subscribe}, supplying
 * {@code Functions.ON_ERROR_MISSING} as the error handler and
 * {@code Functions.EMPTY_ACTION} as the completion action.
 *
 * @param onNext the consumer passed through as the item callback
 * @return the {@code Disposable} returned by the three-argument overload
 */
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
Copy the code

When calling subscribe we pass in an onNext callback (a Consumer object). That overload delegates to the following subscribe method:

/**
 * Bundles the three callbacks into a {@code LambdaObserver}, subscribes that observer
 * to this Observable, and returns it as the {@code Disposable}.
 *
 * @param onNext     item callback; must not be null
 * @param onError    error callback; must not be null
 * @param onComplete completion callback; must not be null
 * @return the {@code LambdaObserver} that was subscribed
 * @throws NullPointerException if any argument is null
 */
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
        @NonNull Action onComplete) {
    Objects.requireNonNull(onNext, "onNext is null");
    Objects.requireNonNull(onError, "onError is null");
    Objects.requireNonNull(onComplete, "onComplete is null");

    // The fourth argument is the onSubscribe callback; an empty consumer is supplied here.
    LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

    // Hand off to the Observer-based subscribe overload.
    subscribe(ls);

    return ls;
}
Copy the code

Create a new LambdaObserver and pass in onNext, onError, onComplete, and onSubscribe

Call the subscribe method
public final void subscribe(@NonNull Observer<? super T> observer) { Objects.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

In this case, subscribe is called on the ObservableObserveOn object returned by observeOn, so the subscribeActual method of ObservableObserveOn is what ultimately gets called

@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); }}Copy the code

thread

subscribeOn(Schedulers.io()) generates an ObservableSubscribeOn object

Schedulers.io() generates a thread scheduler object, IoScheduler

/**
 * Wraps the downstream observer in a {@code SubscribeOnObserver}, notifies the downstream
 * of the subscription, and schedules a {@code SubscribeTask} on the scheduler.
 *
 * @param observer the downstream observer
 */
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    // onSubscribe is invoked directly here, before the task below is handed to the scheduler.
    observer.onSubscribe(parent);

    // The Disposable returned by scheduleDirect is stored on the parent observer.
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

/**
 * Runnable that subscribes the held {@code SubscribeOnObserver} to {@code source}
 * when it is run.
 */
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        // `source` is not declared in this class; presumably a field of the enclosing operator.
        source.subscribe(parent);
    }
}

/**
 * Creates a worker, wraps the (plugin-decorated) runnable together with that worker in a
 * {@code DisposeTask}, and schedules the task on the worker with the given delay.
 *
 * @param run   the runnable to execute
 * @param delay the delay passed to the worker's {@code schedule}
 * @param unit  the time unit of {@code delay}
 * @return the {@code DisposeTask} that was scheduled
 */
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // createWorker() is supplied by the concrete scheduler.
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}


Copy the code

createWorker is implemented by subclasses, so we go straight to IoScheduler’s createWorker method

/**
 * Creates a new {@code EventLoopWorker} built from the current value of {@code pool}.
 *
 * @return a new {@code EventLoopWorker}
 */
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
Copy the code

Enter the EventLoopWorker class

/**
 * Schedules {@code action} on the underlying {@code threadWorker}, unless this worker's
 * task container has already been disposed.
 *
 * @param action    the runnable to execute
 * @param delayTime the delay forwarded to {@code scheduleActual}
 * @param unit      the time unit of {@code delayTime}
 * @return {@code EmptyDisposable.INSTANCE} if already disposed, otherwise the result of
 *         {@code threadWorker.scheduleActual}
 */
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    // `tasks` is passed along as the container (the `parent` argument of scheduleActual).
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
Copy the code

Move on to the scheduleActual method

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent ! = null) { if (! parent.add(sr)) { return sr; } } Future<? > f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent ! = null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }Copy the code

Enter the ScheduledRunnable class

public void run() { lazySet(THREAD_INDEX, Thread.currentThread()); try { try { actual.run(); } catch (Throwable e) { // Exceptions.throwIfFatal(e); nowhere to go RxJavaPlugins.onError(e); throw e; } } finally { Object o = get(PARENT_INDEX); if (o ! = PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o ! = null) { ((DisposableContainer)o).delete(this); } for (;;) { o = get(FUTURE_INDEX); if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) { break; } } lazySet(THREAD_INDEX, null); }}Copy the code

The ObservableCreate.CreateEmitter.onNext method is invoked (indirectly) from the run method of ObservableSubscribeOn.SubscribeTask. As shown above, through this fairly involved chain of calls, ObservableSubscribeOn.SubscribeTask is handed to a new thread (the one provided by Schedulers.io()) for execution. So, starting from ObservableCreate.CreateEmitter.onNext, the subsequent logic also runs on that new thread (the Schedulers.io() thread we specified)

General flow chart