In previous articles, we covered a summary of the common RxJava 2 APIs, the concept of back pressure, and practical applications of RxJava 2 in a project. In this article, we examine the source code of RxJava 2. The previous articles are listed below for anyone interested in using RxJava 2:
- RxJava reactive programming | A comprehensive summary of RxJava2 methods (part 1)
- RxJava reactive programming | A comprehensive summary of RxJava2 methods (part 2)
- RxJava reactive programming | Using RxJava Flowable and back pressure
- RxJava reactive programming | RxJava in practice: building an EventBus with RxJava
Starting from a simple example, let’s look at RxJava 2’s main flow, its design patterns, and how it implements thread switching.
1. RxJava main flow source code analysis
Here is a very typical RxJava usage example: we execute the business logic on an IO thread and then process the result on the main thread.
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        // Execute the business logic here
        emitter.onNext(new Object());
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
        // Do subsequent processing in the main thread
    }
});
disposable.dispose();
We divide this program into four stages for analysis: 1) the execution of the create() method; 2) the calls to subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()), which implement thread switching; 3) the call to subscribe(); 4) the call to dispose(), which cancels the subscription.
Let’s take a look at the first phase.
1.1 Execution of the create() and subscribe() methods
Here is how the create() method is executed. In the code below, we omit the null-check logic. In this section, we assume that no thread-switching logic is specified, that is, the create() call is followed directly by subscribe().
We also set aside the static methods of RxJavaPlugins, such as onAssembly(), for now. You can think of them as simply returning the argument passed in. For example, the create() method below returns an instance of ObservableCreate.
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // Effectively returns new ObservableCreate<T>(source)
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Wrap the incoming observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Call the observer's subscription callback method
        observer.onSubscribe(parent);
        try {
            // Where the subscription is actually executed
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable>
            implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext ..."));
                return;
            }
            if (!isDisposed()) {
                // Call the onNext() method of the incoming observer
                observer.onNext(t);
            }
        }

        @Override
        public void dispose() {
            // Unsubscribe
            DisposableHelper.dispose(this);
        }
        // ...
    }
    // ...
}
The above is the execution process of the first stage. We have omitted some of the code and kept only the more representative methods. This part of the code may not be entirely clear yet, but it will make sense as you read on.
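To make the "it simply returns its argument" simplification concrete, here is a rough sketch of a pass-through hook. MiniPlugins and onAssemblyHook are hypothetical names invented for this illustration; this is not the actual RxJavaPlugins code, only a picture of the idea that, with no hook installed, the same instance comes back.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a plugin registry with a single optional assembly hook.
final class MiniPlugins {
    // null means "no hook installed", which is the case assumed in this article
    static volatile UnaryOperator<Object> onAssemblyHook;

    @SuppressWarnings("unchecked")
    static <T> T onAssembly(T source) {
        UnaryOperator<Object> hook = onAssemblyHook;
        if (hook == null) {
            return source;              // pass-through: the same instance comes back
        }
        return (T) hook.apply(source);  // an installed hook may wrap or replace it
    }
}
```

With no hook set, MiniPlugins.onAssembly(x) returns x itself, which is why we can read create() as if it returned the new ObservableCreate directly.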
Now let’s look at what happens when the subscribe() method is called.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    // Wrap the observer callbacks uniformly in a LambdaObserver
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

public final void subscribe(Observer<? super T> observer) {
    try {
        // Effectively just returns the observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // The subscribeActual() method is called
        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        throw new NullPointerException("Actually not, but can't throw other exceptions due to RS");
    }
}
All of the methods above are defined in Observable; what differs is only the object they are called on. To make the flow easier to follow, we use capital letters in the analysis.
First of all, the overall execution process is:
D = Observable.create(S).subscribe(X,Y,Z);
It can be broken down into the following two steps (the following is pseudocode, ordered only by the sequence of calls):
A = Observable.create(S);
D = A.subscribe(X,Y,Z);
Then, calling A’s subscribe() method actually calls the subscribe() method defined in Observable (see above). Following that call, the pseudocode above expands to something like this:
A = Observable.create(S)
O = LambdaObserver(X,Y,Z)
D = A.subscribe(O)
A.subscribeActual(O)
So we can see that calling subscribe() actually calls A’s subscribeActual() method with O passed in as the parameter. O is a LambdaObserver built from the arguments we passed to subscribe(). What about A? Going back to the create() code, we know that it is an instance of ObservableCreate, so it is ObservableCreate’s subscribeActual() method that gets called. In letter form, the method looks something like this:
@Override
protected void subscribeActual(O) {
    P = new CreateEmitter<T>(O);
    O.onSubscribe(P);
    S.subscribe(P);
}
The S here is passed in through the constructor of ObservableCreate; it is the object we passed to create(). First, O is passed to the CreateEmitter instance as a constructor parameter. Then O’s onSubscribe() method is called back with P passed out; this is one of the common RxJava callback methods. In the third step, we call S’s subscribe() method and pass P to it. So, when we call the following line of code from the example,
emitter.onNext(new Object());
It’s actually a method of P that is being called here. So, let’s look at P’s onNext() method:
@Override
public void onNext(T t) {
    O.onNext(t);
}
It simply calls O’s onNext() method. So, in the end, what gets called back is the Consumer we passed to subscribe(): the value we emit is delivered to our observer through a chain of callbacks.
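The S, O, P flow traced above can be reproduced in a few lines of plain Java. The sketch below is a simplified model, not RxJava itself: MiniObservable, MiniObserver, MiniOnSubscribe, and MiniCreate are hypothetical stand-ins for Observable, Observer, ObservableOnSubscribe, and ObservableCreate, and error handling, onSubscribe(), and disposal are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, stripped-down stand-ins for the RxJava types discussed above.
interface MiniObserver<T> {
    void onNext(T value);
}

interface MiniOnSubscribe<T> {                 // plays the role of S
    void subscribe(MiniObserver<T> emitter);
}

abstract class MiniObservable<T> {
    static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
        return new MiniCreate<>(source);       // A
    }

    // subscribe(X): wrap the callback in an observer O, then delegate
    final void subscribe(Consumer<T> onNext) {
        MiniObserver<T> o = onNext::accept;    // O
        subscribeActual(o);
    }

    protected abstract void subscribeActual(MiniObserver<T> observer);
}

final class MiniCreate<T> extends MiniObservable<T> {
    private final MiniOnSubscribe<T> source;

    MiniCreate(MiniOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(MiniObserver<T> observer) {
        MiniObserver<T> emitter = observer::onNext;  // P forwards every value to O
        source.subscribe(emitter);                   // S.subscribe(P)
    }
}

final class MiniCreateDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniObservable.<String>create(emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
        }).subscribe(received::add);
        return received;
    }
}
```

Calling MiniCreateDemo.run() collects the two emitted values in order, because nothing is emitted until subscribe() triggers subscribeActual(), which in turn invokes the source with the emitter.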
1.2 Execution of the dispose() method
The main flow of the create() and subscribe() methods was analyzed above. What about the dispose() method?
Following the code given above, it is defined as follows. That is, dispose() delegates to DisposableHelper, which finally performs the unsubscription.
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
DisposableHelper’s dispose() method is defined as follows. When dispose() is called, the argument this is the CreateEmitter, which extends AtomicReference<Disposable>.
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
AtomicReference, which should be familiar to everyone, is a reference that can be updated atomically. Unsubscribing is done by assigning to this atomic reference: it is set to DISPOSED through an atomic operation.
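As a minimal sketch of this sentinel technique, the following plain-Java emitter extends AtomicReference and marks itself as cancelled by swapping in a shared DISPOSED value. MiniDisposable and MiniEmitter are hypothetical names for this illustration, and the logic is a simplification of the CreateEmitter and DisposableHelper pair rather than a copy of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical miniature of the CreateEmitter / DisposableHelper pair.
interface MiniDisposable {
    void dispose();
}

final class MiniEmitter<T> extends AtomicReference<MiniDisposable> implements MiniDisposable {
    // Shared sentinel: once the reference holds it, the emitter counts as cancelled
    static final MiniDisposable DISPOSED = () -> { };

    private final Consumer<T> downstream;

    MiniEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        if (!isDisposed()) {
            downstream.accept(value);
        }
    }

    boolean isDisposed() {
        return get() == DISPOSED;
    }

    @Override
    public void dispose() {
        // Atomic swap: only one caller can observe the previous, non-sentinel value
        MiniDisposable previous = getAndSet(DISPOSED);
        if (previous != null && previous != DISPOSED) {
            previous.dispose();
        }
    }
}

final class MiniDisposeDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniEmitter<Integer> emitter = new MiniEmitter<Integer>(received::add);
        emitter.onNext(1);
        emitter.dispose();
        emitter.onNext(2);   // dropped: the emitter is already disposed
        return received;
    }
}
```

Only the first value reaches the list; after dispose() the emitter silently ignores further onNext() calls, which mirrors the isDisposed() check in CreateEmitter.onNext().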
1.3 Summary of the RxJava execution process
Above, we walked through the execution of an RxJava Observable from create() to subscribe() to dispose(). We can sort the process out by following the logic ourselves, but that is unwieldy. Besides understanding the process, I think we should also analyze the design ideas behind it.
At first, when I analyzed the process above, I was in a fog too. But when I went on to analyze subscribeOn(), I suddenly realized that the overall design uses the same design pattern as Java’s streams. Before we actually analyze subscribeOn(), let’s look at its code:
public final Observable<T> subscribeOn(Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ...
}
Compare the subscribeOn() and create() methods, and you can easily see that their logic is almost identical. Both take a source object, wrap it, create a parent in subscribeActual(), call onSubscribe(), and then carry out the subsequent logic. Like Java’s IO architecture, RxJava uses the decorator design pattern.
In the Java IO architecture, we often see code like the following.
InputStream is = new FileInputStream(fileToCopy);
BufferedInputStream bis = new BufferedInputStream(is, 15 * 1024);
DataInputStream dis = new DataInputStream(bis);
The FileInputStream here is a node stream that opens an input stream on a file on disk. The subsequent BufferedInputStream and DataInputStream decorate the node stream. Each of them only needs to perform its own function: the former buffers data to improve read throughput, and the latter converts the resulting stream into the data types we need. If we have other requirements, we only need to implement a custom decorator and add it to this chain.
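Here is a small runnable variant of that chain. To avoid depending on a file on disk, it assumes an in-memory ByteArrayInputStream as the node stream instead of a FileInputStream; the class and method names (StreamDecoratorDemo, roundTrip) are made up for this example.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class StreamDecoratorDemo {
    // Writes an int to a byte array, then reads it back through a chain of decorators
    static int roundTrip(int value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(value);
            }
            // Node stream: the actual data source (in memory here, a file in the article)
            InputStream node = new ByteArrayInputStream(bytes.toByteArray());
            // Decorators: buffering first, typed reads layered on top
            try (DataInputStream in = new DataInputStream(new BufferedInputStream(node, 15 * 1024))) {
                return in.readInt();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each wrapper adds one capability and knows nothing about the others, so swapping the node stream or removing the buffering layer does not affect the remaining links.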
Think back to actual development: we often chain a long series of calls, with each link implementing its own function, such as transformation, filtering, or statistics. With the decorator pattern, each link in the chain only needs to implement its own function, and users can add links to the chain as needed. The classes for transformation, filtering, statistics, and so on thus each have a single responsibility and are decoupled from the invocation chain as a whole. This design of RxJava is truly admirable!
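The same idea applied to operators can be sketched as follows. MiniFlow is a hypothetical, heavily simplified source type (no errors, completion, or disposal); what it illustrates is that map() and filter() each return a new object that wraps the upstream and handles exactly one concern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical minimal source type; each operator returns a decorator around `this`.
abstract class MiniFlow<T> {
    abstract void subscribe(Consumer<? super T> downstream);

    static MiniFlow<Integer> range(final int start, final int count) {
        return new MiniFlow<Integer>() {
            @Override
            void subscribe(Consumer<? super Integer> downstream) {
                for (int i = start; i < start + count; i++) {
                    downstream.accept(i);
                }
            }
        };
    }

    // Transformation link: converts each value, nothing else
    <R> MiniFlow<R> map(final Function<? super T, ? extends R> mapper) {
        final MiniFlow<T> upstream = this;
        return new MiniFlow<R>() {
            @Override
            void subscribe(final Consumer<? super R> downstream) {
                upstream.subscribe(value -> downstream.accept(mapper.apply(value)));
            }
        };
    }

    // Filtering link: forwards only matching values, nothing else
    MiniFlow<T> filter(final Predicate<? super T> predicate) {
        final MiniFlow<T> upstream = this;
        return new MiniFlow<T>() {
            @Override
            void subscribe(final Consumer<? super T> downstream) {
                upstream.subscribe(value -> {
                    if (predicate.test(value)) {
                        downstream.accept(value);
                    }
                });
            }
        };
    }
}

final class MiniFlowDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniFlow.range(1, 6)
                .filter(n -> n % 2 == 0)
                .map(n -> "#" + n)
                .subscribe(out::add);
        return out;
    }
}
```

The chain is assembled outward (range, then filter, then map), while subscribe() travels inward through the wrappers and the values travel back out, which is the wrapping and callback order described in this section.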
Knowing that RxJava as a whole uses the decorator design pattern makes some of its other features much easier to understand. Following the decorator pattern, RxJava’s wrapping process and the callbacks after subscribe() is called look like this:
So the reason RxJava is notorious for its long call stacks is that, when decorators are stacked on top of each other, the entire call stack becomes very long.
A word, too, about so-called thread switching. Suppose we call subscribeOn() at point 4 of the call process above and specify thread A, and then call the same method at point 5 with thread B. Steps 1 and 3 are then wrapped into an object and executed on the thread specified at 4, and 4 in turn is wrapped into an object that runs on the thread specified at 5. Therefore, if we query the current thread in step 2, we get the thread specified at 4. This is why, when subscribeOn() is used twice, usually only the first call is considered effective. In fact both take effect: the work scheduled on A is started from B, and steps 1 and 3 are executed on A. So the secret of thread switching lies in this wrapping relationship: when a task finishes its work on one thread, execution naturally moves on to the next. (subscribeOn() and observeOn() implement thread switching slightly differently, as detailed below.)
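The "only the first subscribeOn() seems to count" effect can be reproduced with two single-thread executors standing in for schedulers. This is a sketch under simplifying assumptions: MiniSource and the subscribeOn() helper below are hypothetical and only model the hop made at subscription time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical minimal source: subscribing triggers the emission logic.
interface MiniSource<T> {
    void subscribe(Consumer<T> observer);
}

final class MiniSubscribeOnDemo {
    // Decorator: when subscribed, hop to `executor` and subscribe to the upstream from there
    static <T> MiniSource<T> subscribeOn(final MiniSource<T> upstream, final Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Returns the name of the thread on which the source actually emits
    static String run() {
        ExecutorService a = Executors.newSingleThreadExecutor(r -> new Thread(r, "A"));
        ExecutorService b = Executors.newSingleThreadExecutor(r -> new Thread(r, "B"));
        try {
            MiniSource<String> source =
                    observer -> observer.accept(Thread.currentThread().getName());
            // A is applied first (closest to the source), B second (outermost)
            MiniSource<String> chain = subscribeOn(subscribeOn(source, a), b);
            CompletableFuture<String> seen = new CompletableFuture<>();
            chain.subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            a.shutdown();
            b.shutdown();
        }
    }
}
```

Both hops happen: thread B runs the outer task, whose only job is to hand the inner subscription to A. Since the source is subscribed on A, that is the thread it observes, so the outer call looks as if it had no effect.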
1.4 Execution process of RxJava thread switching
As mentioned above, subscribeOn() and observeOn() implement thread switching differently, so below we analyze the two cases separately.
When subscribeOn() is called, the incoming Observable is further decorated as an ObservableSubscribeOn object. According to the analysis above, when subscribe() is finally called, the call follows the chain of decorators until it reaches the subscribeActual() method of ObservableSubscribeOn. Here is the definition of this method:
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Compared with the code analyzed earlier, there is an additional scheduler here, which is the one we specify when calling subscribeOn(). Its scheduleDirect() method is called directly to submit the task to a thread pool for execution. The task passed in is a SubscribeTask, which implements Runnable and, in its run() method, calls source.subscribe(parent), that is, it subscribes the parent observer to the upstream Observable. It can therefore run in any thread pool, and when it runs, the subscription to the upstream Observable, and with it the upstream’s emission logic, executes in that thread pool.
The following is a flowchart of asynchronous task execution in RxJava:
The Scheduler passed in here is a top-level abstract class; when we call Schedulers.io(), we get an instance of one of its implementations, such as IoScheduler. Scheduler’s createWorker() template method is used to obtain a Worker, the class RxJava uses to manage tasks, and the Worker’s schedule() method is then called to schedule the task. The Worker in the figure is also an abstract class, and the NewThreadWorker used above is one implementation of it. NewThreadWorker maintains a thread pool, and when its schedule() method is called, the task is placed into that pool for execution. Our asynchronous task is therefore executed in this thread pool.
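The division of labor between Scheduler and Worker can be modeled roughly like this. All types below (MiniScheduler, MiniWorker, MiniNewThreadScheduler) are hypothetical simplifications, and java.util.concurrent.Future stands in for RxJava’s Disposable; the sketch only illustrates the template-method relationship, not the real scheduling code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the Scheduler / Worker split.
abstract class MiniScheduler {
    // Template method: each scheduler decides what kind of worker it hands out
    abstract MiniWorker createWorker();

    // Entry point built on createWorker(): run one task, then release the worker
    Future<?> scheduleDirect(final Runnable task) {
        final MiniWorker worker = createWorker();
        return worker.schedule(() -> {
            try {
                task.run();
            } finally {
                worker.dispose();
            }
        });
    }
}

abstract class MiniWorker {
    abstract Future<?> schedule(Runnable task);

    abstract void dispose();
}

final class MiniNewThreadScheduler extends MiniScheduler {
    @Override
    MiniWorker createWorker() {
        // Each worker owns its own single-thread pool in this sketch
        final ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-worker"));
        return new MiniWorker() {
            @Override
            Future<?> schedule(Runnable task) {
                return pool.submit(task);
            }

            @Override
            void dispose() {
                pool.shutdown();
            }
        };
    }
}

final class MiniSchedulerDemo {
    static String run() {
        final String[] threadName = new String[1];
        Future<?> done = new MiniNewThreadScheduler()
                .scheduleDirect(() -> threadName[0] = Thread.currentThread().getName());
        try {
            done.get(5, TimeUnit.SECONDS);   // wait until the task has finished
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return threadName[0];
    }
}
```

The caller only ever talks to the abstract scheduler; which thread ends up running the task is decided entirely by the worker that the concrete scheduler creates.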
Then, let’s look at how the observeOn() method performs task scheduling.
When we call the observeOn() method, the task is wrapped as an instance of ObservableObserveOn. Again, let’s look at its subscribeActual() method,
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
It calls Scheduler’s template method directly to obtain a Worker, and then wraps the Worker, together with the incoming Observer, in an ObserveOnObserver. This observer is passed up the chain to ObservableCreate, and its onNext() and other methods are triggered from the top of the chain. Next, let’s look at the definition of ObserveOnObserver. Again, we use only onNext() as an example.
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
So observeOn() is also implemented by handing tasks to a Worker; which thread actually runs them depends on the specific Scheduler and Worker implementations.
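The queue-plus-counter idea behind onNext() and schedule() can be sketched in plain Java as follows. MiniObserveOn is a hypothetical simplification: it uses an Executor in place of a Worker, handles only onNext(), and its drain loop is a common way to write such a loop rather than a copy of ObserveOnObserver’s.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical miniature: enqueue on the caller's thread, deliver on the worker's thread.
final class MiniObserveOn<T> extends AtomicInteger implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Executor worker;
    private final Consumer<T> downstream;

    MiniObserveOn(Executor worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 starts a drain
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T value;
            while ((value = queue.poll()) != null) {
                downstream.accept(value);
            }
            // Subtract the requests already served; loop again if more arrived meanwhile
            missed = addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}

final class MiniObserveOnDemo {
    static List<String> run() {
        ExecutorService consumer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch latch = new CountDownLatch(3);
        MiniObserveOn<Integer> observer = new MiniObserveOn<Integer>(consumer, value -> {
            log.add(Thread.currentThread().getName() + ":" + value);
            latch.countDown();
        });
        try {
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i);          // emitted on the calling thread
            }
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            consumer.shutdown();
        }
        return new ArrayList<>(log);
    }
}
```

The values are produced on the calling thread but every downstream callback is logged on the "consumer" thread, in emission order. The counter guarantees that at most one drain is in flight at a time, so the downstream never sees concurrent onNext() calls.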
In Android, putting a task onto the main thread is done by sending a message to the main thread’s Handler. For subscribeOn(), the explanation was that when thread A starts thread B to execute a task, execution naturally arrives at A once B has run its part. So why do we need to send messages to the main thread in Android? We use the following diagram to illustrate.
subscribeOn() works on the upward, subscription-time path. When thread A starts thread B to execute a task, B naturally arrives at A after it runs, and there is no problem. observeOn(), however, works on the downward path: as the code above shows, onNext() is invoked directly from the thread pool, and the calls then travel down the chain in the reverse direction of the subscription, so all logic after observeOn() executes on the thread it specifies.
2. Summary
In this article, we walked through the source code of RxJava 2. Although RxJava is very powerful, its core implementation relies on only two design patterns: the observer pattern and the decorator pattern. It uses a design similar to Java’s streams, where each decorator is responsible for its own kind of task, in line with the single responsibility principle, and decorators collaborate to accomplish complex functions. From the source code analysis above, we can also see that RxJava’s disadvantages are obvious: it has a large number of custom classes, and decorators keep wrapping one another to complete a single function, which results in very long call stacks. As for thread switching, it relies on the same decorator pattern, because a decorator determines which thread its upstream Observable executes on. When two decorators run on different threads, handing execution from one thread to the other completes the thread switch.
That concludes this analysis of the RxJava source code. If you have any questions, feel free to discuss them in the comments 🙂