Simple use of Rxjava

Observable. Create (Object: ObservableOnSubscribe<Int> {Override Fun subscribe(Emitter: emitter) ObservableEmitter<Int>) { emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } }). SubscribeOn (Schedulers. IO ()) / / run in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / switching observer callback to the main thread. The subscribe (object: Observer<Int> {override fun onSubscribe(d: Disposable) {} Override fun onNext(t: Int) { } override fun onError(e: Throwable) { } override fun onComplete() { } })Copy the code

Let’s start by analyzing Rxjava as a whole.

Source code analysis

Let’s start by looking at the Observable. Create () subscribeOn() observeOn() subscribe() method.

@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); OnAssembly (new ObservableCreate<T>(source)); // Use a simple wrapper hook function for source. } //subscribeOn() @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); // The ObservableSubscribeOn ObservableCreate returns rxJavaplugins.onAssembly (new) ObservableSubscribeOn<T>(this, scheduler)); } //observeOn() @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler,  boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // ObservableObserveOn, OnAssembly (New ObservableObserveOn<T>(this, ObservableObserveOn, Scheduler, delayError, bufferSize)); } //subscribe() @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); Try {/ / this is here ObservableObserveOn observer. = RxJavaPlugins onSubscribe (this, the observer). ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); // Subscribe to the Observer subscribeActual(Observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

Having looked through the flow of the invocation, we now have three key classes in view. ObservableCreate ObservableSubscribeOn ObservableObserveOn, and you can see that they’re being called layer after layer after layer, all before that in the configuration argument, the last subscribe() method that’s actually doing something.

We start from the subscribeActual(Observer); Look up the

/** * Operator implementations (both source and intermediate) should implement this method that * performs the necessary  business logic and handles the incoming {@link Observer}s. * <p>There is no need to call any of the plugin hooks on the  current {@code Observable} instance or * the {@code Observer}; all hooks and basic safeguards have been * applied by {@link #subscribe(Observer)} before this method gets called. * @param observer the incoming Observer, never null */ protected abstract void subscribeActual(Observer<? super T> observer);Copy the code

Is an abstract method (you can think of starting layer by layer back here).

Let’s start with subscribeActual in ObservableObserveOn

@Override protected void subscribeActual(Observer<? ObservableSubscribeOn if (ObservableSubscribeOn instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code

Look at the source. The subscribe (observer); What did

Public interface ObservableSource<T> {/** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); } @schedulerSupport (schedulersupport.none) @override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

HHH, subscribeActual(Observer); Function.

ObservableSubscribeOn

@Override public void subscribeActual(final Observer<? Final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); // Call the onSubscribe method observer.onSubscribe(parent); // SubscribeTask parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } // is a runnable? Oh? Final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; Override public void run() {subscribe(parent) {Override public void run() {subscribe(parent); }}Copy the code

As you can see, the Runnable executes the subscribeActual of the last Observable, and you can imagine that the callbacks for the last three results are all there.

@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); OnSubscribe (d: Disposable) calls observer.onsubscribe (parent); Source.subscribe (parent); source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

You can look inside the CreateEmitter

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (! IsDisposed ()) {// Here observer.onNext(t); } } @Override public void onError(Throwable t) { if (! tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (! IsDisposed ()) {try {// Here observer.onerror (t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (! IsDisposed ()) {try {// Here observer.oncomplete (); } finally { dispose(); }}}Copy the code

Callback and so on.

Personal summary

Rxjava in general is a very good framework, although the learning cost is high, but the internal design patterns and ideas are worth learning. Read the source code, learn more. I hope you can grow into the person you want to be.

Since it is my first time to write an article, please correct and include any bad points.