Series of articles:
- RxJava 2.x source code analysis (1): basic subscription process
- Friendly RxJava 2.x source code analysis (2): thread switching
- Friendly RxJava 2.x source code analysis (3): zip source analysis
RxJava 2.x source code analysis (1): basic subscription process
This article is based on RxJava 2.1.3
- Preface
- Sample code
- Source code analysis of the subscription process
- Subscription process
- Observable#subscribe(Observer) process
- Observer#onSubscribe(Disposable) process
- Observer#onNext(T) process
Preface
Sample code
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");
            emitter.onComplete();
        }
    })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("TAG", "onSubscribe(): ");
        }
        @Override
        public void onNext(String s) {
            Log.e("TAG", "onNext(): " + s);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
            Log.e("TAG", "onComplete(): ");
        }
    });
Output result:
E/TAG: onSubscribe():
E/TAG: onNext(): 1
E/TAG: onNext(): 2
E/TAG: onNext(): 3
E/TAG: onComplete():
Source code analysis of the subscription process
We know that subscribe() is the connection point between an Observable and an Observer. subscribe(Observer) is a method of Observable, and an Observer is passed in as its argument, so we need to figure out what this Observable is and what this Observer is. Looking at the code above, the Observer is simply created with new, so we only need to work out what the Observable is. It is created by calling Observable.create(ObservableOnSubscribe).
Now that the Observable and the Observer are identified, let's look at the Observable#subscribe(Observer) method:
@Override
public final void subscribe(Observer<? super T> observer) {
    // other source code omitted
    subscribeActual(observer);
    // other source code omitted
}
After omitting the non-critical source code, we see that it does just one thing: call Observable#subscribeActual(Observer), which in Observable is an abstract method:
protected abstract void subscribeActual(Observer<? super T> observer);
That means we need to look at a subclass and at its implementation of subscribeActual(Observer). So we start with create(ObservableOnSubscribe) and see how it converts an ObservableOnSubscribe object into an Observable:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    return new ObservableCreate<T>(source);
}
Similarly, after removing the non-critical source code, only this line is left, which means we need to look for the implementation of subscribeActual(Observer) in the ObservableCreate class. Two points are worth mentioning here:
- ObservableCreate is a subclass of Observable.
- Our custom ObservableOnSubscribe is passed in and stored in a field named source. In fact, the operator subclasses of Observable generally have a field named source that refers to the upstream Observable (here it is an ObservableOnSubscribe, but it plays the role of an Observable).
ObservableCreate#subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // Call Observer#onSubscribe(Disposable)
    observer.onSubscribe(parent);
    try {
        // Emit the events
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
The observer.onSubscribe(parent) call comes first, so we know that Observer#onSubscribe(Disposable) is called before the Observable has emitted any events. The next step is the call to source.subscribe(ObservableEmitter), whose body is left to the developer to implement, as shown in the sample code:
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onComplete();
}
In this code we call onNext() on the emitter, which is a CreateEmitter, so we need to look into the CreateEmitter class to see how onNext(T) works. The source code is as follows:
@Override
public void onNext(T t) {
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
Clearly, as long as the current object has not been disposed, the onNext(T) method of the downstream Observer is called, and that downstream Observer's onNext(T) is the one we wrote in the sample code above:
@Override
public void onNext(String s) {
    Log.e("TAG", "onNext(): " + s);
}
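The effect of the isDisposed() check can be seen in isolation with a small stand-in for CreateEmitter. This is only a sketch under simplifying assumptions, not RxJava's source: DisposeGuardSketch, SketchObserver and SketchEmitter are hypothetical names, and an AtomicBoolean stands in for the real Disposable bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model for illustration; these are not RxJava's real classes.
final class DisposeGuardSketch {

    // Hypothetical one-method stand-in for Observer.
    interface SketchObserver<T> {
        void onNext(T value);
    }

    // Hypothetical stand-in for CreateEmitter.
    static final class SketchEmitter<T> {
        private final SketchObserver<T> observer;
        private final AtomicBoolean disposed = new AtomicBoolean(false);

        SketchEmitter(SketchObserver<T> observer) {
            this.observer = observer;
        }

        void dispose() {
            disposed.set(true);
        }

        boolean isDisposed() {
            return disposed.get();
        }

        // Same shape as the guard shown above: forward only while not disposed.
        void onNext(T value) {
            if (!isDisposed()) {
                observer.onNext(value);
            }
        }
    }

    // Emits "1", "2", "3" but disposes after "2"; returns what the observer received.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SketchEmitter<String> emitter = new SketchEmitter<String>(received::add);
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.dispose();
        emitter.onNext("3"); // dropped by the guard
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}
```

Once dispose() has been called, later onNext calls on the emitter are silently ignored, which is why the downstream Observer never sees "3".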
At this point the basic subscription process is clear. It starts with Observable#subscribe(Observer), which hands the Observer to the Observable; the Observable then calls the Observer's onNext(T) from inside the emitter's onNext(T). In fact, the operators we use in RxJava each create an Observable and an Observer internally. Each operator implements Observable#subscribeActual(Observer) in its own way, but most of them do two things: wrap the Observer passed in from “downstream” as needed, and subscribe that wrapped Observer to the “upstream” Observable.
Subscription process
The basic RxJava 2.x subscription process starts with Observable#subscribe(Observer). This method triggers the subscribeActual(Observer) of the Observable it is called on. Inside that method the “upstream” Observable is subscribed to, which triggers the upstream Observable's subscribeActual(Observer), and so on up the chain. We can use the following source code as an example:
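The two operations that most operators perform can be sketched with a toy map-style operator. Everything here is a hypothetical, stripped-down model (OperatorSketch, MiniObservable, MiniObserver, FromItems and MapOperator are made-up names, and only onNext is modeled), so treat it as an illustration of the pattern rather than RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Simplified model for illustration; these are not RxJava's real classes.
final class OperatorSketch {

    interface MiniObserver<T> {
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        // Mirrors the shape of Observable#subscribe: delegate to subscribeActual.
        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    // A source with no upstream: it just emits its items.
    static final class FromItems<T> extends MiniObservable<T> {
        private final List<T> items;

        FromItems(List<T> items) {
            this.items = items;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super T> observer) {
            for (T item : items) {
                observer.onNext(item);
            }
        }
    }

    // An operator: keeps a reference to its upstream in a field named source.
    static final class MapOperator<T, R> extends MiniObservable<R> {
        private final MiniObservable<T> source;
        private final Function<? super T, ? extends R> mapper;

        MapOperator(MiniObservable<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super R> downstream) {
            // 1. Wrap the Observer that came from downstream.
            MiniObserver<T> wrapper = value -> downstream.onNext(mapper.apply(value));
            // 2. Subscribe the wrapper to the upstream Observable.
            source.subscribe(wrapper);
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniObservable<Integer> source = new FromItems<Integer>(Arrays.asList(1, 2, 3));
        MiniObservable<String> mapped =
                new MapOperator<Integer, String>(source, n -> "item " + n);
        mapped.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item 1, item 2, item 3]
    }
}
```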
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
        }
    })
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            return Observable.just(s);
        }
    })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(String s) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
In addition, there is a chart which clearly identifies the terms “first Observable” and “second Observable” mentioned later:
The following diagram shows the entire subscription process
Observable#subscribe(Observer) process
Calling Observable#subscribe(Observer) causes the subscribe(Observer) method of the upstream Observable to be called, which in turn calls subscribe(Observer) on its own upstream, and so on.
Observable#subscribe(Observer) calls Observable#subscribeActual(Observer). subscribeActual(Observer) is an abstract method overridden by subclasses, so this is polymorphism at work in Observable. It is also the key to how the subscribe(Observer)/subscribeActual(Observer) method of the upstream Observable gets triggered. Although subscribeActual(Observer) is abstract, the implementations in the operator subclasses typically contain a call to source.subscribe(Observer), where source is the upstream Observable (strictly speaking it is an ObservableSource, but we will call it an Observable because that is the more familiar type, and Observable is an implementation of the ObservableSource interface). So the subscribeActual(Observer) method of each Observable can be understood as calling subscribe(Observer)/subscribeActual(Observer) on its upstream, all the way up to the subscribe(Observer)/subscribeActual(Observer) of the first Observable.
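The order in which subscribeActual runs along the chain can be traced with a two-stage toy model. The class names below (SubscribeChainSketch, FirstObservable, SecondObservable and the Mini* types) are hypothetical, and the model only covers onNext; it is a sketch of the call order, not the library's code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration; these are not RxJava's real classes.
final class SubscribeChainSketch {

    interface MiniObserver<T> {
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    // Top of the chain: nothing further upstream, so it starts emitting.
    static final class FirstObservable extends MiniObservable<String> {
        private final List<String> log;

        FirstObservable(List<String> log) {
            this.log = log;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super String> observer) {
            log.add("subscribeActual: first Observable");
            observer.onNext("1");
        }
    }

    // Operator stage: its subscribeActual subscribes to the upstream source.
    static final class SecondObservable extends MiniObservable<String> {
        private final MiniObservable<String> source;
        private final List<String> log;

        SecondObservable(MiniObservable<String> source, List<String> log) {
            this.source = source;
            this.log = log;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super String> observer) {
            log.add("subscribeActual: second Observable");
            source.subscribe(value -> observer.onNext(value));
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> first = new FirstObservable(log);
        MiniObservable<String> second = new SecondObservable(first, log);
        // Subscribing at the bottom walks up the chain before anything is emitted.
        second.subscribe(value -> log.add("onNext: " + value));
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this model the log reads "subscribeActual: second Observable", then "subscribeActual: first Observable", then "onNext: 1": subscription travels upstream first, and only then do events travel back downstream.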
Observer#onSubscribe(Disposable) process
The subscription chain is now sorted out, but the event emission process has not been covered yet. Let's move on.
When the subscribeActual(Observer) of the topmost Observable is reached, there is no further upstream to subscribe to, so it starts doing the actual work (getting ready to emit events). Here we use the Observable from the sample code as an example:
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
It first creates a CreateEmitter, which also serves as the Disposable, and then calls Observer#onSubscribe(Disposable) to pass it to the Observer one layer down.
In the onSubscribe(Disposable) of that Observer, some processing (checks, wrapping, and so on) is done on the Disposable received from the layer above. The Observer then wraps a Disposable of its own and passes it to the onSubscribe(Disposable) of the next Observer down.
This continues until the last Observer, which is our custom Observer.
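The way a Disposable is handed down layer by layer can be sketched as follows. This is a hypothetical model, not RxJava's code: OnSubscribeSketch, MiniDisposable, MiniObserver, Emitter and ForwardingObserver are made-up names, and the middle layer does nothing except keep the upstream Disposable and expose itself as the Disposable for the layer below.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration; these are not RxJava's real classes.
final class OnSubscribeSketch {

    interface MiniDisposable {
        void dispose();
        boolean isDisposed();
    }

    interface MiniObserver<T> {
        void onSubscribe(MiniDisposable d);
        void onNext(T value);
    }

    // Plays the role of CreateEmitter: both the emitter and the Disposable.
    static final class Emitter<T> implements MiniDisposable {
        private final MiniObserver<T> observer;
        private volatile boolean disposed;

        Emitter(MiniObserver<T> observer) {
            this.observer = observer;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void onNext(T value) {
            if (!disposed) {
                observer.onNext(value);
            }
        }
    }

    // Middle layer: stores the upstream Disposable and passes itself downstream.
    static final class ForwardingObserver<T> implements MiniObserver<T>, MiniDisposable {
        private final MiniObserver<T> downstream;
        private final List<String> log;
        private MiniDisposable upstream;

        ForwardingObserver(MiniObserver<T> downstream, List<String> log) {
            this.downstream = downstream;
            this.log = log;
        }

        @Override
        public void onSubscribe(MiniDisposable d) {
            log.add("middle onSubscribe");
            upstream = d;
            downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T value) {
            downstream.onNext(value);
        }

        @Override
        public void dispose() {
            upstream.dispose();
        }

        @Override
        public boolean isDisposed() {
            return upstream.isDisposed();
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObserver<String> custom = new MiniObserver<String>() {
            private MiniDisposable disposable;

            @Override
            public void onSubscribe(MiniDisposable d) {
                log.add("custom onSubscribe");
                disposable = d;
            }

            @Override
            public void onNext(String value) {
                log.add("onNext: " + value);
                disposable.dispose(); // cancel after the first item
            }
        };
        ForwardingObserver<String> middle = new ForwardingObserver<String>(custom, log);
        // What the topmost Observable does in subscribeActual:
        Emitter<String> emitter = new Emitter<String>(middle);
        middle.onSubscribe(emitter);
        emitter.onNext("1");
        emitter.onNext("2"); // dropped: dispose() reached the emitter
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each layer forwards dispose() to the Disposable it received from above, a cancellation requested by the custom Observer travels back up to the emitter, which then stops forwarding events.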
Observer#onNext(T) process
Once the Observer#onSubscribe(Disposable) process has finished, subscribeActual(Observer) goes on to call source.subscribe(parent), which is essentially the subscribe method of our ObservableOnSubscribe:
new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
    }
}
PS: For convenience, only the onNext() execution flow is analyzed here.
Internally, ObservableEmitter#onNext(T) calls the onNext(T) method of the downstream Observer.
The calls continue downward until they reach the lowest-level Observer, the one we defined ourselves.
In the sample code, the topmost Observable triggers ObservableEmitter#onNext(T). Inside that method the onNext(T) of the “downstream” Observer is called, inside which the onNext(T) of the next Observer further downstream is called, and so on all the way to the lowest Observer, our custom Observer.
At this point, a subscription process is complete.