Introduction to RxJava

RxJava implements reactive programming through a chained call structure and provides a library of functions that is especially suitable for asynchronous thread switching and event-based programming. Its operators make it convenient to distribute, transform and respond to events, and to switch and schedule threads. It is built on the observer design pattern, and it significantly simplifies code and reduces nesting.

1. Selected operators and functional interfaces

1) just operator:

public static <T> Observable<T> just(T item)

Creation operator: just(T item) … just(T item1, …, T item10)

When a single argument is passed, an ObservableJust object is created; its subscribeActual method calls the run method of ScalarDisposable, and the run method then calls the Observer's onNext(T) method.

When more than one argument is passed, an ObservableFromArray object is created; its subscribeActual method calls the run method of FromArrayDisposable, which iterates over the items and calls the Observer's onNext(T) method for each one;

2) map operator:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

map is not a static method, so it cannot be called on the Observable class directly; it must be called on an Observable instance.

Internally it returns new ObservableMap<T, R>(this, mapper), where this is the Observable that map was called on. For example:

Observable.just("aaa")
    .map(new Function<String, Object>() {
        @Override
        public Object apply(@NonNull String s) throws Exception {
            return null;
        }
    })
    .subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            
        }

        @Override
        public void onNext(@NonNull Object o) {

        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

Here we call the just operator first and then the map operator, so inside map, this refers to the ObservableJust object. When we then pass an Observer to the subscribe method, subscribe calls the subscribeActual method that performs the real subscription, which in this case is the subscribeActual method of ObservableMap.

The subscribeActual method of ObservableMap calls source.subscribe(new MapObserver<T, U>(t, function)). The source is the this from before, i.e. ObservableJust, so the subscribe method of ObservableJust is called, which calls its subscribeActual. That method creates a ScalarDisposable and calls its run method. The Observer passed in when creating the ScalarDisposable is the MapObserver, so the run method calls the onNext method of MapObserver. There, U v = mapper.apply(t) converts the t passed down from the previous step into v, and finally actual.onNext(v) is called; actual is the anonymous inner class Observer that we implemented at the end of the chain;
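The call flow described above can be sketched with a few plain-Java classes. This is a simplified model, not RxJava's real source: the nested Observable, Observer, ObservableJust and ObservableMap types are stand-ins written for this example, disposal and error handling are left out, and an anonymous observer plays the role of MapObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified model of the just -> map -> subscribe flow; not RxJava's real classes. */
final class JustMapSketch {
    interface Observer<T> { void onNext(T value); void onComplete(); }

    abstract static class Observable<T> {
        // subscribe() only delegates to subscribeActual() of the concrete class.
        final void subscribe(Observer<? super T> observer) { subscribeActual(observer); }
        abstract void subscribeActual(Observer<? super T> observer);

        static <T> Observable<T> just(T item) { return new ObservableJust<T>(item); }

        // 'this' (the upstream) becomes the source of the new ObservableMap.
        final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            return new ObservableMap<T, R>(this, mapper);
        }
    }

    static final class ObservableJust<T> extends Observable<T> {
        private final T value;
        ObservableJust(T value) { this.value = value; }
        @Override void subscribeActual(Observer<? super T> observer) {
            observer.onNext(value);   // stands in for ScalarDisposable.run()
            observer.onComplete();
        }
    }

    static final class ObservableMap<T, R> extends Observable<R> {
        private final Observable<T> source;
        private final Function<? super T, ? extends R> mapper;
        ObservableMap(Observable<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }
        @Override void subscribeActual(Observer<? super R> actual) {
            // Wrap the downstream observer (the role of MapObserver), then subscribe upstream.
            source.subscribe(new Observer<T>() {
                @Override public void onNext(T t) { actual.onNext(mapper.apply(t)); }
                @Override public void onComplete() { actual.onComplete(); }
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("aaa")
            .map(String::length)
            .subscribe(new Observer<Integer>() {
                @Override public void onNext(Integer v) { log.add("onNext:" + v); }
                @Override public void onComplete() { log.add("onComplete"); }
            });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext:3, onComplete]
    }
}
```

Nothing is emitted until subscribe is called on the outermost object; the value then passes through the wrapping observer before it reaches the final one.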

3) ObservableTransformer interface

It has only one abstract method: ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream); this method performs a series of operations on the incoming Observable and returns the result to be passed downstream.

4) compose method

It takes an ObservableTransformer object:

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer)

Its body essentially returns composer.apply(this); note that this is the upstream Observable being passed in;

The compose method and the ObservableTransformer interface can encapsulate commonly used sequences of operations, such as thread switching. For example, a thread-switching method can be defined like this:

public static <UD> ObservableTransformer<UD, UD> changeThread() {
    return new ObservableTransformer<UD, UD>() {
        @Override
        public ObservableSource<UD> apply(@NonNull Observable<UD> upstream) {
            return upstream.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        }
    };
}

You can then pass the result of this method to compose to switch threads.
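To show why compose is useful for packaging operators, here is a minimal plain-Java sketch. Stage and Transformer are hypothetical stand-ins for Observable and ObservableTransformer (Stage is just an eager list, so there is no laziness or threading), and trimThenLength is an invented example of a reusable operator bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified model of compose + transformer; Stage and Transformer are hypothetical names. */
final class ComposeSketch {
    /** Stand-in for ObservableTransformer: turns one stage into another. */
    interface Transformer<U, D> { Stage<D> apply(Stage<U> upstream); }

    /** Stand-in for Observable: a list of items plus two operators. */
    static final class Stage<T> {
        final List<T> items;
        Stage(List<T> items) { this.items = items; }

        <R> Stage<R> map(Function<? super T, ? extends R> mapper) {
            List<R> out = new ArrayList<>();
            for (T item : items) out.add(mapper.apply(item));
            return new Stage<R>(out);
        }

        // compose simply hands 'this' to the transformer and returns its result.
        <R> Stage<R> compose(Transformer<T, R> transformer) { return transformer.apply(this); }
    }

    /** A reusable bundle of operators, written once and applied wherever it is needed. */
    static Transformer<String, Integer> trimThenLength() {
        return upstream -> upstream.map(String::trim).map(String::length);
    }

    static List<Integer> run() {
        List<String> input = new ArrayList<>();
        input.add(" ab ");
        input.add("cde");
        return new Stage<String>(input).compose(trimThenLength()).items;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2, 3]
    }
}
```

Because the transformer receives the upstream and returns a new stage, the call site stays a single unbroken chain.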

5) subscribe method

This method establishes the subscription between an Observable and an Observer, and accepts an Observer object. The reason the Observable subscribes to the Observer, rather than the other way round, is that this preserves the chained-call structure.

RxJava with Retrofit for network access

The approach is to change the return type to Observable when defining a method in Retrofit's API interface, so that the object returned when we make a network request through the API can be used as the RxJava event source; we then switch threads and implement our own Observer to process the result.

The subscribeOn(Scheduler) method switches the operations before this call (the upstream) to the Scheduler's thread. The observeOn(Scheduler) method switches the operations after this call (the downstream) to the Scheduler's thread;

Scheduler types: Schedulers.io() is used for IO-intensive tasks; Schedulers.computation() is typically used for CPU-intensive tasks; AndroidSchedulers.mainThread() refers to the Android UI thread. UI operations must be performed on this thread, and time-consuming operations must not be performed on it.

3. Button click debouncing + network request

RxView.clicks(button).throttleFirst(2000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {});

Debouncing needs one subscribe call. If pressing the button then requires two network requests, and the second request needs the result of the first, two more layers of subscribe have to be nested. The nesting becomes too deep and the code is hard to read. The flatMap operator eliminates this nesting;

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

The difference between flatMap and map is that the second type parameter of the Function interface becomes ObservableSource<? extends R>. Inside the apply function we return another Observable that emits the events, for example Observable.fromIterable(projectBean.getData()), which reduces the level of nesting.

4. The doOnNext operator

Suppose the requirement is to register and then update the UI, then log in and update the UI again. We could of course write registration and login as two separate parts, but with the doOnNext operator we can combine the two operations into one chain;

public final Observable<T> doOnNext(Consumer<? super T> onNext)

doOnNext accepts a Consumer (a simplified Observer) and returns an Observable, which lets us merge the registration and login operations in a more elegant way.

5. RxJava resource release

We usually release resources in the onDestroy method of the Activity: save the Disposable received in the Observer's onSubscribe method to a field, and then do the following:

if (disposable != null && !disposable.isDisposed()) disposable.dispose();

RxJava patterns and principles

1. Observer design pattern

The observer design pattern generally consists of four parts: the abstraction layer of the observed, the concrete implementation of the observed, the abstraction layer of the observer, and the concrete implementation of the observer. The concrete implementation of the observed usually maintains a collection that stores all observers; when the observed changes, it notifies every observer. For example, a WeChat public account can be understood as the observed, and all users subscribing to it are observers. When a new article is pushed, all of those users are notified.
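As an illustration of the four parts, here is a minimal plain-Java sketch of the public-account example. All names (Subscriber, Account, PublicAccount, the user names and article titles) are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Classic observer pattern; all names here are hypothetical. */
final class ObserverPatternSketch {
    /** Observer abstraction. */
    interface Subscriber { void onArticle(String title); }

    /** Observed (subject) abstraction. */
    interface Account {
        void follow(Subscriber subscriber);
        void unfollow(Subscriber subscriber);
        void publish(String title);
    }

    /** Concrete observed object: keeps the collection of observers and notifies each one. */
    static final class PublicAccount implements Account {
        private final List<Subscriber> subscribers = new ArrayList<>();

        @Override public void follow(Subscriber subscriber) { subscribers.add(subscriber); }
        @Override public void unfollow(Subscriber subscriber) { subscribers.remove(subscriber); }
        @Override public void publish(String title) {
            // Iterate over a copy so an observer may unfollow while being notified.
            for (Subscriber s : new ArrayList<>(subscribers)) s.onArticle(title);
        }
    }

    static List<String> run() {
        List<String> inbox = new ArrayList<>();
        PublicAccount account = new PublicAccount();
        // Concrete observers, written as lambdas.
        Subscriber alice = title -> inbox.add("alice got " + title);
        Subscriber bob = title -> inbox.add("bob got " + title);

        account.follow(alice);
        account.follow(bob);
        account.publish("post-1");   // both are notified
        account.unfollow(bob);
        account.publish("post-2");   // only alice is notified
        return inbox;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [alice got post-1, bob got post-1, alice got post-2]
    }
}
```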

2. RxJava global hook

When we call the various operators, we find that after the ObservableXxx object is created with new, it is wrapped with RxJavaPlugins.onAssembly. In this method, if the onObservableAssembly field of RxJavaPlugins is not null, an apply conversion is performed. onObservableAssembly is a variable of type Function and defaults to null. We can call RxJavaPlugins.setOnObservableAssembly to assign a custom value, which gives us global monitoring: the hook is called once at every place an RxJava operator is used, and we can also add special handling for a particular ObservableXxx type. onObservableAssembly is static and globally unique;

static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
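The hook mechanism can be modelled in a few lines of plain Java. This is a sketch of the idea only: Source and Plugins are hypothetical stand-ins for Observable and RxJavaPlugins, and the hook field and setHook method are simplified names, not the library's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified model of a global assembly hook; not the RxJavaPlugins source. */
final class AssemblyHookSketch {
    /** Stand-in for Observable; only the operator name matters here. */
    static final class Source {
        final String name;
        Source(String name) { this.name = name; }
    }

    /** Stand-in for RxJavaPlugins: one static, globally shared hook that is null by default. */
    static final class Plugins {
        static volatile Function<Source, Source> hook;

        static void setHook(Function<Source, Source> newHook) { hook = newHook; }

        static Source onAssembly(Source source) {
            Function<Source, Source> f = hook;   // read the volatile field once
            return f == null ? source : f.apply(source);
        }
    }

    // Every operator passes the object it creates through the hook.
    static Source just() { return Plugins.onAssembly(new Source("just")); }
    static Source map(Source upstream) { return Plugins.onAssembly(new Source("map")); }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Plugins.setHook(s -> { seen.add(s.name); return s; });
        try {
            map(map(just()));        // three operators, so the hook fires three times
        } finally {
            Plugins.setHook(null);   // restore the default so later code is unaffected
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [just, map, map]
    }
}
```

Because the hook is a single static field, installing it affects every chain in the process, which is why the sketch resets it afterwards.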

3. RxJava observer pattern

RxJava implements the observer pattern mainly through three key parts: the Observable, the Observer and the subscribe process. While creating the Observable, multiple operators can be chained; the Observer receives the messages. RxJava's observer pattern is therefore more like the publish-subscribe pattern: nothing happens until the Observer subscribes. The relationship between the observed and the observer can be understood as many-to-one, because creating the Observable involves multiple chained calls, while there is only one Observer, namely our custom Observer or Consumer. The traditional observer design pattern is a one-to-many relationship.

4. The create operator

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

When we call the create operator, we need to pass in an ObservableOnSubscribe object, which we can call the custom source; it is a single-method interface with only a subscribe method. The create call returns an ObservableCreate object, with the custom source passed to its constructor. In the subscribeActual method that actually performs the subscription, a CreateEmitter is created; this emitter holds our custom Observer. observer.onSubscribe is called first, and then the subscribe method of our custom source is called with the emitter as its argument. So in the subscribe method of our custom source we need to call emitter.onNext and emitter.onComplete; these emitter methods in turn call the corresponding methods of our custom Observer.
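The order of calls described above can be sketched as follows. This is a simplified model rather than RxJava's source: the nested interfaces are stand-ins, onSubscribe takes no Disposable, and a plain boolean flag approximates the real emitter's disposed check.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of create + emitter; not RxJava's ObservableCreate source. */
final class CreateSketch {
    interface Observer<T> { void onSubscribe(); void onNext(T value); void onComplete(); }
    interface Emitter<T> { void onNext(T value); void onComplete(); }
    /** Stand-in for ObservableOnSubscribe: the user-defined source. */
    interface OnSubscribe<T> { void subscribe(Emitter<T> emitter); }

    static final class ObservableCreate<T> {
        private final OnSubscribe<T> source;
        ObservableCreate(OnSubscribe<T> source) { this.source = source; }

        void subscribe(Observer<? super T> observer) {
            // The emitter holds the downstream observer (the role of CreateEmitter).
            Emitter<T> emitter = new Emitter<T>() {
                private boolean done;
                @Override public void onNext(T value) {
                    if (!done) observer.onNext(value);
                }
                @Override public void onComplete() {
                    if (!done) { done = true; observer.onComplete(); }
                }
            };
            observer.onSubscribe();      // 1. notify the observer first
            source.subscribe(emitter);   // 2. then run the user-defined source
        }
    }

    static <T> ObservableCreate<T> create(OnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CreateSketch.<String>create(emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onComplete();
            emitter.onNext("late");      // dropped: the stream has already completed
        }).subscribe(new Observer<String>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext:a, onNext:b, onComplete]
    }
}
```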

5. RxJava decorator pattern

RxJava uses the decorator pattern to wrap one layer around another as we chain operators. For example, if we call the create operator and then the map operator twice, an ObservableCreate object is created first, then an ObservableMap with the ObservableCreate as its source, and finally a second ObservableMap with the first ObservableMap as its source. Once we call subscribe, the subscribeActual method of the last map is called first; it calls source.subscribe, and so, layer by layer, the subscribeActual methods are invoked from the last operator back up to the original source, after which events flow from the source down to our Observer.
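A small plain-Java model makes the two directions visible: subscription travels from the last operator up to the source, and events travel back down. Node, CreateNode and MapNode are hypothetical stand-ins for Observable, ObservableCreate and ObservableMap, and a Consumer plays the role of the Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Simplified model of operator layering; the node classes are hypothetical. */
final class DecoratorSketch {
    abstract static class Node {
        abstract void subscribeActual(Consumer<String> observer, List<String> log);
    }

    /** Innermost layer: the original source. */
    static final class CreateNode extends Node {
        @Override void subscribeActual(Consumer<String> observer, List<String> log) {
            log.add("subscribeActual:create");
            observer.accept("x");
        }
    }

    /** Decorator layer: wraps a source node and transforms what it emits. */
    static final class MapNode extends Node {
        private final Node source;
        private final String label;
        private final UnaryOperator<String> mapper;

        MapNode(Node source, String label, UnaryOperator<String> mapper) {
            this.source = source;
            this.label = label;
            this.mapper = mapper;
        }

        @Override void subscribeActual(Consumer<String> observer, List<String> log) {
            log.add("subscribeActual:" + label);
            // Wrap the downstream observer, then subscribe to the layer above.
            source.subscribeActual(value -> {
                log.add("onNext:" + label);
                observer.accept(mapper.apply(value));
            }, log);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Node chain = new MapNode(
            new MapNode(new CreateNode(), "map1", s -> s + "1"),
            "map2", s -> s + "2");
        chain.subscribeActual(value -> log.add("observer:" + value), log);
        return log;
    }

    public static void main(String[] args) {
        // [subscribeActual:map2, subscribeActual:map1, subscribeActual:create,
        //  onNext:map1, onNext:map2, observer:x12]
        System.out.println(run());
    }
}
```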

RxJava thread switching and custom operators

1. subscribeOn(Schedulers.io())

Switches execution to an IO thread:

1) Schedulers.io(): after a series of global hooks and Callable conversions, this call finally yields an IoScheduler;

2) subscribeOn(IoScheduler): calling this method is what really sets up the thread switch, with the IoScheduler passed in as the argument. It returns an ObservableSubscribeOn object, whose subscribeActual method calls scheduler.scheduleDirect(Runnable), which in turn calls:

Worker w = createWorker();
w.schedule(task, delay, unit);

The task is a wrapper around the Runnable we passed in, and w is obtained from createWorker(), which is an abstract method of Scheduler; the implementation in IoScheduler returns an EventLoopWorker;

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

Then the schedule method of the EventLoopWorker is called;

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {           
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

This in turn calls threadWorker.scheduleActual; the threadWorker is of type NewThreadWorker;

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit((Callable<Object>)sr);
    } else {
        f = executor.schedule((Callable<Object>)sr, delayTime, unit);
    }
    sr.setFuture(f);           
    return sr;
}

You can see that this method calls either executor.submit or executor.schedule. The executor is created like this:

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);            
    return exec;
}

You can see that Executors.newScheduledThreadPool(1, factory) is used to create a thread pool with one core thread. In other words, our wrapped Runnable is handed to the thread pool for execution, which completes the thread switch. So where does the Runnable we wrapped at the start come from? It is created in the subscribeActual method of ObservableSubscribeOn, as new SubscribeTask(parent); its run method is as follows:

public void run() {
    source.subscribe(parent);
}

Here parent is the SubscribeOnObserver that wraps our downstream Observer, and source is the upstream Observable. Therefore, if only subscribeOn is used to switch threads, everything from the source down to our Observer runs in the thread pool. So before our custom Observer executes we need to use observeOn to switch back to the UI thread; otherwise updating the UI in the Observer will crash the program;
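The core idea, that the upstream subscription itself is submitted to an executor, can be sketched in plain Java. This is a model under simplifying assumptions: Source is a hypothetical stand-in for Observable, a Consumer plays the Observer, and a single-thread executor named "sketch-io" stands in for the IoScheduler's worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Simplified model of subscribeOn; not RxJava's ObservableSubscribeOn source. */
final class SubscribeOnSketch {
    /** Stand-in for an Observable: emits to the observer when subscribed. */
    interface Source { void subscribe(Consumer<String> observer); }

    /** The task given to the executor plays the role of SubscribeTask: source.subscribe(parent). */
    static Source subscribeOn(Source source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io");
            t.setDaemon(true);
            return t;
        });
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Source source = observer -> {
            log.add("emit@" + Thread.currentThread().getName());
            observer.accept("value");
        };
        subscribeOn(source, io).subscribe(value -> {
            log.add("receive@" + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [emit@sketch-io, receive@sketch-io]
    }
}
```

Both the emission and the observer callback run on the executor thread, which matches the point above: without a later thread switch, the Observer is not called on the thread that called subscribe.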

2. observeOn(AndroidSchedulers.mainThread())

Call this method to switch to the Android UI thread:

1) AndroidSchedulers.mainThread(): the call process is similar to that of Schedulers.io(); after several global hooks and Callable conversions it finally yields a HandlerScheduler:

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

2) observeOn:

Calling this method is what really sets up the thread switch. It returns an ObservableObserveOn object, whose subscribeActual method is:

protected void subscribeActual(Observer<? super T> observer) {       
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));        
}

The first line calls scheduler.createWorker(), an abstract method whose implementation here is in HandlerScheduler. The createWorker() method of HandlerScheduler creates a new HandlerWorker from its handler; this handler is the new Handler(Looper.getMainLooper()) from step 1, i.e. the main thread handler;

The second line of subscribeActual subscribes an ObserveOnObserver to the source. When the source emits, the onNext method of ObserveOnObserver is called; it queues the value and then calls the schedule() method, which calls worker.schedule(this), passing this as a Runnable. The worker is the HandlerWorker, so the schedule method of HandlerWorker is called:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    run = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    return scheduled;
}

You can see that the schedule method wraps the incoming Runnable in two layers and then sends a message to the main thread through handler.sendMessageDelayed. The callback member of the message is the wrapped Runnable, so the handler ends up executing the run method of the original Runnable. The original Runnable is the ObserveOnObserver that wraps our custom Observer, and its run method calls drainNormal():

void drainNormal() {
    final Observer<? super T> a = actual;
    for (;;) {
        for (;;) {
            a.onNext(v);
        }
    }
}

With all code irrelevant to the main flow removed, we can see that this method calls actual.onNext. actual is assigned in the constructor and is either our custom observer or a wrapper around it.
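For contrast with subscribeOn, here is a plain-Java sketch of the observeOn idea: the upstream is subscribed on the calling thread and only the delivery of each value hops to the target thread. It is a model under simplifying assumptions: Source and Observer are hypothetical stand-ins, a single-thread executor named "sketch-main" plays the main-thread Handler, and each value is posted directly instead of going through a queue and drain loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model of observeOn; not RxJava's ObservableObserveOn source. */
final class ObserveOnSketch {
    interface Observer { void onNext(String value); }
    interface Source { void subscribe(Observer observer); }

    /** Wraps the downstream observer so that every onNext is re-posted to the target executor. */
    static Source observeOn(Source source, Executor target) {
        return downstream ->
            source.subscribe(value -> target.execute(() -> downstream.onNext(value)));
    }

    static List<String> run() {
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-main");
            t.setDaemon(true);
            return t;
        });
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);

        // The source emits on whatever thread calls subscribe.
        Source source = observer -> {
            observer.onNext("a");
            observer.onNext("b");
        };
        observeOn(source, mainLike).subscribe(value -> {
            log.add(value + "@" + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainLike.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a@sketch-main, b@sketch-main]
    }
}
```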

3. Implementing a custom operator: button click debouncing

public ObservableViewClick(View view) {
    this.view = view;
}

@Override
protected void subscribeActual(Observer<? super Object> observer) {

    MyListener myListener = new MyListener(view, observer);
    observer.onSubscribe(myListener);

    this.view.setOnClickListener(myListener);
}

The core logic is shown above. Define an ObservableViewClick that extends Observable, then create a wrapping listener in its subscribeActual method, passing in the View and our custom observer. The listener implements the click interface, so once subscribe has been called, each click calls back the onClick method implemented in the listener, which calls observer.onNext;

static final class MyListener implements View.OnClickListener, Disposable {
    private final View view;
    // Matches the Observer<? super Object> passed in from subscribeActual.
    private final Observer<? super Object> observer;
    private final AtomicBoolean isDisposable = new AtomicBoolean();

    public MyListener(View view, Observer<? super Object> observer) {
        this.view = view;
        this.observer = observer;
    }

    @Override
    public void onClick(View v) {
        if (!isDisposed()) {
            observer.onNext(EVENT);
        }
    }

    @Override
    public void dispose() {
        // compareAndSet guarantees the listener is removed only once.
        if (isDisposable.compareAndSet(false, true)) {
            if (Looper.myLooper() == Looper.getMainLooper()) {
                // Already on the main thread
                view.setOnClickListener(null);
            } else {
                // Not on the main thread: switch to it through the Handler-based scheduler
                AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
                    @Override
                    public void run() {
                        view.setOnClickListener(null);
                    }
                });
            }
        }
    }

    @Override
    public boolean isDisposed() {
        return isDisposable.get();
    }
}

Miscellaneous notes

1. == versus equals

== compares the addresses (references) of two objects, and equals behaves exactly like == unless it is overridden, so comparing two objects by content requires overriding equals.

1) Comparing two int values that are equal: == returns true; primitives have no equals method;
2) Comparing two Integer objects with the same value: if the value is between -128 and 127, == returns true; outside that range == returns false; equals returns true in both cases;
3) Comparing two String objects with the same content: if they are created with new, == returns false and equals returns true; if direct assignment is used, == returns true and equals returns true;
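These cases can be checked with a short program. It is a minimal sketch: the values 127, 128 and "rx" are arbitrary, and the result for 128 assumes the JVM's default Integer cache range (the upper bound can be raised with JVM options).

```java
/** Demonstrates == versus equals for boxed integers and strings. */
final class EqualitySketch {
    static boolean[] run() {
        Integer a = 127, b = 127;        // autoboxed, inside the cached range
        Integer c = 128, d = 128;        // autoboxed, outside the default cached range
        String s1 = new String("rx");    // two distinct objects with equal content
        String s2 = new String("rx");
        String l1 = "rx", l2 = "rx";     // literals refer to the same interned object

        return new boolean[] {
            a == b,          // true: both refer to the same cached Integer
            c == d,          // false with the default cache range
            c.equals(d),     // true: equals compares the int values
            s1 == s2,        // false: different objects
            s1.equals(s2),   // true: same characters
            l1 == l2         // true: same interned literal
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```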

Switching from a child thread to the UI thread

runOnUiThread

This also uses a Handler internally: the Activity simply creates a Handler and passes the Runnable as the argument to the Handler's post method.

handler

Create the handler on the main thread. Do not create it with an anonymous inner class; use a static inner class that holds a weak reference to the Activity, which avoids leaking the Activity while still allowing access to its objects. A second approach is to create the handler with the constructor that takes a Callback argument and process the message in the callback.