Outline

[TOC]

preface

Last month RxJava2 official release, but the current domestic information is still relatively few, used RxJava1 just need to see the update document to know how to use, but there are some before did not use RxJava1 friends may not know how to do, do not know the RxJava1 or directly jump to RxJava2. So I wrote this tutorial to help those of you who have never used RxJava get started.

Note: If you think it is not well written, please criticize it directly.

Let me answer this question first: Does learning RxJava2 require learning RxJava1 first?

This question is like the forum often asked to learn Java before learning C language, here will not lead to war! PHP is the best language in the world!!

The answer is obviously not, if you have learned RxJava1 before, then for RxJava2 just need to see what is updated on the line, its core idea is not changed, if you have not learned RxJava1, it does not matter, directly learn RxJava2. So as an RxJava2 tutorial, all nouns in this article belong to RxJava2 and do not refer to RxJava1.

To use RxJava2 in Android, add Gradle configuration:

    compile 'the IO. Reactivex. Rxjava2: rxjava: 2.0.1'
    compile 'the IO. Reactivex. Rxjava2: rxandroid: 2.0.1'Copy the code

To the chase

Before we get started, let’s talk about the basics.

Also has a lot of this paper introduces the principle of RxJava articles online, usually these articles are from the observer pattern, first the observer, is the observer, subscribe to the relationship between yapping a lot of, to be honest, when I first saw these articles have been around these nouns to dizzy, used for a long time to analyze the relationship between them. Perhaps I am too stupid to understand so many lofty terms.

Today I use two water pipes instead of observer and observed to try to explain their relationship in plain English. Here I will explain the basic working principle of RxJava from the perspective of event flow.

Suppose there are two water pipes:

RxJava

The top pipe is the pipe that produces the event and is called upstream, and the bottom pipe is the pipe that receives the event and is called downstream.

The two hoses are connected in such a way that each upstream event is received downstream. Notice that the event diagram here is reversed from the official website. Here, events are sent in the order of first, second and third, and events are received in the order of first, second and third. I think this is more in line with the thinking of ordinary people, simple and clear.

The upstream and downstream correspond to observables and observers in RxJava, respectively. The connection between them corresponds to subscribe().

        // Create an upstream Observable:
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3); emitter.onComplete(); }});// Create a downstream Observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete(a) {
                Log.d(TAG, "complete"); }};// Establish a connection
        observable.subscribe(observer);Copy the code

The result of this run is:

12-02 03:37:17.818 4166-4166/zlc.season.rxjava2demo D/TAG: subscribe
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 1
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 2
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 3
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: completeCopy the code

Note: Upstream does not start sending events until a connection is established between upstream and downstream. That is, the event is sent after the subscribe() method is called.

To write this code together, RxJava prides itself on chaining:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete(a) {
                Log.d(TAG, "complete"); }});Copy the code

Let’s explain two of these strange things: ObservableEmitter and Disposable.

ObservableEmitter: Emitter means Emitter, so it’s easy to guess, and this one emits events, and it emits three types of events, Emitter onNext(T value), onComplete(), and onError(Throwable Error) can emit next, complete, and error events, respectively.

However, please note that this does not mean that you can randomly launch random events, there are certain rules that need to be met:

  • Upstream can send an infinite number of onNext, and downstream can receive an infinite number of onNext.
  • When an onComplete is sent upstream, the events that follow the upstream onComplete willContinue toSend, and the downstream will receive onComplete event afterNo longer continue toReceive events.
  • When an onError is sent upstream, events following the onError upstream willContinue toSend, and the downstream will receive the onError event afterNo longer continue toReceive events.
  • Upstream may not send onComplete or onError.
  • The most important thing is that onComplete and onError must be unique and mutually exclusive, that is, you cannot send multiple oncompletions, you cannot send multiple onErrors, you cannot send one onComplete and then one onError, and vice versa

Note: onComplete and onError are unique and mutually exclusive, but you need to control them in your code. If you break this rule in your code logic, it will not necessarily crash your program. For example, sending multiple onCompletions can work normally, but the program will crash if it receives the second onError event after receiving the first onComplete.

The above rules are shown as follows:

Schematic diagram
Only the onNext event is sent
next
Send the onComplete event
complete
Sends the onError event
error

ObservableEmitter, and then Disposable. How to understand it in RxJava, corresponding to the water pipe example above, we can understand it as a mechanism between two pipes, when calling its Dispose () method, it will cut off the two pipes, resulting in the downstream receive no event.

Note: Calling Dispose () does not cause the upstream to stop sending events, the upstream will continue to send the remaining events.

For example, we ask the upstream to send 1,2,3,complete, and 4 in sequence. After the downstream receives the second event, we cut off the water pipe and see the result:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
                i++;
                if (i == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : "+ mDisposable.isDisposed()); }}@Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete(a) {
                Log.d(TAG, "complete"); }});Copy the code

The running results are as follows:

12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4Copy the code

From the running result, we can see that after receiving the event onNext 2, the water pipe was cut off, but the upstream still sent events 3, complete and 4, and the upstream did not stop because of sending onComplete. You can also see that the downstream onSubscribe() method is called first.

There’s more to it than that, and we’ll see how important it is when we talk about thread scheduling. We’ll see it in more places as we go further.

In addition, subscribe() has multiple overloaded methods:

    public final Disposable subscribe(a) {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}Copy the code

The last one, with an Observer parameter, has already been used, and several other methods are illustrated here.

  • Without any parameterssubscribe()The downstream does not care about any event, you upstream just send your data to go, I don’t care what you send.
  • With aConsumer< onNext > < onNext > < onNext > < onNext > < onNext >
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "onNext: "+ integer); }});Copy the code
  • The same goes for the other methods, which are not explained here.

That concludes the tutorial, and in the next section we’ll learn about RxJava’s powerful thread scheduling.