Observable's subscribeOn(Schedulers.io()) method specifies the thread on which the upstream subscription, and therefore the emission of events, is executed.
The main classes involved:
Schedulers: provides the static access methods, such as io().
IOTask: the task whose call() supplies the IO scheduler.
Scheduler: the common scheduling API; IoScheduler, ComputationScheduler and NewThreadScheduler are all subclasses of it.
Scheduler.Worker: manages the tasks that are executed.
1. Calling this method creates an ObservableSubscribeOn object, which wraps the upstream Observable and stores the Scheduler that was passed in.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
2. Next, when the stream is subscribed, the subscribeActual method of ObservableSubscribeOn is executed. This process was covered in (1). The core of this method is its third statement.
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // Wrap the downstream observer.
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // Hand the actual subscription over to the scheduler.
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // The class body continues with the SubscribeTask inner class shown in step 3.
3. The key statement is:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
Here scheduler is the Schedulers.io() object that was passed in; it provides the thread pool that runs the Runnable. SubscribeTask is that Runnable.
The thread pool executes its run method, in which source is the upstream Observable and parent is the SubscribeOnObserver that wraps the downstream Observer.
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        // Runs on the scheduler's thread: subscribes the wrapper to the upstream source.
        public void run() {
            source.subscribe(parent);
        }
    }
} // end of ObservableSubscribeOn
The emitted items eventually arrive at onNext in SubscribeOnObserver, which forwards them to the downstream Observer.
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    final Observer<? super T> actual;
    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onNext(T t) {
        // Forward the item to the downstream observer.
        actual.onNext(t);
    }
}
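To make step 3 concrete, here is a minimal sketch of the same idea using only the JDK. Source, subscribeOn and emittingThreadName are hypothetical stand-ins invented for illustration, not RxJava classes, and a single-thread executor whose thread is named sketch-io-1 plays the role of Schedulers.io(). The sketch only models the hand-off of source.subscribe(parent) to another thread; disposal and error handling are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins; these are not RxJava's real classes.
class SubscribeOnSketch {

    /** A source pushes items to an observer when it is subscribed. */
    interface Source<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Wraps the upstream source and remembers the executor, like ObservableSubscribeOn. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        // The lambda body plays the role of SubscribeTask.run().
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Subscribes through the wrapper and returns the name of the emitting thread. */
    static String emittingThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io-1");
            t.setDaemon(true);
            return t;
        });
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The upstream emits the name of whichever thread subscribed to it.
        Source<String> upstream =
                observer -> observer.accept(Thread.currentThread().getName());

        subscribeOn(upstream, io).subscribe(name -> {
            seen.set(name);
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("emitter: " + emittingThreadName()); // emitter: sketch-io-1
    }
}
```

The caller only builds the chain and calls subscribe; the upstream's own subscribe logic runs on the executor thread, which is why a single subscribeOn moves the whole source off the calling thread.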
4. How Schedulers.io() is prepared
The IO field is assigned in a static initializer block.
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

static {
    // initIoScheduler invokes IOTask.call(), which returns a new IoScheduler() object
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
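The initialization pattern in step 4 can be sketched with the JDK alone. The names below (SketchScheduler, SketchIoTask, init, CALLS) are hypothetical and exist only for this illustration; the real initIoScheduler does more than this simplified init method. The sketch shows that the Callable runs once, during class initialization, and that every later io() call returns the same instance.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins; these are not RxJava's real classes.
class IoInitSketch {

    /** Stand-in for the scheduler type; it carries no behavior here. */
    static final class SketchScheduler { }

    /** Counts how often the task runs, so the single initialization is observable. */
    static final AtomicInteger CALLS = new AtomicInteger();

    /** Plays the role of IOTask: a Callable whose call() supplies the scheduler. */
    static final class SketchIoTask implements Callable<SketchScheduler> {
        @Override
        public SketchScheduler call() {
            CALLS.incrementAndGet();
            return new SketchScheduler();
        }
    }

    /** Plays the role of initIoScheduler: invokes the task and returns its result. */
    static SketchScheduler init(Callable<SketchScheduler> task) {
        try {
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static final SketchScheduler IO;

    static {
        // Runs once, when the class is initialized.
        IO = init(new SketchIoTask());
    }

    /** Every call returns the instance created in the static block. */
    static SketchScheduler io() {
        return IO;
    }

    public static void main(String[] args) {
        System.out.println(io() == io()); // prints "true"
        System.out.println(CALLS.get());  // prints "1"
    }
}
```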
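5. IoScheduler extends Scheduler and implements its abstract method createWorker. The scheduleDirect method defined in Scheduler calls createWorker and then schedules the task on the Worker it returns.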
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}
6. IoScheduler's createWorker method creates an EventLoopWorker, so the w.schedule call in scheduleDirect invokes the schedule method of EventLoopWorker.
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            return EmptyDisposable.INSTANCE;
        }
        // Delegate to the ThreadWorker obtained from the pool.
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}
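The relationship between scheduleDirect and createWorker in steps 5 and 6 can be modeled with a few lines of plain JDK code. Worker, SketchScheduler, SketchIoScheduler and the workersCreated counter are hypothetical names made up for this sketch, and a cached thread pool is assumed in place of RxJava's own worker pool. It only illustrates the template-method shape: the base class asks the subclass for a worker and schedules the task on it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins; these are not RxJava's real classes.
class ScheduleDirectSketch {

    /** Stand-in for Scheduler.Worker. */
    interface Worker {
        Future<?> schedule(Runnable task);
    }

    /** Stand-in for Scheduler: scheduleDirect is built on the abstract createWorker. */
    static abstract class SketchScheduler {
        final AtomicInteger workersCreated = new AtomicInteger();

        abstract Worker createWorker();

        final Future<?> scheduleDirect(Runnable task) {
            Worker w = createWorker();   // one worker per direct task
            return w.schedule(task);
        }
    }

    /** Stand-in for IoScheduler: its workers submit to a shared thread pool. */
    static final class SketchIoScheduler extends SketchScheduler {
        private final ExecutorService pool;

        SketchIoScheduler(ExecutorService pool) {
            this.pool = pool;
        }

        @Override
        Worker createWorker() {
            workersCreated.incrementAndGet();
            return task -> pool.submit(task);
        }
    }

    /** Schedules two tasks directly and reports how many workers were created. */
    static int workersForTwoTasks() {
        ExecutorService pool = Executors.newCachedThreadPool();
        SketchIoScheduler scheduler = new SketchIoScheduler(pool);
        try {
            Future<?> a = scheduler.scheduleDirect(() -> { });
            Future<?> b = scheduler.scheduleDirect(() -> { });
            a.get(5, TimeUnit.SECONDS);
            b.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return scheduler.workersCreated.get();
    }

    public static void main(String[] args) {
        System.out.println(workersForTwoTasks()); // prints "2"
    }
}
```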
7. The ThreadWorker class extends NewThreadWorker; the scheduleActual method is defined in NewThreadWorker.
static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
Note the executor.submit and executor.schedule calls here: this is where the thread pool is finally asked to execute the Runnable, which has been passed all the way down to this point.
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        if (delayTime <= 0) {
            // No delay: run as soon as a thread is available.
            f = executor.submit((Callable<Object>)sr);
        } else {
            // Delayed: let the executor run it after delayTime.
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
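The submit-or-schedule branch at the heart of scheduleActual can be tried in isolation with a ScheduledExecutorService from the JDK. This is only a sketch under stated assumptions: submitOrSchedule, runBoth and rejectedAfterShutdown are hypothetical helper names, and returning null on rejection is a simplification chosen for this sketch (the method above reports the error through RxJavaPlugins.onError instead).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers for illustration; not part of RxJava.
class ScheduleActualSketch {

    /** Submits immediately when there is no delay, otherwise schedules; null if rejected. */
    static Future<Object> submitOrSchedule(ScheduledExecutorService executor,
                                           Runnable run, long delayTime, TimeUnit unit) {
        Callable<Object> task = () -> {
            run.run();
            return null;
        };
        try {
            if (delayTime <= 0) {
                return executor.submit(task);
            }
            return executor.schedule(task, delayTime, unit);
        } catch (RejectedExecutionException ex) {
            return null; // simplification: the executor no longer accepts tasks
        }
    }

    /** Runs one immediate and one delayed task and returns their completion order. */
    static List<String> runBoth() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        try {
            Future<Object> now =
                    submitOrSchedule(executor, () -> order.add("now"), 0, TimeUnit.MILLISECONDS);
            Future<Object> later =
                    submitOrSchedule(executor, () -> order.add("later"), 50, TimeUnit.MILLISECONDS);
            now.get(5, TimeUnit.SECONDS);
            later.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return order;
    }

    /** Shows the rejection path: a shut-down executor refuses new tasks. */
    static boolean rejectedAfterShutdown() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.shutdown();
        return submitOrSchedule(executor, () -> { }, 0, TimeUnit.MILLISECONDS) == null;
    }

    public static void main(String[] args) {
        System.out.println(runBoth());               // prints "[now, later]"
        System.out.println(rejectedAfterShutdown()); // prints "true"
    }
}
```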
[Figure: class diagram]