This article analyzes the source code of the RxJava2 subscription process. First, here are the versions of RxJava and RxAndroid that I use:

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

Let’s write some sample code. To keep it easy to follow, I don’t use lambdas or chained calls:

// Create the Observable (the observed)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});
// Create the Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) { }

    @Override
    public void onNext(String s) {
        Log.i("TanJiaJun", "onNext:" + s);
    }

    @Override
    public void onError(Throwable e) { }

    @Override
    public void onComplete() {
        Log.i("TanJiaJun", "onComplete");
    }
};
// Subscribe
observable.subscribe(observer);

The process has three steps:

  1. Create an Observable.
  2. Create an Observer.
  3. Call the Observable’s subscribe method and pass in the Observer, which associates the two and starts the subscription.

Source code analysis

We start with the subscribe method, which looks like this:

public final void subscribe(Observer<? super T> observer) {
    // Check that the observer is not null
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // Call the subscribeActual method of the subclass
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

Now look at the RxJavaPlugins.onSubscribe method:

@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

This method invokes the associated hook function. It checks whether onObservableSubscribe is null; that field is assigned by the setOnObservableSubscribe method:

public static void setOnObservableSubscribe(
        @Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
}

However, we never called this method, so the field is null and onSubscribe returns the observer unchanged.
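
To make the hook mechanism concrete, here is a minimal plain-Java sketch of the same pattern. It is a simplified stand-in, not RxJavaPlugins itself: HookDemo, MiniObserver and setOnSubscribeHook are hypothetical names made up for illustration, and the library's generic signatures and its apply helper are left out.

```java
import java.util.function.BiFunction;

// Simplified stand-in for the hook logic in RxJavaPlugins; all names are hypothetical.
final class HookDemo {
    interface MiniObserver {
        void onNext(String value);
    }

    static volatile BiFunction<Object, MiniObserver, MiniObserver> onSubscribeHook;
    static volatile boolean lockdown;

    // Mirrors the setter: once locked down, the hook can no longer be replaced.
    static void setOnSubscribeHook(BiFunction<Object, MiniObserver, MiniObserver> hook) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        onSubscribeHook = hook;
    }

    // Mirrors the null check: with no hook installed the observer is returned as-is.
    static MiniObserver onSubscribe(Object source, MiniObserver observer) {
        BiFunction<Object, MiniObserver, MiniObserver> f = onSubscribeHook;
        if (f != null) {
            return f.apply(source, observer);
        }
        return observer;
    }

    static String run() {
        StringBuilder out = new StringBuilder();
        MiniObserver original = value -> out.append(value).append(';');

        // No hook installed: the very same instance comes back.
        boolean same = onSubscribe("source", original) == original;
        out.append("same=").append(same).append(';');

        // Install a hook that decorates every observer passed through onSubscribe.
        setOnSubscribeHook((source, observer) -> value -> observer.onNext("hooked:" + value));
        onSubscribe("source", original).onNext("Jun");

        setOnSubscribeHook(null); // remove the hook again
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "same=true;hooked:Jun;"
    }
}
```

In this sketch the hook sees every observer at subscription time, which is why such hooks are typically used for cross-cutting concerns like logging or testing rather than in application code.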

Moving on, subscribeActual is an important abstract method that every subclass of Observable must implement, as we’ll see when we look at how the Observable is created.
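
The split between a final subscribe method and an abstract subscribeActual method is the classic template-method pattern. The sketch below shows the idea in plain Java, assuming simplified types: MiniObservable, MiniObserver and MiniJust are hypothetical names, not RxJava classes.

```java
import java.util.Objects;

// Simplified model of the final subscribe / abstract subscribeActual split.
final class TemplateDemo {
    interface MiniObserver<T> {
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        // Shared entry point: validates the argument once, then delegates to the subclass.
        public final void subscribe(MiniObserver<? super T> observer) {
            Objects.requireNonNull(observer, "observer is null");
            subscribeActual(observer);
        }

        // Each concrete source decides what subscribing actually does.
        protected abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    // Hypothetical subclass that emits one fixed value to the observer.
    static final class MiniJust<T> extends MiniObservable<T> {
        private final T value;

        MiniJust(T value) {
            this.value = value;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super T> observer) {
            observer.onNext(value);
        }
    }

    static String run() {
        StringBuilder out = new StringBuilder();
        new MiniJust<>("Jun").subscribe(value -> out.append(value));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Jun"
    }
}
```

Because subscribe is final, the null check cannot be bypassed by a subclass; only the behavior inside subscribeActual varies.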

We called the Observable.create method, whose code is as follows:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

The RxJavaPlugins.onAssembly method is also a hook function:

@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

It checks whether the onObservableAssembly field, which is assigned by the setOnObservableAssembly method, is null:

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

However, we never called this method, so let’s look directly at the ObservableCreate object that was created. I’ve added comments to the ObservableCreate class, and the code looks like this:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    // source is the upstream ObservableOnSubscribe from our sample code
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Create a CreateEmitter, passing in the downstream Observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Call the downstream Observer's onSubscribe method, passing in the CreateEmitter
        observer.onSubscribe(parent);

        try {
            // Call the upstream source's subscribe method, passing in the CreateEmitter.
            // So the downstream Observer's onSubscribe runs first, then the upstream's subscribe
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            // Emit an error event
            parent.onError(ex);
        }
    }

    // This class extends AtomicReference, so its state can be updated atomically
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        // Pass in the downstream Observer
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            // In RxJava 2.x, onNext must not be passed null; a null is reported through onError as a NullPointerException
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            // When isDisposed() returns false, call the downstream Observer's onNext method with the value
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            // In RxJava 2.x, onError must not be passed null; a null is replaced with a NullPointerException
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            // When isDisposed() returns false, call the downstream Observer's onError method with the Throwable, then call dispose()
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            // When isDisposed() returns false, call the downstream Observer's onComplete method, then call dispose()
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

    // Some code omitted
}
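
The two behaviors worth remembering from CreateEmitter are the null check in onNext and the isDisposed gate in front of every event. Here is a minimal sketch of both, assuming a simplified model: MiniEmitter, MiniObserver and the DISPOSED sentinel are hypothetical and only imitate the idea behind DisposableHelper, and the RxJavaPlugins.onError fallback for undeliverable errors is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class EmitterDemo {
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Simplified stand-in for CreateEmitter: the inherited reference holds the disposed marker.
    static final class MiniEmitter<T> extends AtomicReference<Object> {
        private static final long serialVersionUID = 1L;
        private static final Object DISPOSED = new Object(); // hypothetical sentinel value

        private final MiniObserver<? super T> observer;

        MiniEmitter(MiniObserver<? super T> observer) {
            this.observer = observer;
        }

        boolean isDisposed() {
            return get() == DISPOSED;
        }

        void dispose() {
            set(DISPOSED);
        }

        void onNext(T value) {
            if (value == null) {
                // A null value is turned into an error event instead of being delivered.
                onError(new NullPointerException("onNext called with null"));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(value);
            }
        }

        void onError(Throwable error) {
            // In this sketch an error arriving after disposal is silently dropped.
            if (!isDisposed()) {
                try {
                    observer.onError(error);
                } finally {
                    dispose();
                }
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniEmitter<String> emitter = new MiniEmitter<>(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable error) { log.add("onError:" + error.getClass().getSimpleName()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        emitter.onNext("Jun");
        emitter.onNext(null);      // routed to onError, which disposes the emitter
        emitter.onNext("ignored"); // dropped: the emitter is already disposed
        emitter.onComplete();      // dropped as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[onNext:Jun, onError:NullPointerException]"
    }
}
```

Once a terminal event (onError or onComplete) has gone through, the emitter is disposed and every later event is ignored.
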
ObservableOnSubscribe is an interface with a single subscribe method that takes an ObservableEmitter parameter:

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

Our sample code implements this method as follows:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});

Here we call the onNext and onComplete methods of ObservableEmitter. The ObservableEmitter we receive is actually the CreateEmitter, as the subscribeActual method shows:

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        // parent is the CreateEmitter
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
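
One consequence of the try/catch in subscribeActual is that an exception thrown inside our subscribe callback does not escape to the caller; it is delivered as an error event. A minimal sketch of that routing follows, assuming simplified hypothetical types (MiniEmitter, MiniOnSubscribe) and leaving out the Exceptions.throwIfFatal step.

```java
import java.util.ArrayList;
import java.util.List;

final class ErrorRoutingDemo {
    interface MiniEmitter<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    // Like ObservableOnSubscribe, the callback is allowed to throw a checked exception.
    interface MiniOnSubscribe<T> {
        void subscribe(MiniEmitter<T> emitter) throws Exception;
    }

    static <T> void subscribeActual(MiniOnSubscribe<T> source, MiniEmitter<T> parent) {
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            // The library first rethrows fatal errors via Exceptions.throwIfFatal; omitted here.
            parent.onError(ex);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniEmitter<String> parent = new MiniEmitter<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable error) { log.add("onError:" + error.getMessage()); }
        };
        MiniOnSubscribe<String> source = emitter -> {
            emitter.onNext("Jun");
            throw new IllegalStateException("boom"); // simulated failure inside subscribe
        };
        subscribeActual(source, parent);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[onNext:Jun, onError:boom]"
    }
}
```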

Calling the emitter’s onNext and onComplete methods therefore calls the onNext and onComplete methods of the downstream Observer:

public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer is the downstream Observer
        observer.onNext(t);
    }
}

public void onComplete() {
    if (!isDisposed()) {
        try {
            // observer is the downstream Observer
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

This calls the methods in our sample code, which looks like this:

public void onNext(String s) {
    Log.i("TanJiaJun", "onNext:" + s);
}

public void onComplete() {
    Log.i("TanJiaJun", "onComplete");
}

To summarize, the process is as follows:

  1. The subscribe method of the upstream Observable is called with the downstream Observer passed in.
  2. subscribe calls the subscribeActual method implemented by ObservableCreate, a subclass of Observable, and passes the downstream Observer to it.
  3. Inside subscribeActual, the onSubscribe method of the downstream Observer and then the subscribe method of ObservableOnSubscribe are executed in turn, which completes the subscription.
  4. If we emit events, as the sample code does by calling the onNext and onComplete methods of ObservableEmitter, the onNext and onComplete methods of the downstream Observer are executed.
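
The four steps above can be traced end to end with a small plain-Java model. This is only a sketch under simplifying assumptions: every type in it (MiniObservable, MiniCreate, Bridge and so on) is a hypothetical stand-in for the corresponding RxJava class, and hooks, error handling and threading are left out. What it shows is the call order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class FlowDemo {
    interface MiniDisposable { void dispose(); boolean isDisposed(); }
    interface MiniEmitter<T> { void onNext(T value); void onComplete(); }
    interface MiniOnSubscribe<T> { void subscribe(MiniEmitter<T> emitter); }
    interface MiniObserver<T> {
        void onSubscribe(MiniDisposable d);
        void onNext(T value);
        void onComplete();
    }

    abstract static class MiniObservable<T> {
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
            return new MiniCreate<>(source);
        }

        // Steps 1 and 2: subscribe hands the downstream observer to subscribeActual.
        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    static final class MiniCreate<T> extends MiniObservable<T> {
        private final MiniOnSubscribe<T> source;

        MiniCreate(MiniOnSubscribe<T> source) {
            this.source = source;
        }

        @Override
        void subscribeActual(MiniObserver<? super T> observer) {
            Bridge<T> parent = new Bridge<>(observer);
            observer.onSubscribe(parent); // step 3: downstream onSubscribe runs first
            source.subscribe(parent);     // step 3: then the upstream subscribe callback
        }
    }

    // Plays the role of CreateEmitter: forwards events until disposed.
    static final class Bridge<T> implements MiniEmitter<T>, MiniDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final MiniObserver<? super T> observer;

        Bridge(MiniObserver<? super T> observer) {
            this.observer = observer;
        }

        @Override public void onNext(T value) {
            if (!isDisposed()) {
                observer.onNext(value); // step 4
            }
        }

        @Override public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete(); // step 4
                } finally {
                    dispose();
                }
            }
        }

        @Override public void dispose() { disposed.set(true); }
        @Override public boolean isDisposed() { return disposed.get(); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> observable = MiniObservable.create(emitter -> {
            log.add("subscribe");
            emitter.onNext("Jun");
            emitter.onComplete();
        });
        observable.subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe(MiniDisposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[onSubscribe, subscribe, onNext:Jun, onComplete]"
    }
}
```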

