This article was first published in: source code interpretation – RxJava2 source code interpretation

This article includes 1 basic flow of RxJava event flow; Write a basic RxJava process by yourself.

Basic flow of RxJava event flow

When FIRST introduced to RxJava, many articles defined the observer and Observable objects as observer and observed. In fact, it is easy to confuse people by treating an Observable as an upstream event producer and an observer as a downstream event handler.

The simplest way to call RxJava2 is as follows:

Observable. Create (ObservableOnSubscribe<String>() {public ObservableOnSubscribe (ObservableEmitter<String>) emitter) throws Exception { } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public voidonComplete() {}});Copy the code

The basic logic is that Observable creates a
class, and the
class calls subscribe and passes in an observer.

Issues to be addressed:

Question: What type of instance does the create method create?

With the problem in mind, let’s look at the create method at ①. The main contents are as follows:

ObservableOnSubscribe<T> Public ObservableOnSubscribe<T> ObservableOnSubscribe<T>source) {
    ObjectHelper.requireNonNull(source."source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); RxJavaPlugins public static <T> Observable<T> onAssembly(@nonnull Observable<T>)source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if(f ! = null) {return apply(f, source);
    }
    return source; (2)}Copy the code

To avoid distraction, let’s just look at the main branch, and make sense of the main branch, so that the whole context is basically clear.

ObservableCreate (ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate, ObservableCreate)

So here we can answer the above question: The object created by Create is ObservableCreate.

So logically, we must now find the subscribe method of the ObservableCreate.

Yeah, yeah, but will things go as well as we expect? We do not find the subscribe method in the ObservableCreate class.

Q: Where is the subscribe method?

ObservableCreate ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}... }Copy the code

ObservableCreate inherits to Observable, but we didn’t find the subscribe method. So the first idea is to look in the superclass, which is an Observable.

Observable public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer,"observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

Subscribe is a final method. Subscribe is a final method. Subscribe is a final method, subscribe is a final method, subscribe is a final method.

Observable class protected abstract void subscribeActual(Observer<? super T> observer);Copy the code

It is an abstract method, which we are relieved, somewhat similar to the onMeasure method in the Android control View.

As a result, we can answer the questions raised above.

A: Subscribe method of Observable is an abstract method. All subclasses inherit Observable and implement its abstract method subscribeActual to carry out the actual subscription operation.

So we go straight to the subscribeActual method in the ObservableCreate class:

ObservableCreate @override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); (1) the observer. OnSubscribe (parent); (2) the try {source. The subscribe (parent); ③} catch (Throwable ex) {Exceptions. ThrowIfFatal (ex); parent.onError(ex); }}Copy the code

Q: What is the rationale for sending messages?

(1) Create an emitter

Does the method at (2) look familiar? Yes, it is. It is the first method called by our Observer.

③ Source is the ObservableOnSubscribe parameter in the create method that we first called.

Remember our original example?

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
})
......
Copy the code

The parameter passed in (3) is an instance of the ObservableEmitter object here, so every time we send a message using an Emitter, we trigger the CreateEmitter onNext method.

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
        }
        if(! IsDisposed ()) {// Trigger the onNext method observer.onnext (t); }} omit part of the code..... }Copy the code

The ancients cloud: the paper come zhongjue shallow, and must know this to practice. The process of writing this by hand is more memorable and solid than any tutorial.

Write your own Observable

To write our Observable, I’ll prefix the names with Perry.

abstract class PerryObservable<T> {
    // Subscribe to the method to start calling
    fun subscribe(observer: PerryObserver<in T>) {
        subscribeActual(observer)
    }
    
    internal abstract fun subscribeActual(observer: PerryObserver<in T>)

    companion object {
        // create constructor
        fun <T> create(source: PerryObservableOnSubscribe<T>): PerryObservable<T> {
            return PerryObservableCreate(source)
        }
    }
}
Copy the code

Then PerryObservableCreate.

class PerryObservableCreate<T>(private val source: PerryObservableOnSubscribe<T>) : PerryObservable<T>() {

    override fun subscribeActual(observer: PerryObserver<in T>) {
        val emitter = PerryCreateEmitter(observer)
        observer.onSubscribe(emitter)

        source.subscribe(emitter)
    }
    
    // This is our launcher
    class PerryCreateEmitter<T> internal constructor(private val observer: PerryObserver<in T>) 
      : PerryDisposable, PerryEmitter<T> {

        override fun dispose(a){}override fun isDisposed(a): Boolean {
            return false
        }

        override fun onNext(value: T) {
            observer.onNext(value)
        }

        override fun onError(error: Throwable) {
            observer.onError(error)
        }

        override fun onComplete(a) {
            observer.onComplete()
        }
    }
}
Copy the code

All the other interfaces are not posted.

interface PerryEmitter<T> {

    fun onNext(@NonNull value: T)
    
    fun onError(@NonNull error: Throwable)
    
    fun onComplete(a)
}
Copy the code

The final call method is as follows:

PerryObservable.Companion.create(new PerryObservableOnSubscribe<String>() {
    @Override
    public void subscribe(PerryObservableCreate.PerryCreateEmitter<String> emitter) {
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).subscribe(new PerryObserver<String>() {
    @Override
    public void onSubscribe(PerryDisposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("zp_test", s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {}});Copy the code

Print logs:

At this point, we basically understand the logic of its occurrence message, due to the space is limited, other more advanced functions, look forward to next time see you!