RxJava 2.x source code series – Basic framework analysis
RxJava 2.x source code series – Thread switching (1)
RxJava 2.x source code series – Thread switching (2)
RxJava 2.x source code series – Transform operator map (Part 1)
Preface
In the previous post in this series, RxJava 2.x source code series – Basic framework analysis, we analyzed the basic framework of RxJava.
An Observable and an Observer are connected through the subscribe() method; the Observable then notifies the Observer of events by calling back the Observer's methods when needed.
A simple flow chart describes it as follows:
Observable#subscribeOn
On Android, code runs on the main thread by default, so how does RxJava implement thread switching? A typical usage looks like this:
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onComplete();
    }
})
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe(): ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext(): " + s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete(): ");
            }
        });
Let's look at the subscribeOn method first:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    // Null-check the scheduler
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // Wrap this Observable and the scheduler in an ObservableSubscribeOn
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
As we know from the last post, when we call Observable.subscribe(Observer), the subscribeActual method of the concrete Observable instance is eventually called. Here, that concrete Observable instance is ObservableSubscribeOn.
Next, let's take a look at the ObservableSubscribeOn class. It extends AbstractObservableWithUpstream, which in turn extends Observable and implements the HasUpstreamObservableSource interface.
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ...
}

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

public interface HasUpstreamObservableSource<T> {
    /**
     * Returns the upstream source of this Observable.
     * <p>Allows discovering the chain of observables.
     * @return the source ObservableSource
     */
    ObservableSource<T> source();
}
ObservableSubscribeOn is similar to the ObservableCreate class from the last post: both are subclasses of Observable and both override subscribeActual. The difference is that ObservableSubscribeOn additionally implements the HasUpstreamObservableSource interface. This interface is interesting: its source() method returns an ObservableSource (remember the role of this class?). In other words, ObservableSubscribeOn is an Observable with an upstream. It has a key field, source, which represents that upstream.
Let's look at the implementation of ObservableSubscribeOn again:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}
Let's start with its constructor, which takes two arguments: source and scheduler.
- source is the reference to the upstream, an Observable instance
- scheduler is the Scheduler to subscribe on, which can be obtained from Schedulers.newThread() or Schedulers.io()
Here is a brief overview of what a Scheduler is. Scheduler encapsulates Worker and DisposeTask, both of which are discussed in detail below.
Schedulers.newThread()
@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

// Assigned in the static initializer of Schedulers
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}

static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}
Schedulers.io()
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

// Assigned in the static initializer of Schedulers
IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
Back to the subscribeActual method of ObservableSubscribeOn. The last post explained how Observables and Observers establish a subscription, so I won't go into the details here.
Next, let's focus on this line of code:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
Let's start with the SubscribeTask class, a non-static inner class of ObservableSubscribeOn. It is fairly simple: it implements the Runnable interface and holds a reference to parent.
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}
In the run method, the connection to the upstream is established by calling source.subscribe(parent). Thus, whichever thread SubscribeTask's run method executes on is also the thread on which the upstream Observable's subscribe method executes.
You might wonder how SubscribeTask, which has no source field of its own, can access the source field of ObservableSubscribeOn.
In Java, a non-static inner class implicitly holds a reference to its enclosing instance, so it can access the source field of the outer ObservableSubscribeOn object directly.
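To make the inner-class point concrete, here is a minimal sketch in plain Java. It is not RxJava code: UpstreamHolder, Task and demo are hypothetical names chosen for illustration, and a String stands in for the upstream source.

```java
// Hypothetical illustration; none of these names come from RxJava.
final class UpstreamHolder {
    private final String source;

    UpstreamHolder(String source) {
        this.source = source;
    }

    // Non-static inner class: each instance carries a hidden reference
    // to the UpstreamHolder instance that created it.
    final class Task implements Runnable {
        String seen;

        @Override
        public void run() {
            // Shorthand for UpstreamHolder.this.source
            seen = source;
        }
    }

    static String demo() {
        UpstreamHolder outer = new UpstreamHolder("upstream");
        Task task = outer.new Task();
        task.run();
        return task.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

SubscribeTask reads source in the same way, which is why it does not need a source field of its own.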
Next, take a look at the scheduler.scheduleDirect method. The single-argument overload used above delegates to this three-argument version:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    // Null-check run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}
- First, create a Worker w
- Second, wrap decoratedRun in a DisposeTask
- Third, have w schedule the task
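The three steps above can be sketched with nothing but java.util.concurrent. This is a simplified illustration, not the RxJava implementation: MiniScheduler, MiniWorker and the thread name "mini-worker" are assumptions made up for the example, and the delay parameters are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; all names here are made up for illustration.
final class MiniScheduler {

    /** Stand-in for a Worker: owns one single-threaded executor. */
    static final class MiniWorker {
        private final ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-worker"));

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            executor.shutdown();
        }
    }

    static MiniWorker scheduleDirect(Runnable run) {
        final MiniWorker w = new MiniWorker();   // step 1: create a worker
        Runnable task = () -> {                  // step 2: wrap the task
            try {
                run.run();
            } finally {
                w.dispose();                     // release the worker afterwards
            }
        };
        w.schedule(task);                        // step 3: let the worker run it
        return w;
    }

    /** Returns the name of the thread that actually ran the task. */
    static String demo() {
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduleDirect(() -> {
            threadName.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller hands over a Runnable and returns immediately; the Runnable itself runs on the thread owned by the worker.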
Here we use NewThreadScheduler as an example to see what the Worker is.
public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    // ...
}

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}
As you can see from the above, the Worker wraps an executor (a thread pool). If you have followed along this far, you can already understand the principle behind RxJava's thread switching.
In the subscribeActual method of ObservableSubscribeOn, SubscribeTask wraps parent. SubscribeTask implements the Runnable interface and calls source.subscribe(parent) in its run method, so the thread on which run executes is determined by the Worker. This is how a downstream subscribeOn determines the thread on which the upstream Observable executes.
Let's take a look at DisposeTask:
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    final Runnable decoratedRun;
    final Worker w;
    Thread runner;

    DisposeTask(Runnable decoratedRun, Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            decoratedRun.run();
        } finally {
            dispose();
            runner = null;
        }
    }

    @Override
    public void dispose() {
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            ((NewThreadWorker) w).shutdown();
        } else {
            w.dispose();
        }
    }

    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }

    @Override
    public Runnable getWrappedRunnable() {
        return this.decoratedRun;
    }
}
// Set the new Disposable on parent so that the subscription can be cancelled;
// the Disposable that parent originally held no longer represents the latest one.
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
DisposeTask implements the Disposable, Runnable, and SchedulerRunnableIntrospection interfaces. The Disposable interface is mainly used to cancel the subscription.
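To show what cancelling through a disposable worker can look like, here is a small sketch in plain Java. It assumes a worker backed by a ScheduledExecutorService whose dispose() calls shutdownNow(); CancelBeforeRun and MiniWorker are hypothetical names, and this is an illustration of the idea rather than the RxJava classes themselves.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names are made up for illustration.
final class CancelBeforeRun {

    /** Stand-in for a disposable worker backed by a scheduled executor. */
    static final class MiniWorker {
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        void schedule(Runnable task, long delay, TimeUnit unit) {
            executor.schedule(task, delay, unit);
        }

        /** Cancels everything that has not started yet. */
        void dispose() {
            executor.shutdownNow();
        }

        boolean isDisposed() {
            return executor.isShutdown();
        }

        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return executor.awaitTermination(timeout, unit);
        }
    }

    static String demo() {
        AtomicBoolean ran = new AtomicBoolean(false);
        MiniWorker w = new MiniWorker();
        w.schedule(() -> ran.set(true), 200, TimeUnit.MILLISECONDS);
        w.dispose(); // dispose before the delay elapses: the task is dropped
        try {
            w.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "ran=" + ran.get() + ", disposed=" + w.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Handing the caller an object with dispose() is what lets a subscription be cancelled even when the work has already been handed to another thread.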
Why only the first Observable#subscribeOn(Scheduler) takes effect
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.i(TAG, "subscribe: getName=" + Thread.currentThread().getName());
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onComplete();
    }
})
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe(): ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext(): " + s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete(): ");
            }
        });
subscribe: getName=RxCachedThreadScheduler-1
If the subscribeOn calls are instead chained in the following order:
subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.io())
then it prints:
subscribe: getName=RxComputationThreadPool-1
Why does only the first Observable#subscribeOn(Scheduler) take effect?
Observable#subscribeOn(Scheduler) moves the Observable#subscribe(Observer) call onto the specified thread. When subscribe is called, the subscription propagates from the bottom of the chain to the top: each Observable is subscribed to by the Observable below it.
Take the chain subscribeOn(Schedulers.io()).subscribeOn(Schedulers.computation()), where subscribeOn(Schedulers.io()) is the call closest to the source. The call flow looks like this: the third Observable's Observable#subscribe(Observer) is invoked to start the subscription, and internally it triggers the Observable#subscribe(Observer) method of the second Observable, but with that call wrapped in a Schedulers.computation() thread.
From that point on, the subscription process runs on that thread. The pseudocode is shown below:
public class Observable {
    // The "second" Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer observer) {
        new Thread("computation") {
            @Override
            public void run() {
                // Subscribe to the second Observable
                source.subscribe(observer);
            }
        }.start();
    }
}
Further up, subscribing to the second Observable internally triggers the Observable#subscribe(Observer) method of the first Observable, this time wrapped in a Schedulers.io() thread. Again in pseudocode:
public class Observable {
    // The "first" Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer observer) {
        new Thread("io") {
            @Override
            public void run() {
                // Subscribe to the first Observable
                source.subscribe(observer);
            }
        }.start();
    }
}
Once the subscription reaches the first Observable, it starts emitting events, and at that point the executing thread is clearly the io thread. The whole flow can also be expressed as nested Thread pseudocode:
new Thread("computation") {
    @Override
    public void run() {
        // The second Observable.subscribe(Observer) runs here
        new Thread("io") {
            @Override
            public void run() {
                // The first Observable.subscribe(Observer) runs here
                System.out.println("onNext(T)/onError(Throwable)/onComplete() is executed by: "
                        + Thread.currentThread().getName());
            }
        }.start();
    }
}.start();
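The nested-thread pseudocode can be turned into a small runnable experiment. The sketch below is a toy model, not RxJava: Source, subscribeOn(String) and FirstSubscribeOnWins are hypothetical names, and each subscribeOn simply starts a new named thread instead of using a Scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical toy model; names are made up for illustration.
final class FirstSubscribeOnWins {

    /** One-method stand-in for an Observable. */
    interface Source {
        void subscribe(Consumer<String> observer);

        /** Returns a Source that subscribes to this one on a new named thread. */
        default Source subscribeOn(String threadName) {
            Source upstream = this;
            return observer ->
                    new Thread(() -> upstream.subscribe(observer), threadName).start();
        }
    }

    /** Returns the name of the thread on which the first Source emitted. */
    static String demo() {
        AtomicReference<String> emitThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The "first" Observable: records the thread it emits on.
        Source first = observer -> {
            emitThread.set(Thread.currentThread().getName());
            observer.accept("1");
        };

        first.subscribeOn("io")            // closest to the source
             .subscribeOn("computation")   // further downstream
             .subscribe(item -> done.countDown());

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no item received in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitThread.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The "computation" thread only exists long enough to start the "io" thread, so the source emits on "io", the thread named by the subscribeOn call closest to it.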
Conclusion
The flow chart is described as follows:
Reference blog:
Friendly RxJava2.x source code parsing (two) thread switching
The next article will cover observeOn(AndroidSchedulers.mainThread()).