This article analyzes the source code of the RxJava2 subscription process. First, these are the versions of RxJava and RxAndroid I use:

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

Let’s write some sample code. For simplicity, I’ll skip lambdas and chained calls:

// Imports needed by this snippet
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// Create the Observable (the observed)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        Log.i("TanJiaJun", "subscribe");

        emitter.onNext("Tan");
        emitter.onNext("Jia");
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});

// Create the Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("TanJiaJun", "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.i("TanJiaJun", "onNext:" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("TanJiaJun", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("TanJiaJun", "onComplete");
    }
};

// Subscribe
observable.subscribe(observer);

This takes three steps:

  1. Create the Observable (the observed).
  2. Create the Observer.
  3. Call the Observable’s subscribe method and pass in the Observer, which associates the two and starts the subscription.

Source code analysis

We start with the subscribe method, which looks like this:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    // Check that the observer is not null
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // Call the subscribeActual method of the subclass
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

Next, look at the RxJavaPlugins.onSubscribe method:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

This method invokes the associated hook function. It checks whether onObservableSubscribe is null; that variable is assigned by the setOnObservableSubscribe method:

@SuppressWarnings("rawtypes")
public static void setOnObservableSubscribe(
        @Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
}

However, we never called setOnObservableSubscribe, so the hook is null and onSubscribe returns the observer unchanged.
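
The null-checked hook pattern described above can be sketched in plain Java. This is a minimal sketch, not RxJava's code: MiniPlugins, setOnSubscribeHook and lockDown are hypothetical names, and a String stands in for the Observer to keep the example short.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for RxJavaPlugins; not the library's real class.
final class MiniPlugins {
    // Null by default, which mirrors the case above: nobody installed a hook.
    private static volatile UnaryOperator<String> onSubscribeHook;
    private static volatile boolean lockdown;

    private MiniPlugins() { }

    // Installs (or clears) the hook, unless plugins have been locked down.
    static void setOnSubscribeHook(UnaryOperator<String> hook) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        onSubscribeHook = hook;
    }

    static void lockDown() {
        lockdown = true;
    }

    // Returns the argument untouched unless a hook has been installed.
    static String onSubscribe(String observer) {
        UnaryOperator<String> f = onSubscribeHook;
        if (f != null) {
            return f.apply(observer);
        }
        return observer;
    }
}
```

With no hook installed, MiniPlugins.onSubscribe("observer") hands back the same value, which is the path our sample code takes.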

Moving on, subscribeActual is the important method here. It is abstract, so every subclass of Observable must implement it, as we’ll see when we look at how the Observable is created.
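
The relationship between subscribe and subscribeActual is a template method: a final method does the shared checks and then delegates to an abstract one. A minimal sketch follows, assuming hypothetical types SketchObservable, SketchObserver and SketchJust that are far simpler than RxJava's real classes.

```java
// Hypothetical, simplified model; not RxJava's real Observable.
abstract class SketchObservable<T> {
    interface SketchObserver<T> {
        void onNext(T value);
    }

    // final: subclasses cannot bypass the shared null check.
    public final void subscribe(SketchObserver<? super T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    // Each subclass decides what subscribing actually does.
    protected abstract void subscribeActual(SketchObserver<? super T> observer);
}

// A subclass that emits one fixed value to whoever subscribes.
final class SketchJust<T> extends SketchObservable<T> {
    private final T value;

    SketchJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SketchObserver<? super T> observer) {
        observer.onNext(value);
    }
}
```

Callers only ever see subscribe, so every subclass gets the same argument checking for free.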

We called the Observable.create method, which looks like this:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins.onAssembly is also a hook function:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

It checks whether the onObservableAssembly variable, which is assigned by the setOnObservableAssembly method, is null:

@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

However, we never called this method either, so onAssembly simply returns the ObservableCreate object we created. I’ve added comments to the ObservableCreate class, and the code looks like this:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    // source is the ObservableOnSubscribe passed to Observable.create in our sample code
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Create a CreateEmitter that wraps the downstream Observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Call the downstream Observer's onSubscribe method, passing the CreateEmitter as the Disposable
        observer.onSubscribe(parent);

        try {
            // Call the subscribe method of the upstream ObservableOnSubscribe, passing it the CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            // Emit an error event
            parent.onError(ex);
        }
    }

    // This class extends AtomicReference, so the Disposable it holds can be updated atomically
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        // Pass in the downstream Observer
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            // In RxJava 2.x, onNext must not receive null; a NullPointerException is routed to onError instead
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            // If not disposed, call the downstream Observer's onNext method with the value
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            // In RxJava 2.x, onError must not receive null; a null Throwable is replaced with a NullPointerException
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            // If not disposed, call the downstream Observer's onError method with the Throwable, then dispose
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            // If not disposed, call the downstream Observer's onComplete method, then dispose
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

    // Some code omitted
}
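
The error path in the class above can be sketched without RxJava: an exception thrown while the source is emitting is caught and handed to the emitter, and a second error after disposal is not delivered downstream. This is only an illustrative model. ErrorRouting, Sink, Source and SafeEmitter are hypothetical names, and the real CreateEmitter additionally reports undeliverable errors to RxJavaPlugins.onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model; these are not RxJava's types.
final class ErrorRouting {
    interface Sink {
        void onNext(String value);
        void onError(Throwable t);
    }

    interface Source {
        void subscribe(SafeEmitter emitter) throws Exception;
    }

    static final class SafeEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final Sink sink;

        SafeEmitter(Sink sink) {
            this.sink = sink;
        }

        void onNext(String value) {
            if (!disposed.get()) {
                sink.onNext(value);
            }
        }

        // Returns false when the error can no longer be delivered downstream.
        boolean tryOnError(Throwable t) {
            if (!disposed.get()) {
                try {
                    sink.onError(t);
                } finally {
                    disposed.set(true);
                }
                return true;
            }
            return false;
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SafeEmitter emitter = new SafeEmitter(new Sink() {
            @Override
            public void onNext(String value) {
                log.add("onNext:" + value);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError:" + t.getMessage());
            }
        });
        Source source = e -> {
            e.onNext("Tan");
            throw new IllegalStateException("boom");
        };
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            // Same shape as the catch block in subscribeActual
            emitter.tryOnError(ex);
        }
        // The emitter is already disposed, so this error is rejected
        log.add("second:" + emitter.tryOnError(new RuntimeException("late")));
        return log;
    }
}
```

Calling ErrorRouting.run() records the value, then the routed error, then the rejected second attempt.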

ObservableOnSubscribe is an interface with a single subscribe method that takes an ObservableEmitter parameter:

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;

}

Our sample code implements this method as follows:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        Log.i("TanJiaJun", "subscribe");

        emitter.onNext("Tan");
        emitter.onNext("Jia");
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});

Here the onNext and onComplete methods of ObservableEmitter are called. As subscribeActual shows, that ObservableEmitter is the CreateEmitter:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        // parent is the CreateEmitter
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

Calling the emitter’s onNext and onComplete methods therefore calls the onNext and onComplete methods of the downstream Observer:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer is the downstream Observer
        observer.onNext(t);
    }
}

@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            // observer is the downstream Observer
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

This calls the methods in our sample code, which looks like this:

@Override
public void onNext(String s) {
    Log.i("TanJiaJun", "onNext:" + s);
}

@Override
public void onComplete() {
    Log.i("TanJiaJun", "onComplete");
}

To summarize, the process is as follows:

  1. The subscribe method of the upstream Observable is called with the downstream Observer as its argument.
  2. subscribe calls subscribeActual, which is implemented by ObservableCreate, a subclass of Observable, and passes the downstream Observer along.
  3. subscribeActual calls the onSubscribe method of the downstream Observer and then the subscribe method of ObservableOnSubscribe, which completes the subscription process.
  4. When we emit events, such as the onNext and onComplete calls on ObservableEmitter in the sample code, the onNext and onComplete methods of the downstream Observer are executed.
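
The steps above can be sketched as a small RxJava-free model. Everything in MiniFlow is a hypothetical simplification (for example, an AtomicBoolean replaces the AtomicReference<Disposable> used by CreateEmitter), so treat it as an illustration of the call order rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the flow; none of these types are RxJava's.
final class MiniFlow {
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onComplete();
    }

    interface OnSubscribe<T> {
        void subscribe(Emitter<T> emitter);
    }

    // Plays the role of CreateEmitter: forwards events until disposed.
    static final class Emitter<T> implements Disposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final Observer<T> observer;

        Emitter(Observer<T> observer) {
            this.observer = observer;
        }

        void onNext(T value) {
            if (!isDisposed()) {
                observer.onNext(value);
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void dispose() {
            disposed.set(true);
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    }

    // Steps 1 to 3: wrap the observer, call onSubscribe, then run the source.
    static <T> void subscribe(OnSubscribe<T> source, Observer<T> observer) {
        Emitter<T> parent = new Emitter<>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        OnSubscribe<String> source = emitter -> {
            log.add("subscribe");
            emitter.onNext("Tan");
            emitter.onNext("Jia");
            emitter.onNext("Jun");
            emitter.onComplete();
            emitter.onNext("ignored"); // dropped: the emitter disposed itself on completion
        };
        subscribe(source, new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                log.add("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                log.add("onNext:" + s);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }
}
```

MiniFlow.run() records onSubscribe before subscribe, then the three values and onComplete, and nothing after completion.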

My GitHub: TanJiaJunBeyond

Common Android Framework: Common Android framework

My Juejin: Tan Jiajun

My Jianshu: Tan Jiajun

My CSDN: Tan Jiajun