Let’s take the following demo as an example to walk through the entire RxJava workflow.
public void start() {
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "Current thread name" + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "Start using subscribe connections." + Thread.currentThread().getName());
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "Observer thread is in" + Thread.currentThread().getName());
            Log.d(TAG, "Respond to the Next event" + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "Respond to an Error event");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "Respond to the Complete Event");
        }
    };

    observable1
            .subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.newThread())
            .subscribe(observer);
}
Operators before subscribe
- The first operator, subscribeOn (Observable.java)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn extends AbstractObservableWithUpstream and is a wrapper around the upstream Observable. After new ObservableSubscribeOn is executed, the Observable has the following structure: ObservableSubscribeOn (Schedulers.newThread()) wrapping observable1.
- The second operator, subscribeOn (Observable.java), follows the same process as above with a different Scheduler, so it again wraps the Observable. After new ObservableSubscribeOn is executed, the structure is: ObservableSubscribeOn (Schedulers.io()) wrapping ObservableSubscribeOn (Schedulers.newThread()) wrapping observable1.
- The third operator, observeOn (Observable.java)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
This wraps the Observable above once more, this time as an ObservableObserveOn:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}
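After new ObservableObserveOn is executed, the structure is: ObservableObserveOn (AndroidSchedulers.mainThread()) wrapping the two ObservableSubscribeOn layers, which wrap observable1.
- The fourth operator, observeOn (Observable.java)
The process is the same as in step 3, except that it wraps the ObservableObserveOn produced in step 3, so the outermost layer is now ObservableObserveOn (Schedulers.newThread()).
To sum up: every operator before subscribe wraps the Observable it is called on, so the innermost Observable is observable1 from the demo.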
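The wrapping described above can be modeled with a few plain Java classes. This is a minimal sketch and not RxJava's code: MiniObservable, MiniCreate, MiniSubscribeOn, MiniObserveOn and WrapDemo are hypothetical names, and schedulers are represented by plain strings. It only illustrates that each operator call returns a new object holding the previous one as its source.

```java
// Hypothetical, simplified stand-ins for RxJava's classes (not the library's API).
abstract class MiniObservable {
    /** Describes the wrapper chain from the outermost layer inward. */
    abstract String describe();

    // Each operator returns a NEW object that keeps "this" as its source.
    MiniObservable subscribeOn(String scheduler) {
        return new MiniSubscribeOn(this, scheduler);
    }

    MiniObservable observeOn(String scheduler) {
        return new MiniObserveOn(this, scheduler);
    }
}

/** Stands in for ObservableCreate, the innermost source. */
final class MiniCreate extends MiniObservable {
    @Override
    String describe() {
        return "ObservableCreate";
    }
}

/** Stands in for ObservableSubscribeOn. */
final class MiniSubscribeOn extends MiniObservable {
    private final MiniObservable source;
    private final String scheduler;

    MiniSubscribeOn(MiniObservable source, String scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    String describe() {
        return "ObservableSubscribeOn[" + scheduler + "](" + source.describe() + ")";
    }
}

/** Stands in for ObservableObserveOn. */
final class MiniObserveOn extends MiniObservable {
    private final MiniObservable source;
    private final String scheduler;

    MiniObserveOn(MiniObservable source, String scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    String describe() {
        return "ObservableObserveOn[" + scheduler + "](" + source.describe() + ")";
    }
}

final class WrapDemo {
    /** Builds the same operator chain as the demo and returns its structure. */
    static String build() {
        return new MiniCreate()
                .subscribeOn("newThread")
                .subscribeOn("io")
                .observeOn("mainThread")
                .observeOn("newThread")
                .describe();
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

In this model, the last operator applied becomes the outermost layer and the object standing in for observable1 stays innermost, which is the structure the four steps above arrive at.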
The subscribe process
It’s important to understand that each operator has a corresponding Observer, implemented as a static inner class. Going down the chain, each operator wraps the Observable; going back up during subscription, each operator wraps the Observer in its static inner class.
.subscribe(observer);
Here the observer is the one actually declared in the application. Let’s look at the subscription process
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // The current object is the last wrapped Observable, the ObservableObserveOn,
        // so its subscribeActual is the one that runs
        subscribeActual(observer);
    }
    // ... (catch blocks omitted in this excerpt)
}
- Now let's see what the subscribeActual method of ObservableObserveOn does
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
You can see that the observer we defined and the worker are wrapped together:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.downstream = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}
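- So the current Observer structure is an ObserveOnObserver wrapping the observer from our program.
The call source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); then passes this wrapped Observer upstream. The other ObservableObserveOn repeats the same step, and the subscription then reaches the subscribeActual of ObservableSubscribeOn: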
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    observer.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
This step wraps the Observer again: the SubscribeOnObserver is now the outermost layer.
Next comes observer.onSubscribe(parent); the downstream that was passed in (an ObserveOnObserver) executes onSubscribe(parent), where parent is the SubscribeOnObserver.
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        // This if condition is not satisfied here, so the block is skipped
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) d;
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                downstream.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                downstream.onSubscribe(this);
                return;
            }
        }
        queue = new SpscLinkedArrayQueue<T>(bufferSize);
        // This line is executed; the downstream here is also an ObserveOnObserver
        downstream.onSubscribe(this);
    }
}
As the last comment above shows, the onSubscribe method of an ObserveOnObserver is called again, this time on the downstream one. Now the condition if (d instanceof QueueDisposable) is satisfied: d is an ObserveOnObserver, and ObserveOnObserver extends BasicIntQueueDisposable.
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
Because this execution takes a different path, the code is shown again:
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        // 1. The condition is met, so the if statement is entered
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) d;
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                downstream.onSubscribe(this);
                schedule();
                return;
            }
            // 3. After requestFusion returns, the onSubscribe method of the downstream is executed
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                downstream.onSubscribe(this);
                return;
            }
        }
        queue = new SpscLinkedArrayQueue<T>(bufferSize);
        downstream.onSubscribe(this);
    }
}

// 2. In this Observer, outputFused is set to true
@Override
public int requestFusion(int mode) {
    if ((mode & ASYNC) != 0) {
        outputFused = true;
        return ASYNC;
    }
    return NONE;
}
As comment 3 above shows, this downstream is actually the Observer written in our program, that is:
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "Start using subscribe connections." + Thread.currentThread().getName());
    }
    // ... (remaining callbacks as in the demo)
};
Therefore, the conclusion here is that the Observer's onSubscribe is not affected by thread switching. It depends only on the thread that called subscribe, because at this point execution is still on that calling thread. Next comes parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); you can guess from the function name that this is where the thread switches. Let’s follow the code.
1) SubscribeTask implements the Runnable interface and holds the Observer
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}
2) scheduler.scheduleDirect(xx)
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 1. Create the Worker; here it is an EventLoopWorker
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 2. Create the DisposeTask
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 3. Execute the worker's schedule method
    w.schedule(task, delay, unit);
    return task;
}
Following the call into EventLoopWorker.java:
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    // threadWorker here is a NewThreadWorker
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
Following the call into NewThreadWorker.java:
/**
 * Wraps the given runnable into a ScheduledRunnable and schedules it
 * on the underlying ScheduledExecutorService.
 * <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
 * false.
 * @param run the runnable instance
 * @param delayTime the time to delay the execution
 * @param unit the time unit
 * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
 * @return the ScheduledRunnable instance
 */
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        if (delayTime <= 0) {
            // executor is a ScheduledExecutorService
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
- So this is where the thread switches: the ScheduledExecutorService executes the task. And what is that task? It is the SubscribeTask that was passed all the way down, so its run method is executed, which is source.subscribe(parent);
To sum up, ObservableSubscribeOn wraps the downstream Observer, then switches threads, continues the subscription on the new thread, and passes the wrapped Observer upstream.
- Subscription then continues with the next source, which is again an ObservableSubscribeOn, and the thread switches to the one from Schedulers.newThread(). The steps are the same, so we go straight to the run method of its SubscribeTask.
- The source of this ObservableSubscribeOn is its upstream, which is the ObservableCreate:
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 1. Create the CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            // 2. Execute the real subscribe method
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    // 3. This is the subscribe method that ends up being executed
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Current thread name" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});
To sum up: the observed source ultimately runs on the thread set by the first subscribeOn. Every subscribeOn does switch threads and each switch takes effect, but because the subscription travels upward, the thread set by the first subscribeOn (the one closest to the source) is the last switch, and it is the one that emits the data.
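The effect of stacking two subscribeOn calls can be reproduced with plain JDK executors. The following is a minimal sketch under stated assumptions, not RxJava code: Source, subscribeOn, named and SubscribeOnDemo are hypothetical names, a Source is reduced to a single subscribe() action, and single-thread executors stand in for schedulers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SubscribeOnDemo {
    /** Hypothetical stand-in for an Observable: subscribing just runs an action. */
    interface Source {
        void subscribe();
    }

    /** Models ObservableSubscribeOn: hop to the executor, then subscribe to the upstream there. */
    static Source subscribeOn(Source upstream, ExecutorService executor) {
        return () -> executor.execute(upstream::subscribe);
    }

    /** Single-thread executor whose thread carries a recognizable name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread on which the innermost source runs. */
    static String emittingThread() {
        ExecutorService first = named("first-subscribeOn");
        ExecutorService second = named("second-subscribeOn");
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            // Innermost source: records the thread it is subscribed on.
            Source create = () -> seen.complete(Thread.currentThread().getName());
            // Same order as the demo: the first subscribeOn is closest to the source.
            Source chain = subscribeOn(subscribeOn(create, first), second);
            chain.subscribe();
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread());
    }
}
```

The subscription first hops to the second executor and from there to the first one, so the source runs on the thread named first-subscribeOn. Both hops happen; the innermost one simply happens last.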
Take onNext as an example to analyze the ObserveOn process
- CreateEmitter's onNext
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // 1. This observer is the static inner class SubscribeOnObserver that corresponds to the last source
        observer.onNext(t);
    }
}
- SubscribeOnObserver's onNext
@Override
public void onNext(T t) {
    // This downstream is the next SubscribeOnObserver
    downstream.onNext(t);
}
- As the comment above indicates, the same onNext step is repeated in the next SubscribeOnObserver, so it is omitted here. After that, what is the downstream? It is an ObserveOnObserver, so the onNext method of ObserveOnObserver is executed.
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    // 1. Store the data in the queue
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    // 2. Call schedule()
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
- This time a thread switch is performed, and the run method of the current Observer is executed
@Override
public void run() {
    // 1. outputFused is true here, so drainFused() is executed
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

void drainFused() {
    int missed = 1;
    for (;;) {
        if (disposed) {
            return;
        }
        boolean d = done;
        Throwable ex = error;
        if (!delayError && d && ex != null) {
            disposed = true;
            downstream.onError(error);
            worker.dispose();
            return;
        }
        // 2. Call onNext on the downstream. Who is the downstream? It is still an ObserveOnObserver
        downstream.onNext(null);
        if (d) {
            disposed = true;
            ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onComplete();
            }
            worker.dispose();
            return;
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
- Following comment 2 above, perform the thread switch again, executing the run method
@Override
public void run() {
    // 1. Unlike in the previous step, outputFused is false here, so drainNormal() is executed
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    // 2. Who is this downstream? It is the Observer written in our program
    final Observer<? super T> a = downstream;
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;
            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {
                return;
            }
            if (empty) {
                break;
            }
            a.onNext(v);
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "Start using subscribe connections." + Thread.currentThread().getName());
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "Observer thread is in" + Thread.currentThread().getName());
        Log.d(TAG, "Respond to the Next event" + value);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Respond to an Error event");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Respond to the Complete Event");
    }
};
To sum up, although observeOn can specify a thread more than once, the observer declared in the program ultimately executes on the thread specified by the last observeOn.
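The same kind of model shows why the last observeOn decides where the observer runs. Again this is only a sketch under stated assumptions, not RxJava code: ObserveOnDemo, observeOn and named are hypothetical names, a java.util.function.Consumer stands in for the Observer, and single-thread executors stand in for schedulers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ObserveOnDemo {
    /** Models ObserveOnObserver: hand each value to the downstream on the given executor. */
    static <T> Consumer<T> observeOn(Consumer<T> downstream, ExecutorService executor) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    /** Single-thread executor whose thread carries a recognizable name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread on which the final observer receives the value. */
    static String observerThread() {
        ExecutorService first = named("first-observeOn");
        ExecutorService last = named("last-observeOn");
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            // The observer from the program: records the thread it is called on.
            Consumer<Integer> observer = v -> seen.complete(Thread.currentThread().getName());
            // Subscription walks upward, so the last observeOn wraps the observer first
            // and ends up directly in front of it; the first observeOn is the outer wrapper.
            Consumer<Integer> chain = observeOn(observeOn(observer, last), first);
            chain.accept(1); // the source emits one value
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            last.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observerThread());
    }
}
```

The value first hops to the thread named first-observeOn and then to last-observeOn, where the observer finally receives it.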
To sum up:
- From top to bottom, each operator wraps the upstream Observable, generating a new Observable.
- Calling subscribe walks back up through the subscription process of each upstream Observable, unwrapping the Observables one layer at a time while wrapping the downstream Observer. Each subscribeOn switches threads along the way, but the source Observable ultimately runs on the thread set by the first subscribeOn.
- When the source calls onNext, the Observer wrappers are unwrapped one layer at a time, switching threads at each observeOn, and the final Observer executes on the thread specified by the last observeOn.
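The counter logic that ObserveOnObserver uses in schedule() and in its drain methods (getAndIncrement() == 0 to start a drain, missed = addAndGet(-missed) to decide whether to loop again) can be tried in isolation. The sketch below is a single-threaded illustration with hypothetical names (DrainLoopDemo, wip, drainRuns, deliver); it keeps the counter in an AtomicInteger field and drains inline, whereas the RxJava class hands the drain to its worker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainLoopDemo {
    // Work-in-progress counter: non-zero means a drain is already running.
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<Integer> queue = new ArrayDeque<>();
    final List<Integer> delivered = new ArrayList<>();
    int drainRuns; // how many times a drain was actually started

    void onNext(int value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 starts a drain.
        if (wip.getAndIncrement() == 0) {
            drain(); // assumption: run inline instead of scheduling on a worker
        }
    }

    private void drain() {
        drainRuns++;
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                deliver(v);
            }
            // Subtract the requests handled so far; a non-zero result means
            // schedule() was called again in the meantime, so loop once more.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    private void deliver(int value) {
        delivered.add(value);
        if (value == 1) {
            onNext(2); // emission arriving while the drain is already running
        }
    }

    static DrainLoopDemo runDemo() {
        DrainLoopDemo demo = new DrainLoopDemo();
        demo.onNext(1);
        return demo;
    }

    public static void main(String[] args) {
        DrainLoopDemo demo = runDemo();
        System.out.println(demo.delivered + " in " + demo.drainRuns + " drain run(s)");
    }
}
```

In this run the second value arrives while the first is being delivered. Its schedule() call only increments the counter, and the drain that is already running picks the value up, so both values are delivered by a single drain.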