RxJava is the Java implementation of ReactiveX (a programming interface that is also a programming idea), in which data is passed along a chain of calls.

Origin

Android development involves many callbacks, multi-condition interactions, asynchronous calls, and similar processing. To keep the code clean and readable, the project uses RxJava.

The Rx pattern

Using the observer pattern

  • Create: Rx makes it easy to create event streams and data streams
  • Composition: Rx combines and transforms data streams using query-like operators
  • Listen: Rx can subscribe to any observable data stream and perform operations
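
The three steps above can be sketched in plain Java without any library. The `MiniObservable` class below is a hypothetical, simplified stand-in written only for this illustration; it is not RxJava's implementation and leaves out error handling, completion, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, minimal stand-in for an Rx-style stream (not the RxJava API).
final class MiniObservable<T> {
    // The source logic: given a downstream consumer, push items into it.
    private final Consumer<Consumer<T>> source;

    private MiniObservable(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Create: wrap a function that emits items to whoever subscribes.
    static <T> MiniObservable<T> create(Consumer<Consumer<T>> source) {
        return new MiniObservable<T>(source);
    }

    // Compose: return a new stream whose items are transformed by mapper.
    <R> MiniObservable<R> map(Function<T, R> mapper) {
        return new MiniObservable<R>(downstream ->
                source.accept(item -> downstream.accept(mapper.apply(item))));
    }

    // Listen: nothing is emitted until an observer subscribes.
    void subscribe(Consumer<T> observer) {
        source.accept(observer);
    }
}

final class MiniObservableDemo {
    // Builds a stream of 1..4, doubles each item, and collects what the observer sees.
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        MiniObservable.<Integer>create(emitter -> {
            for (int i = 1; i < 5; i++) {
                emitter.accept(i);
            }
        }).map(i -> i * 2).subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4, 6, 8]
    }
}
```

The source function only runs when `subscribe` is called, which mirrors how a stream built with `create` stays idle until something listens to it.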

Simplify the code

  • Functional style: The use of side-effect-free input/output functions for observable data streams avoids the intricacies of state in the program
  • Simplified code: Rx operators can often reduce a complex problem to a few lines of code
  • Asynchronous error handling: Traditional try/catch cannot handle errors raised in asynchronous computations; Rx provides a dedicated error-handling mechanism
  • Easy concurrency: Rx's Observables and Schedulers let developers avoid dealing directly with low-level thread synchronization and concurrency issues
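
The point about asynchronous error handling can be illustrated with the JDK alone. The sketch below uses `CompletableFuture` as a stand-in for an asynchronous source (an assumption made for this example; it is not Rx). An exception thrown on the worker thread never reaches a try/catch placed around the call that started the work, so it has to be delivered through an error callback, which is the role onError plays in Rx.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncErrorDemo {
    // Runs a failing task on another thread and reports where the failure was observed.
    // Returns {caughtByTryCatch, messageSeenByCallback}.
    static String[] run() {
        boolean caughtByTryCatch = false;
        CompletableFuture<Integer> result = null;
        try {
            // supplyAsync returns immediately; the lambda runs later on another thread.
            result = CompletableFuture.supplyAsync(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException e) {
            // Never reached: the exception is thrown on the worker thread, not here.
            caughtByTryCatch = true;
        }

        AtomicReference<String> seenByCallback = new AtomicReference<>("none");
        result.handle((value, error) -> {
            if (error != null) {
                // Failures from supplyAsync usually arrive wrapped in a CompletionException.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause() : error;
                seenByCallback.set(cause.getMessage());
            }
            return value;
        }).join(); // block here only so the demo can report the outcome

        return new String[] {String.valueOf(caughtByTryCatch), seenByCallback.get()};
    }

    public static void main(String[] args) {
        String[] outcome = run();
        System.out.println("caught by try/catch: " + outcome[0]);
        System.out.println("seen by error callback: " + outcome[1]);
    }
}
```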

Members

Observable: the observed source

In ReactiveX, an Observer subscribes to an Observable. The observer responds to the item, or sequence of items, that the Observable emits. This pattern simplifies concurrent operations because nothing has to block while waiting for the Observable to emit data; instead, the observer stands sentry, ready to respond whenever the Observable sends a notification at some point in the future.

Create: the starting point, which creates the source of events

Sample code:

// RxJava 1.x API (rx.Observable, rx.Subscriber)
// Create an Observable that can be observed
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            // Only emit while the subscriber is still subscribed
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
}).subscribe(new Subscriber<Integer>() { // subscribe() connects the Subscriber to the Observable
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});

Subscriber: the observer

The Subscriber listens for the data emitted by an Observable, and it can listen to data emitted by multiple sources.

Callback methods (onNext, onCompleted, onError)

The subscribe method connects an observer to an Observable. Your observer implements some subset of the following methods:

  • onNext(T item)

    The Observable calls this method to emit data, and its parameters are the data that the Observable emits. This method can be called multiple times, depending on your implementation.

  • onError(Throwable error)

    This method is called when an Observable encounters an error or fails to produce the expected data. The call terminates the Observable; no further onNext or onCompleted calls follow. The argument to onError is the Throwable that caused the error.

  • onCompleted()

    An Observable calls this method after its last onNext call, if no errors were encountered. (In RxJava 2 and later this method is named onComplete.)

According to the Observable contract, onNext may be called zero or more times, and those calls may be followed by a single call to either onCompleted or onError (never both), which is the last call. Passing data to onNext is usually called an emission; onCompleted and onError are called notifications.
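
The contract above can be made concrete with a small wrapper that enforces it. This is a sketch under stated assumptions: the `SimpleObserver` interface and the `ContractGuard` class are hypothetical names invented for this example (RxJava has its own, more complete safeguards), and thread-safety is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface mirroring the three callbacks described above.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical wrapper: forwards events until the first terminal notification,
// then silently drops everything that arrives afterwards.
final class ContractGuard<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> downstream;
    private boolean terminated = false;

    ContractGuard(SimpleObserver<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onNext(T item) {
        if (!terminated) {
            downstream.onNext(item);
        }
    }

    @Override
    public void onError(Throwable error) {
        if (!terminated) {
            terminated = true;
            downstream.onError(error);
        }
    }

    @Override
    public void onCompleted() {
        if (!terminated) {
            terminated = true;
            downstream.onCompleted();
        }
    }
}

final class ContractDemo {
    // Sends a sequence that breaks the contract and records what actually gets through.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObserver<Integer> guarded = new ContractGuard<>(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next:" + item); }
            @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        guarded.onNext(1);
        guarded.onNext(2);
        guarded.onError(new RuntimeException("failed")); // terminal notification
        guarded.onNext(3);     // dropped: arrives after the terminal call
        guarded.onCompleted(); // dropped: only one terminal call is delivered
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:2, error:failed]
    }
}
```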

Operator: wraps data-processing logic

create, map, and flatMap are all operators, and RxJava provides many more.
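
To show the difference between map and flatMap without pulling in a library, the sketch below uses the JDK's `java.util.stream.Stream`, whose `map` and `flatMap` methods have the same shape as the Rx operators of the same name. This is an analogy chosen for illustration, not RxJava code: streams are synchronous and pull-based, and `Stream.flatMap` keeps the inner sequences in order, whereas an Rx flatMap may interleave them.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OperatorDemo {
    // map: each input item becomes exactly one output item.
    static List<Integer> squares() {
        return Stream.of(1, 2, 3)
                .map(n -> n * n)
                .collect(Collectors.toList());
    }

    // flatMap: each input item becomes a whole sequence, and the sequences
    // are flattened into a single stream.
    static List<String> expanded() {
        return Stream.of("a", "b")
                .flatMap(s -> Stream.of(s + "1", s + "2"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squares());  // prints [1, 4, 9]
        System.out.println(expanded()); // prints [a1, a2, b1, b2]
    }
}
```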

Usage

Used together with Retrofit for asynchronous, multi-step operations

// RxJava 2.x API (io.reactivex) with Retrofit; MyRetrofit, IReqeustNetwor and the
// request/response classes belong to the project
public void request2(View view) {
    /*
     * One chained call implements the whole requirement, with a loading dialog shown throughout:
     *   1. Request the server to register
     *   2. After registration, update the registration UI
     *   3. Log in to the server immediately
     *   4. After login, update the login UI
     */
    MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
            .registerAction(new RegisterRequest()) // 1. request the server to register
            .subscribeOn(Schedulers.io()) // run the request above on an I/O thread
            .observeOn(AndroidSchedulers.mainThread()) // run the steps below on the main thread
            .doOnNext(new Consumer<RegisterResponse>() {
                @Override
                public void accept(RegisterResponse registerResponse) throws Exception {
                    // 2. Update the registration UI after registration completes
                }
            })
            // 3. Log in to the server immediately
            .observeOn(Schedulers.io()) // run the steps below on an I/O thread
            .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                @Override
                public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                    Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
                            .loginAction(new LoginReqeust());
                    return loginResponseObservable;
                }
            })
            .observeOn(AndroidSchedulers.mainThread()) // run the steps below on the main thread
            .subscribe(new Observer<LoginResponse>() {

                // Runs on the main thread, because subscribe() calls onSubscribe
                // immediately on the calling thread
                @Override
                public void onSubscribe(Disposable d) {
                    // UI operation: show the loading dialog
                    progressDialog = new ProgressDialog(RequestActivity.this);
                    progressDialog.show();

                    disposable = d;
                }

                @Override
                public void onNext(LoginResponse loginResponse) {
                    // 4. After login, update the login UI
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    // Finished: dismiss the loading dialog
                    if (progressDialog != null) {
                        progressDialog.dismiss();
                    }
                }
            });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    // Minimum requirement: dispose of the subscription when the Activity is destroyed
    if (disposable != null) {
        if (!disposable.isDisposed()) {
            disposable.dispose();
        }
    }
}