preface

In the previous two articles, the basic path and chain calls of RxJava2 were explained, and if forgotten, this is the portal

  • RxJava2 source code a: first Rx
  • RxJava2 source code two: chain secret

In fact, from the Rx call chain, thread scheduling is only a link to comply with the operation mechanism, but because of its convenient, high frequency characteristics, and in the project is likely to need to switch to their own threads, so select it, understand how to achieve.

The body of the

case

ob.observeOn(Schedulers.newThread())
Copy the code

Because the process, chain are described, so the case directly to the thread scheduling thread. In the case above, the downstream is switched to a new thread to respond.

Scheduler and worker thread

In Rx, the scheduler is responsible for providing the worker thread, and the worker thread is responsible for the actual operation. The following is brief information about both.

public abstract class Scheduler {
    ......
    public abstract Worker createWorker();
}

public abstract static class Worker implements Disposable {
    ......
    public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
Copy the code
  • Scheduler.createworker (): Gets the thread used to execute the task
  • Worker.schedule() : Specific scheduling logic

In the current case, ob.observeon () gets ObservableObserveOn as an Observable.

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } elseWorker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code

When Rx builds call chain, subscribeActual() will be sent, and for the current case, the event will be pushed to the corresponding downstream node before thread scheduling. Therefore, the scheduling information is saved by ObserveOnObserver, and the specific information is as follows:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; }}Copy the code

When the event is pushed to the current node, ObserveOnObserver will handle it. Take onNext()

@Override
public void onNext(T t) {
    ......
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else{ drainNormal(); }}Copy the code

ObserveOnObserver will be given to the Worker thread as a Runnable. When ObserveOnObserver is given the opportunity to run as a thread task, drainFused() or drainNormal() pushes events to the next node on the drainNormal() depending on the case. The code is not attached. And the thread scheduling is complete.

Why so short

Under the current node, the focus is on how Rx conducts thread scheduling, and the core is to run the current node as Runnable in the corresponding thread, the steps are as follows:

  • The Scheduler provides the Worker thread Worker
  • At the response time of the current node, the node is handed to the Worker as Runnable for processing, and the event is pushed to the next node at the time of run()

As for how Worker handles Runnable and when to get execution timing, this is a problem outside of Rx mechanism, because the following response can be guaranteed to run in Worker thread, i.e., thread scheduling is completed, as shown below: