Simple use of Rxjava
Observable. Create (Object: ObservableOnSubscribe<Int> {Override Fun subscribe(Emitter: emitter) ObservableEmitter<Int>) { emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } }). SubscribeOn (Schedulers. IO ()) / / run in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / switching observer callback to the main thread. The subscribe (object: Observer<Int> {override fun onSubscribe(d: Disposable) {} Override fun onNext(t: Int) { } override fun onError(e: Throwable) { } override fun onComplete() { } })Copy the code
Let’s start by analyzing Rxjava as a whole.
Source code analysis
Let’s start by looking at the Observable. Create () subscribeOn() observeOn() subscribe() method.
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); OnAssembly (new ObservableCreate<T>(source)); // Use a simple wrapper hook function for source. } //subscribeOn() @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); // The ObservableSubscribeOn ObservableCreate returns rxJavaplugins.onAssembly (new) ObservableSubscribeOn<T>(this, scheduler)); } //observeOn() @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // ObservableObserveOn, OnAssembly (New ObservableObserveOn<T>(this, ObservableObserveOn, Scheduler, delayError, bufferSize)); } //subscribe() @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); Try {/ / this is here ObservableObserveOn observer. = RxJavaPlugins onSubscribe (this, the observer). ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); // Subscribe to the Observer subscribeActual(Observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code
Having looked through the flow of the invocation, we now have three key classes in view. ObservableCreate ObservableSubscribeOn ObservableObserveOn, and you can see that they’re being called layer after layer after layer, all before that in the configuration argument, the last subscribe() method that’s actually doing something.
We start from the subscribeActual(Observer); Look up the
/** * Operator implementations (both source and intermediate) should implement this method that * performs the necessary business logic and handles the incoming {@link Observer}s. * <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or * the {@code Observer}; all hooks and basic safeguards have been * applied by {@link #subscribe(Observer)} before this method gets called. * @param observer the incoming Observer, never null */ protected abstract void subscribeActual(Observer<? super T> observer);Copy the code
Is an abstract method (you can think of starting layer by layer back here).
Let’s start with subscribeActual in ObservableObserveOn
@Override protected void subscribeActual(Observer<? ObservableSubscribeOn if (ObservableSubscribeOn instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code
Look at the source. The subscribe (observer); What did
Public interface ObservableSource<T> {/** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); } @schedulerSupport (schedulersupport.none) @override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code
HHH, subscribeActual(Observer); Function.
ObservableSubscribeOn
@Override public void subscribeActual(final Observer<? Final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); // Call the onSubscribe method observer.onSubscribe(parent); // SubscribeTask parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } // is a runnable? Oh? Final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; Override public void run() {subscribe(parent) {Override public void run() {subscribe(parent); }}Copy the code
As you can see, the Runnable executes the subscribeActual of the last Observable, and you can imagine that the callbacks for the last three results are all there.
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); OnSubscribe (d: Disposable) calls observer.onsubscribe (parent); Source.subscribe (parent); source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
You can look inside the CreateEmitter
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (! IsDisposed ()) {// Here observer.onNext(t); } } @Override public void onError(Throwable t) { if (! tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (! IsDisposed ()) {try {// Here observer.onerror (t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (! IsDisposed ()) {try {// Here observer.oncomplete (); } finally { dispose(); }}}Copy the code
Callback and so on.
Personal summary
Rxjava in general is a very good framework, although the learning cost is high, but the internal design patterns and ideas are worth learning. Read the source code, learn more. I hope you can grow into the person you want to be.
Since it is my first time to write an article, please correct and include any bad points.