This article is mainly to RxJava2 subscription process source analysis, first say I use RxJava and RxAndroid version, version as follows:
implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.2.6'
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.1.1'
Copy the code
Let’s write some sample code, which I’ll skip Lambda and chain calls for simplicity, as follows:
// Create the observed
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
Log.i("TanJiaJun"."subscribe");
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun"); emitter.onComplete(); }});// Create an observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun"."onSubscribe");
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun"."onNext:" + s);
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun"."onError");
}
@Override
public void onComplete(a) {
Log.i("TanJiaJun"."onComplete"); }};/ / subscribe
observable.subscribe(observer);
Copy the code
Divided into three steps:
- Create an Observable.
- Create an Observer.
- Call the observed’s SUBSCRIBE method, pass in the observer, associate the two and subscribe.
Source code analysis
We start with the subscribe method, which looks like this:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// Check whether the observer is empty
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Call the subscribeActual method of the subclass
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
thrownpe; }}Copy the code
See RxJavaPlugins onSubscribe method, the code is as follows:
@SuppressWarnings({ "rawtypes"."unchecked" })
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if(f ! =null) {
return apply(f, source, observer);
}
return observer;
}
Copy the code
This method will call the associated hook function, and we can see that it will determine whether the onObservableSubscribe is empty. This variable is assigned by the setOnObservableSubscribe method.
@SuppressWarnings("rawtypes")
public static void setOnObservableSubscribe(
@Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
}
Copy the code
However, we did not call this method, so this is empty and returns observer directly.
Moving on, subscribeActual is an important method that all subclasses of Observable implement as an interface, as we’ll see when we create the observed.
We call the observable. create method as follows:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
The rxJavaplugins.onAssembly method is also a hook function, as follows:
@SuppressWarnings({ "rawtypes"."unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if(f ! =null) {
return apply(f, source);
}
return source;
}
Copy the code
It determines whether the onObservableAssembly variable, which is assigned by the setOnObservableAssembly method, is empty:
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
Copy the code
However, we didn’t call this method, so let’s look directly at the ObservableCreate object we created. I’ve commented out the ObservableCreate object, and the code looks like this:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
// Source is the upstream Observable in our sample code.
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Create CreateEmitter, pass it to the downstream Observer.
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// Call the onSubscribe method of the downstream Observer and pass in the CreateEmitter object
observer.onSubscribe(parent);
try {
// Call subscribe to the upstream Observable and pass CreateEmitter to it
source.subscribe(parent);
// Execute the onSubscribe method of the downstream Observer and subscribe method of the upstream Observable
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// Launch an error eventparent.onError(ex); }}// This class inherits AtomicReference and can implement atomic operations
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
// Pass in the downstream Observer
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// In rxJava2.x, the onNext method cannot pass null, otherwise a null pointer exception is thrown
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
When isDisposed is false, call downstream Observe's onNext method, and pass in the corresponding object
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
// In rxJava2.x, the onError method cannot string null, otherwise it throws a null pointer exception
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
When the isDisposed method returns false, call the downstream observer onError method, pass in the Throwable object, and call the Dispose method
if(! isDisposed()) {try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete(a) {
// When the isDispoesed method returns false, the onComplete method of the downstream Observer is called, followed by the Dispose method
if(! isDisposed()) {try {
observer.onComplete();
} finally{ dispose(); }}}@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize(a) {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose(a) {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed(a) {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString(a) {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString()); }}// Omit some code
}
Copy the code
ObservableOnSubscribe is an ObservableOnSubscribe method with an ObservableEmitter parameter.
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
Copy the code
Our sample code implements this method as follows:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
Log.i("TanJiaJun"."subscribe");
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun"); emitter.onComplete(); }});Copy the code
ObservableEmitter onNext and ObservableEmitter onComplete are called. ObservableEmitter is CreateEmitter.
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
/ / the parent is CreateEmitter
source.subscribe(parent);
} catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
Calling the onNext and onComplete methods is actually calling the onNext and onComplete methods of the downstream Observer as follows:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if(! isDisposed()) {// An observer is a downstream observerobserver.onNext(t); }}Copy the code
@Override
public void onComplete(a) {
if(! isDisposed()) {try {
// An observer is a downstream observer
observer.onComplete();
} finally{ dispose(); }}}Copy the code
This calls the methods in our sample code, which looks like this:
@Override
public void onNext(String s) {
Log.i("TanJiaJun",s);
}
Copy the code
@Override
public void onComplete(a) {
Log.i("TanJiaJun"."onComplete");
}
Copy the code
To summarize, the process is as follows:
- Subscribe is called from the upstream Observable and passed to the downstream Observer.
- Subscribe implements the subscribeActual method of ObservableCreate, a subclass of Observable, and passes it to the downstream Observer.
- In subscribeActual method, onSubscribe method of downstream Observer and subscribe method of ObservableOnSubscribe will be executed successively, thus completing the whole subscription process.
- If we emit events, such as the onNext and onComplete methods called ObservableEmitter in the example code, the onNext and onComplete methods of the downstream Observer will execute.
My GitHub: TanJiaJunBeyond
Common Android Framework: Common Android framework
My nuggets: Tan Jiajun
My simple book: Tan Jiajun
My CSDN: Tan Jiajun