Preface

What is RxJava

RxJava — Reactive Extensions for the JVM — a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

In other words, RxJava is the JVM (Java Virtual Machine) implementation of Reactive Extensions: a library for asynchronous, event-based programming built on composing observable sequences.

Most readers are already fluent in using RxJava, so I won't cover usage here. We all know that RxJava is an extension of the Observer pattern, so let's examine the implementation logic of RxJava2 starting from the mechanics of that pattern. Only when we truly understand how RxJava is implemented can we locate problems quickly and accurately when they come up.

This source code analysis is based on RxJava release 2.1.7.

Observer pattern

Here is a brief review of the composition and usage of the Observer pattern. From the analysis in the previous article on the Observer pattern, we know that it has four important roles:

  • Abstract subject: defines the ability to add and remove observers, i.e., to register and unregister them
  • Abstract observer: defines what an observer does when it receives a notification from the subject
  • Concrete subject: an implementation of the abstract subject
  • Concrete observer: an implementation of the abstract observer
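The four roles can be sketched in plain Java as follows. This is only a minimal sketch: the class names ConcreteSubject and ObserverOne and the methods addObserver and notifyAllObserver are taken from the demo in this article, while Subject, PlainObserver, removeObserver, and update are names assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Abstract observer: defines the reaction to a notification (assumed name).
interface PlainObserver {
    void update(String message);
}

// Abstract subject: can register, unregister, and notify observers (assumed name).
interface Subject {
    void addObserver(PlainObserver observer);

    void removeObserver(PlainObserver observer);

    void notifyAllObserver(String message);
}

// Concrete subject: owns the observer list and pushes messages to it.
class ConcreteSubject implements Subject {
    private final List<PlainObserver> observers = new ArrayList<>();

    @Override
    public void addObserver(PlainObserver observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(PlainObserver observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyAllObserver(String message) {
        for (PlainObserver observer : observers) {
            observer.update(message);
        }
    }
}

// Concrete observer: records every message it receives.
class ObserverOne implements PlainObserver {
    final List<String> received = new ArrayList<>();

    @Override
    public void update(String message) {
        received.add(message);
    }
}
```

Note that only the subject holds the observer list here, so only the subject can add or remove an observer.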

Once we have created the concrete subject and concrete observer classes, we are ready to use the Observer pattern. Here is the simplest possible demo.

public class TestObservePattern {

    public static void main(String[] args) {
        // Create the subject (the observed)
        ConcreteSubject concreteSubject = new ConcreteSubject();
        // Create an observer
        ObserverOne observerOne = new ObserverOne();
        // Register the observer with the subject
        concreteSubject.addObserver(observerOne);
        // The subject notifies all observers
        concreteSubject.notifyAllObserver("wake up,wake up");
    }
}

That is how the Observer pattern is used. Pretty simple, right? Now let's look at how RxJava uses the Observer pattern, keeping the following questions in mind.

After working with RxJava for so long, have you ever considered the following questions?

  1. How are the four roles mentioned above defined in RxJava?
  2. How are the concrete subject and the concrete observer instantiated in RxJava?
  3. How is the subscription between observer and subject implemented in RxJava?
  4. How does upstream send events and downstream receive them in RxJava?
  5. What are the benefits of RxJava's adjustments to the regular Observer pattern?

If you already have clear answers to these questions, congratulations: you don't need to read any further.

Many developers learn RxJava from the upstream/downstream perspective, which arguably emphasizes RxJava's nature as a sequence of events. This article instead takes the perspective of the observed (subject) and the observer, which arguably emphasizes RxJava's Observer-pattern nature. Here the subject is upstream and the observer is downstream. Whichever perspective you take, the source code is the same; neither view is right or wrong. It is only a matter of cognitive perspective, so choose whichever makes RxJava easier for you to understand.

If you have read this far, you presumably still have doubts about some of the questions above, so let's use them as the starting point for understanding RxJava's source code.

The Observer pattern implementation in RxJava2

With these questions in mind, let's work through RxJava step by step. To make the discussion easier to follow and remember, let's first look at a very basic use of RxJava2.

// TAG, sb, and logContent are fields of the enclosing class (not shown).
private void basicRxjava2() {
    Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("1");
            e.onNext("2");
            e.onNext("3");
            e.onNext("4");
            e.onComplete();
        }
    });

    Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: d=" + d);
            sb.append("\nonSubscribe: d=" + d);
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: " + s);
            sb.append("\nonNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e);
            sb.append("\nonError: " + e.toString());
            logContent.setText(sb.toString());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
            sb.append("\nonComplete: ");
            logContent.setText(sb.toString());
        }
    };

    // The observer (downstream) subscribes to the subject (upstream)
    mObservable.subscribe(mObserver);
}

This code should be easy to understand; you can probably predict the output with your eyes closed. Let's use it as the basis for analyzing RxJava, together with the questions raised above.

Four important roles

First, let's look at how the four important roles are defined in RxJava.

  • Abstract subject

First look at the Observable class.

public abstract class Observable<T> implements ObservableSource<T> {
……
}

It implements the ObservableSource interface. Next, let's look at ObservableSource.

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

It is clear that ObservableSource plays the role of the abstract subject (the observed). Its subscribe method registers an observer with the subject, which matches the responsibility of the abstract subject in the Observer pattern described earlier. From the method's parameter we can also see that Observer plays the role of the abstract observer.

At this point you might wonder: is it really that simple? Doesn't the abstract subject (upstream) need to send events? Where are onNext(), onComplete(), and onError()? Don't worry, we will get to them later.

  • Concrete subject

Going back to Observable: it implements the ObservableSource interface and its subscribe method, but that method does not itself establish the subscription between subject and observer. Instead, it forwards the call to another, abstract method, subscribeActual (more on that later).
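This forwarding is the classic template-method arrangement. The following is a minimal sketch of the idea, assuming hypothetical stand-in types (MiniObserver, MiniObservable, MiniJust); it is not RxJava's actual code, which also runs plugin hooks and error handling inside subscribe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Observer, reduced to a single callback.
interface MiniObserver<T> {
    void onNext(T value);
}

// Hypothetical stand-in for Observable.
abstract class MiniObservable<T> {
    // Public entry point: validates the argument, then delegates.
    // It is final, so subclasses cannot change this part.
    public final void subscribe(MiniObserver<? super T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    // The real subscription logic lives in each concrete subclass.
    protected abstract void subscribeActual(MiniObserver<? super T> observer);
}

// One possible concrete subject: emits a single fixed value on subscription.
final class MiniJust<T> extends MiniObservable<T> {
    private final T value;

    MiniJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super T> observer) {
        observer.onNext(value);
    }
}

class TemplateMethodDemo {
    static List<String> run() {
        final List<String> seen = new ArrayList<>();
        new MiniJust<String>("hello").subscribe(new MiniObserver<String>() {
            @Override
            public void onNext(String value) {
                seen.add(value);
            }
        });
        return seen;
    }
}
```

In this sketch TemplateMethodDemo.run() returns a list containing only "hello": the base class decides when subscription happens, and the subclass decides what it means.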

Observable is therefore still an abstract class. An abstract class cannot be instantiated, so in theory it could not serve as the concrete subject. In practice it can: Observable provides a series of creation operators, such as create, defer, fromXXX, repeat, and just, for creating all kinds of Observables.

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

RxJava contains many subclasses of Observable.

You could certainly argue that these subclasses are the real concrete subjects. However, from the perspective of the proxy pattern, we can regard Observable as a proxy class: the client only needs to call a method such as create and say what kind of Observable it wants, without worrying about the differences between the various Observables. Observable makes sure to return the instance you asked for.

Another major contribution of Observable is that it defines many operators, such as map, flatMap, and distinct. These methods are final, so every subclass inherits them and none can change an operator's implementation.

In this sense, Observable is the concrete subject.
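To illustrate how a final operator on the base class is shared by every subclass, here is a heavily simplified sketch. All names (OpObservable, OpFromList, OpMap) are hypothetical, and the observer is reduced to a java.util.function.Consumer that only receives values; it shows the idea, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, value-only stand-in for Observable.
abstract class OpObservable<T> {
    protected abstract void subscribeActual(Consumer<? super T> downstream);

    public final void subscribe(Consumer<? super T> downstream) {
        subscribeActual(downstream);
    }

    // An operator is a final method on the base class: every subclass
    // inherits it, and none can override what it does.
    public final <R> OpObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new OpMap<T, R>(this, mapper);
    }
}

// A source that replays the items of a list to each subscriber.
final class OpFromList<T> extends OpObservable<T> {
    private final List<T> items;

    OpFromList(List<T> items) {
        this.items = items;
    }

    @Override
    protected void subscribeActual(Consumer<? super T> downstream) {
        for (T item : items) {
            downstream.accept(item);
        }
    }
}

// The operator itself is just another subclass that wraps its upstream.
final class OpMap<T, R> extends OpObservable<R> {
    private final OpObservable<T> upstream;
    private final Function<? super T, ? extends R> mapper;

    OpMap(OpObservable<T> upstream, Function<? super T, ? extends R> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Consumer<? super R> downstream) {
        upstream.subscribe(value -> downstream.accept(mapper.apply(value)));
    }
}

class OperatorDemo {
    static List<Integer> run() {
        List<Integer> lengths = new ArrayList<>();
        new OpFromList<String>(Arrays.asList("a", "bb", "ccc"))
                .map(String::length)
                .subscribe(lengths::add);
        return lengths;
    }
}
```

Here OperatorDemo.run() returns [1, 2, 3]. OpFromList never mentions map, yet it can be mapped, because the operator is defined once on the base class.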

  • Abstract observer

As mentioned in the discussion of the abstract subject, Observer plays the role of the abstract observer.

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

This matches the job description of the abstract observer in the Observer pattern very well: Observer defines what the observer (downstream) does when it is notified by the subject (upstream). Note in particular that onSubscribe is also defined here.

  • Concrete observer

The concrete observer needs little explanation: in everyday use it is simply the instance we create with new Observer() {...}. RxJava also has a number of internal Observer implementations, which interested readers can explore on their own. This raises an interesting question: an interface is abstract too, so why can we write new Observer() {...} as if we were instantiating it directly, when a class declared abstract cannot be instantiated?

How the concrete subject and concrete observer are instantiated
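One way to answer that question: an expression of the form new SomeInterface() { ... } does not instantiate the interface itself. The compiler generates an unnamed (anonymous) class that implements it and instantiates that class, and the same syntax works for an abstract class. A small sketch with hypothetical types (Greeter, AbstractGreeter):

```java
// Hypothetical types, for illustration only.
interface Greeter {
    String greet(String name);
}

abstract class AbstractGreeter {
    abstract String greet(String name);
}

class AnonymousClassDemo {
    // Creates an instance of an anonymous class that implements Greeter.
    static Greeter fromInterface() {
        return new Greeter() {
            @Override
            public String greet(String name) {
                return "hi " + name;
            }
        };
    }

    // The same syntax creates an anonymous subclass of an abstract class.
    static AbstractGreeter fromAbstractClass() {
        return new AbstractGreeter() {
            @Override
            String greet(String name) {
                return "hello " + name;
            }
        };
    }

    public static void main(String[] args) {
        Greeter greeter = fromInterface();
        System.out.println(greeter.greet("rx"));
        // The runtime class is a compiler-generated anonymous class.
        System.out.println(greeter.getClass().isAnonymousClass());
        System.out.println(fromAbstractClass().greet("rx"));
    }
}
```

What cannot be done in either case is a bare new Greeter() or new AbstractGreeter() without a class body, because there would be no implementation of the abstract method.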

Let's look at this code:

    Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
        }
    });

    // Observable.create
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

    // RxJavaPlugins.onAssembly
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        // If an assembly hook has been registered, apply it to the Observable
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

You will see ObjectHelper.requireNonNull null checks all over RxJava's code. They exist to guard against NPEs as far as possible, and I won't mention them again.

Creating an Observable with the create operator seems to involve many steps, but if we ignore any hook applied in onAssembly, the whole process boils down to one line of code:

  Observable mObservable = new ObservableCreate(new ObservableOnSubscribe())

From the earlier analysis we also know that ObservableCreate is a subclass of the abstract class Observable. Let's take a quick look at its implementation.

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ...
    }
}

As you can see, its only constructor requires an ObservableOnSubscribe instance, and it implements the subscribeActual method, which means this is where the subscription between subject and observer is actually handled.

You may be wondering what this ObservableOnSubscribe is. It is very simple.

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

Sigh, yet another subscribe. What is this one? Don't panic; read the Javadoc. It says that this subscribe method receives an ObservableEmitter instance, which allows it to push events in a cancellation-safe manner.

In other words, we have an object that starts sending events once it is handed an ObservableEmitter instance; until then it sends nothing and simply waits. We know that in RxJava a subject created this way only sends events once an observer has subscribed to it, and this is one of the differences from the regular Observer pattern.
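The laziness described here can be sketched without RxJava at all. The types below (LazyEmitter, LazyOnSubscribe, LazyObservable) are hypothetical, stripped-down stand-ins for ObservableEmitter, ObservableOnSubscribe, and ObservableCreate; the sketch only shows that constructing the subject runs nothing, and that the source body runs once per subscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ObservableEmitter, reduced to onNext.
interface LazyEmitter<T> {
    void onNext(T value);
}

// Hypothetical stand-in for ObservableOnSubscribe.
interface LazyOnSubscribe<T> {
    void subscribe(LazyEmitter<T> emitter);
}

// Hypothetical stand-in for ObservableCreate: it only stores the source.
final class LazyObservable<T> {
    private final LazyOnSubscribe<T> source;

    LazyObservable(LazyOnSubscribe<T> source) {
        this.source = source; // nothing is emitted here
    }

    void subscribe(LazyEmitter<T> downstream) {
        source.subscribe(downstream); // the source body runs only now
    }
}

class LazyDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        LazyObservable<Integer> numbers = new LazyObservable<Integer>(emitter -> {
            log.add("source started");
            emitter.onNext(1);
            emitter.onNext(2);
        });
        log.add("created");
        numbers.subscribe(value -> log.add("A got " + value));
        numbers.subscribe(value -> log.add("B got " + value));
        return log;
    }
}
```

In this sketch the log reads: created, source started, A got 1, A got 2, source started, B got 1, B got 2. "created" comes first because the source body does not run at construction time, and "source started" appears twice because it runs again for each subscriber.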

All right, finally, what is this mysterious ObservableEmitter?

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    ObservableEmitter<T> serialize();

    /**
     * Attempts to emit the specified {@code Throwable} error if the downstream
     * hasn't cancelled the sequence or is otherwise terminated, returning false
     * if the emission is not allowed to happen due to lifecycle restrictions.
     * <p>
     * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
     * if the error could not be delivered.
     * @param t the throwable error to signal if possible
     * @return true if successful, false if the downstream is not able to accept further
     * events
     * @since 2.1.1 - experimental
     */
    boolean tryOnError(@NonNull Throwable t);
}

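Note the tryOnError method. According to its Javadoc, it attempts to deliver an error downstream and returns false, instead of calling RxJavaPlugins.onError, when the error can no longer be delivered because the sequence has been cancelled or has already terminated.

Yet another interface, and one that extends yet another interface. What is going on? Let's keep reading.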

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

Surprised? We have finally found the familiar onNext, onError, and onComplete. Here they are.

Here is something to think about: the abstract observer has four event-handling methods, yet there are only three here. By that correspondence, onSubscribe seems to be missing. What is going on? We will analyze this later, but feel free to think about it first.

The meaning of these two interfaces is clear, so let's summarize:

  • Emitter defines the three kinds of events that can be sent
  • ObservableEmitter extends Emitter with methods related to Disposable, which make it possible to cancel the sending of events.

Okay, that was a long detour, all for one line of code:

  Observable mObservable = new ObservableCreate(new ObservableOnSubscribe())

To summarize what happens on the concrete subject (upstream) side:

  • An ObservableCreate instance is created
  • The ObservableCreate holds a reference to the ObservableOnSubscribe object
  • ObservableOnSubscribe is an interface with a single subscribe method; when that method is called, it starts sending events through the ObservableEmitter instance it receives.
  • ObservableEmitter extends Emitter.

How subscription, sending events, and receiving events work

For the sake of illustration, I put questions 3 and 4 together.

Now that the concrete subject and the concrete observer have been created, the next step is to establish the subscription between the two.

mObservable.subscribe(mObserver);

To be clear, it is the observer (downstream) that subscribes to the subject (upstream), even though the code reads as if the subject were subscribing to the observer, so don't get confused.

Let's look at Observable's subscribe() method:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }

As mentioned earlier, Observable does not implement the subscription itself; it hands it over to the subscribeActual() method.

We have already seen that our Observable instance is an ObservableCreate object, so let's look at the subscribeActual() implementation in that class.

    // For convenience, the constructor is shown again
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

CreateEmitter implements the ObservableEmitter interface mentioned earlier. Here’s the key code:

observer.onSubscribe(parent);

Earlier, when we looked at the definition of Emitter, we noted that onSubscribe seemed to be missing. Now we can see why: onSubscribe is not an event that the subject (upstream) sends through the emitter. It is a callback invoked on the observer (downstream) at subscription time, purely to hand it the emitter instance as a Disposable, so that downstream can control upstream.

The next step is even simpler. The source field is the ObservableOnSubscribe. As described before, its subscribe method is called and given an ObservableEmitter instance, and the source then starts sending a sequence of events through that emitter. In this way, as soon as the subscription is established, the subject (upstream) starts sending events. This is where the familiar onNext, onComplete, and onError calls actually take place.
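The order of calls in subscribeActual can be traced with a small RxJava-free sketch. Everything below (the Flow-prefixed types) is a hypothetical, single-threaded simplification that mirrors the shape of the code above; in particular it leaves out Exceptions.throwIfFatal and the atomic state handling of the real CreateEmitter.

```java
import java.util.ArrayList;
import java.util.List;

interface FlowDisposable {
    void dispose();
    boolean isDisposed();
}

interface FlowObserver<T> {
    void onSubscribe(FlowDisposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

interface FlowEmitter<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

interface FlowOnSubscribe<T> {
    void subscribe(FlowEmitter<T> emitter) throws Exception;
}

final class FlowCreate<T> {
    private final FlowOnSubscribe<T> source;

    FlowCreate(FlowOnSubscribe<T> source) {
        this.source = source;
    }

    void subscribe(FlowObserver<? super T> observer) {
        FlowCreateEmitter<T> parent = new FlowCreateEmitter<T>(observer);
        observer.onSubscribe(parent);   // step 1: downstream receives the Disposable
        try {
            source.subscribe(parent);   // step 2: the source starts emitting
        } catch (Throwable ex) {
            parent.onError(ex);         // step 3: a thrown exception becomes onError
        }
    }
}

final class FlowCreateEmitter<T> implements FlowEmitter<T>, FlowDisposable {
    private final FlowObserver<? super T> observer;
    private boolean disposed; // plain field: this sketch is single-threaded

    FlowCreateEmitter(FlowObserver<? super T> observer) {
        this.observer = observer;
    }

    @Override public void onNext(T value) {
        if (!disposed) observer.onNext(value);
    }

    @Override public void onError(Throwable e) {
        if (!disposed) {
            try { observer.onError(e); } finally { disposed = true; }
        }
    }

    @Override public void onComplete() {
        if (!disposed) {
            try { observer.onComplete(); } finally { disposed = true; }
        }
    }

    @Override public void dispose() { disposed = true; }

    @Override public boolean isDisposed() { return disposed; }
}

class FlowDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        FlowCreate<String> upstream = new FlowCreate<String>(emitter -> {
            emitter.onNext("1");
            throw new IllegalStateException("boom");
        });
        upstream.subscribe(new FlowObserver<String>() {
            @Override public void onSubscribe(FlowDisposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onError(Throwable e) { log.add("onError " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }
}
```

In this sketch FlowDemo.run() yields onSubscribe, onNext 1, onError boom, in that order: onSubscribe always precedes the first event, and an exception thrown inside the source is routed to the observer's onError instead of escaping from subscribe.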

Next, look at the implementation of CreateEmitter.


public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        ...
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

In summary, CreateEmitter does the following:

  • Its constructor requires an observer instance;
  • It implements the ObservableEmitter interface, including the three event methods inherited from Emitter:
    • onNext rejects null values (a null is reported as an error) and otherwise passes the value sent by the subject (upstream) on to the observer (downstream), provided the emitter has not been disposed. It does not end the event sequence.
    • onComplete notifies downstream and then disposes the emitter in a finally block, so the event sequence ends even if the observer throws.
    • onError does not call the observer directly; it delegates to tryOnError, and if the error cannot be delivered it is handed to RxJavaPlugins.onError.
    • tryOnError passes the error downstream only if the emitter has not been disposed, disposes the emitter afterwards, and reports through its return value whether the delivery happened.
  • It implements the Disposable interface, so that downstream can query the state of the event sequence and even actively end it, breaking the subscription between subject and observer.
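These guard rules can be exercised in isolation. The sketch below is a simplified stand-in written for this article (GuardObserver and GuardedEmitter are hypothetical names): it replaces the AtomicReference-based state with an AtomicBoolean, and it collects undeliverable errors in a list where the real class calls RxJavaPlugins.onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical downstream contract, reduced to the three event callbacks.
interface GuardObserver<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

// Simplified sketch of the guard logic; not the real CreateEmitter.
final class GuardedEmitter<T> {
    private final GuardObserver<? super T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean();
    // Stand-in for RxJavaPlugins.onError: errors that could not be delivered.
    final List<Throwable> undeliverable = new ArrayList<>();

    GuardedEmitter(GuardObserver<? super T> observer) {
        this.observer = observer;
    }

    void onNext(T value) {
        if (value == null) {
            onError(new NullPointerException("onNext called with null"));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(value);
        }
    }

    void onError(Throwable t) {
        if (!tryOnError(t)) {
            undeliverable.add(t);
        }
    }

    boolean tryOnError(Throwable t) {
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}

class GuardDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        GuardedEmitter<String> emitter = new GuardedEmitter<String>(new GuardObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onError(Throwable e) { log.add("onError " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        emitter.onNext("1");
        emitter.onComplete();   // terminal event: disposes the emitter
        emitter.onNext("2");    // dropped, the emitter is already disposed
        boolean delivered = emitter.tryOnError(new RuntimeException("late"));
        log.add("tryOnError delivered=" + delivered);
        emitter.onError(new RuntimeException("later"));
        log.add("undeliverable=" + emitter.undeliverable.size());
        return log;
    }
}
```

In this sketch GuardDemo.run() produces onNext 1, onComplete, tryOnError delivered=false, undeliverable=1. After the terminal event nothing reaches the observer any more; tryOnError merely reports the failure, while onError passes the undeliverable error on to the fallback handler.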

What are the benefits of RxJava's adjustments to the regular Observer pattern?

Finally, let's briefly discuss how RxJava adjusts the regular Observer pattern and what we can learn from it. Most of the advantages have been mentioned above, so here is a summary.

  • A subject does not start sending events until an observer subscribes to it
  • In RxJava, the observer obtains a Disposable for the event source through onSubscribe, so it can actively query the state of the subscription and even control or interrupt the event sequence. In the regular Observer pattern, only the subject can add or remove specific subscribers, because it alone holds the collection of all subscribers
  • The abstract subject (upstream) does not directly control the sending of onNext, onComplete, and onError events; it only takes care of handing over an Emitter instance. The ObservableOnSubscribe interface waits for an ObservableEmitter to be delivered, and once it receives one it starts sending events through it. In a sense, this is one Observer pattern nested inside another.
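The second point, downstream controlling upstream through the handle it receives in onSubscribe, can be sketched as follows. StopHandle, StoppableObserver, and CountingSource are hypothetical names used only for this illustration; in RxJava the corresponding handle is the Disposable passed to onSubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Disposable.
interface StopHandle {
    void dispose();
    boolean isDisposed();
}

// Hypothetical observer that receives a handle before any value.
interface StoppableObserver<T> {
    void onSubscribe(StopHandle handle);
    void onNext(T value);
}

final class CountingSource {
    private CountingSource() {
    }

    // Emits 1..limit, but stops as soon as downstream disposes the handle.
    static void subscribe(int limit, StoppableObserver<Integer> observer) {
        final AtomicBoolean disposed = new AtomicBoolean();
        observer.onSubscribe(new StopHandle() {
            @Override public void dispose() { disposed.set(true); }
            @Override public boolean isDisposed() { return disposed.get(); }
        });
        for (int i = 1; i <= limit && !disposed.get(); i++) {
            observer.onNext(i);
        }
    }
}

class DisposeDemo {
    static List<Integer> run() {
        final List<Integer> received = new ArrayList<>();
        CountingSource.subscribe(5, new StoppableObserver<Integer>() {
            private StopHandle handle;

            @Override
            public void onSubscribe(StopHandle handle) {
                this.handle = handle; // keep the handle for later
            }

            @Override
            public void onNext(Integer value) {
                received.add(value);
                if (received.size() == 2) {
                    handle.dispose(); // downstream ends the sequence itself
                }
            }
        });
        return received;
    }
}
```

In this sketch DisposeDemo.run() returns [1, 2] even though the source was asked for five values: the observer, not the subject, decided when the subscription ended.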

That concludes this interpretation of RxJava from the perspective of the Observer pattern. If you spot any omissions or misunderstandings, please point them out so we can all improve together!