Android advanced series: a review of third-party libraries.

Summarizing and organizing knowledge is itself part of learning. If you find mistakes, corrections are welcome.

At the time of writing, RxJava 3 has been released. The two versions are not very different, so this summary is based on RxJava 2; I will spend some time later summarizing the changes in RxJava 3.

Basic concepts

1. What is Rx

Rx (ReactiveX, short for Reactive Extensions) began as an extension of LINQ. It is an open-source programming model originally developed by Microsoft that aims to provide a consistent programming interface for working with asynchronous data streams. Rx libraries exist for a wide range of languages; RxJava is the Rx library for the Java language.

The official website displays a list of supported languages:

Official Rx supported languages

2. Observer pattern

The observer pattern, also known as the publish/subscribe pattern, defines a one-to-many dependency between objects: whenever one object changes state, all objects that depend on it are notified and updated automatically. The observer pattern is a behavioral pattern.

An example: I need to call Xiao Ming to tell him some bad news (he has been cheated on), but I am afraid he will be busy when I call. So I send him a message instead and ask him to call me back when he has time. When Xiao Ming is free, he calls me back, and with tears in my eyes I tell him to stay strong.

In this example, Xiao Ming is the observed and I am the observer. Sending the message that asks him to call back when he has time is the subscription, and it creates the dependency between us.
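The relationship in this example can be sketched in plain Java without any library. This is only a minimal illustration of the pattern; the names `MessageObserver`, `Subject`, and `ObserverPatternSketch` are made up for this sketch and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration only; none of this is RxJava API.
interface MessageObserver {
    void onMessage(String message);
}

class Subject {
    private final List<MessageObserver> observers = new ArrayList<>();

    // Subscribing registers the dependency between observer and observed.
    void subscribe(MessageObserver observer) {
        observers.add(observer);
    }

    // A state change notifies every subscribed observer (one-to-many).
    void publish(String message) {
        for (MessageObserver observer : observers) {
            observer.onMessage(message);
        }
    }
}

class ObserverPatternSketch {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Subject xiaoMing = new Subject();                          // the observed
        xiaoMing.subscribe(msg -> received.add("me: " + msg));     // first observer
        xiaoMing.subscribe(msg -> received.add("friend: " + msg)); // second observer
        xiaoMing.publish("I have time now");                       // both are notified
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that everything here runs synchronously on the caller's thread, which is the usual situation for the plain observer pattern.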

3. Reactive programming

Programming is about solving problems, and problems can be approached from different perspectives. When an approach proves generally applicable, it settles into a programming paradigm. The object-oriented and procedural styles we often talk about are both programming paradigms.

Reactive programming is a programming paradigm that builds relationships between events through asynchrony and data streams.

There are two core concepts involved:

Asynchrony: a call returns immediately without waiting for the processing to complete, and the result is delivered later through a callback. This can improve throughput, performance, and efficiency.

Data stream: anything can be treated as a stream, that is, a sequence of events on a timeline. On this basis, streams are filtered, selected, transformed, and combined with functions. An event handler is defined to capture the streams of interest asynchronously; this is called listening to (subscribing to) a stream, or registering an observer.
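The "return immediately, deliver the result by callback" idea can be sketched with the JDK's own `CompletableFuture`, independent of RxJava. The class and method names (`AsyncCallbackSketch`, `loadGreeting`) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class AsyncCallbackSketch {
    // Starts the work on another thread and returns to the caller immediately.
    static CompletableFuture<String> loadGreeting(String name) {
        return CompletableFuture
                .supplyAsync(() -> "hello " + name) // runs on a pool thread
                .thenApply(String::toUpperCase);    // transforms the value once it arrives
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = loadGreeting("rx");
        // Register a callback instead of waiting for the result.
        CompletableFuture<Void> done =
                future.thenAccept(s -> System.out.println("callback got: " + s));
        System.out.println("caller continues without blocking");
        done.join(); // only so the demo does not exit before the callback has run
    }
}
```

The order of the two printed lines is not guaranteed, which is exactly the point: the caller does not wait for the result.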

Now compare this with the observer pattern described earlier. In the observer pattern, the observer and the observed become dependent through a subscription. The subscription is like a canal dug between them: the observed is upstream and the observer is downstream, and when the observed changes, data flows continuously to the observers that depend on it. In the plain observer pattern the two usually run on the same thread, but reactive programming emphasizes asynchrony, so upstream and downstream can work on different threads.

We can take a look at the GIF on the website:

4. RxJava concepts and simple usage

RxJava adopts the reactive programming model. It extends the observer pattern to make asynchronous operations easier. Once the observer and the observed have a subscription relationship, RxJava provides rich operators to filter, select, transform, and combine the data stream that the observed emits before the processed data reaches the observer. Thanks to its chained calls, even very complex logic can be expressed clearly in code.

First, add the RxJava dependencies:

implementation "io.reactivex.rxjava2:rxjava:2.2.8"

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'


Let’s start with a simple example:

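  // RxJava 2 imports used by this snippet
  import io.reactivex.Observable;
  import io.reactivex.ObservableEmitter;
  import io.reactivex.ObservableOnSubscribe;
  import io.reactivex.Observer;
  import io.reactivex.disposables.Disposable;

  // Create the observed (upstream) with Observable.create
  Observable<Integer> mObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
          // Emit three values, then signal completion
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
      }
  });

  // Create the observer (downstream) with Observer
  Observer<Integer> mObserver = new Observer<Integer>() {
      @Override
      public void onSubscribe(Disposable d) {
          LogUtil.d(TAG + "--onSubscribe");
      }

      @Override
      public void onNext(Integer o) {
          LogUtil.d(TAG + "--onNext o=" + o);
      }

      @Override
      public void onError(Throwable e) {
          LogUtil.d(TAG + "--onError");
      }

      @Override
      public void onComplete() {
          LogUtil.d(TAG + "--onComplete");
      }
  };

  // Create the subscription; the Observable starts emitting to the Observer
  mObservable.subscribe(mObserver);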


Now take a look at RxJava’s chained calls, which express the same thing in a single statement:

  Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
      }
  })
  // Subscribe directly in the same chain
  .subscribe(new Observer<Integer>() {
      @Override
      public void onSubscribe(Disposable d) {
          LogUtil.d(TAG + "--Observer--onSubscribe");
      }

      @Override
      public void onNext(Integer o) {
          LogUtil.d(TAG + "--Observer--onNext o=" + o);
      }

      @Override
      public void onError(Throwable e) {
          LogUtil.d(TAG + "--Observer--onError");
      }

      @Override
      public void onComplete() {
          LogUtil.d(TAG + "--Observer--onComplete");
      }
  });


As we said earlier, RxJava is reactive programming built on an extended observer pattern, so RxJava has an observer, an observed, and a subscription that creates the dependency between them.

From the above code, you can see:

Observed ——————> Observable

Observer ——————> Observer

Observable ————> subscribe <———— Observer

The upstream Observable sends the data stream to the downstream Observer through an emitter (ObservableEmitter). In the example, upstream emits 1, 2, 3 and then an onComplete event.

As the log shows, the downstream Observer receives the numbers 1, 2, and 3 sent from upstream, followed by the completion event that upstream sent by calling onComplete().

From the simple example above, we can conclude the following points

4.1. Decoupling

We can see that both the observed and the observer are independent.

Observable:

  // Create the observed (upstream) with Observable.create
  Observable<Integer> mObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
      }
  });

Observer:

  // Create the observer (downstream) with Observer
  Observer<Integer> mObserver = new Observer<Integer>() {
      @Override
      public void onSubscribe(Disposable d) {
          LogUtil.d(TAG + "--onSubscribe");
      }

      @Override
      public void onNext(Integer o) {
          LogUtil.d(TAG + "--onNext o=" + o);
      }

      @Override
      public void onError(Throwable e) {
          LogUtil.d(TAG + "--onError");
      }

      @Override
      public void onComplete() {
          LogUtil.d(TAG + "--onComplete");
      }
  };


As you can see, each exists independently and does its own job; only the subscription connects them. This makes the code easy to reuse, and it also allows multiple observers to receive the data stream sent by one Observable.

4.2. Data stream conversion (using operators)

An Observable sends data to its observers as a stream through an emitter. Since the data travels as a stream, can we process it before the Observer receives it? The answer is yes, and this is one of the strengths of RxJava: data can be processed with a variety of operators. Let’s start with a simple one.

This is only a simple demonstration of RxJava operating on a data stream; the operators will be discussed in detail later. As the picture shows, the Observable emits an int value of 100, but after the map operator converts the data stream, the downstream Consumer actually receives a String. This is just a small demo, but RxJava has many powerful operators that can transform data in all sorts of ways.

The upstream Observable emits data, and downstream can receive it either with an Observer or with a Consumer. Consumer has only one method, accept.
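To make the idea behind map concrete, here is a minimal plain-Java sketch of an operator that sits between upstream and downstream and changes the item type. It is not RxJava's implementation: `MiniSource` and `MapOperatorSketch` are hypothetical names, and the sketch uses the JDK's `java.util.function.Consumer` rather than RxJava's own Consumer type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical mini stream type for illustration; RxJava's Observable is far richer.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> downstream);

    // map returns a new source that transforms each item before passing it on.
    default <R> MiniSource<R> map(Function<? super T, ? extends R> mapper) {
        return downstream -> this.subscribe(item -> downstream.accept(mapper.apply(item)));
    }

    // Creates a source that emits exactly one item to each subscriber.
    static <T> MiniSource<T> just(T item) {
        return downstream -> downstream.accept(item);
    }
}

class MapOperatorSketch {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSource.just(100)                           // upstream emits an int
                .map(i -> "converted to String: " + i) // the operator changes the type
                .subscribe(received::add);             // downstream receives a String
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The design point is that map does not modify the upstream source; it wraps it in a new source, which is why operators can be chained freely.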

Here is a list of RxJava’s operators. It looks like a lot, but don’t be scared: the more operators there are, the easier it is to do what you need with a data stream. We will cover them separately.

4.3. Thread switching

As mentioned earlier, RxJava is well suited to asynchronous operations. Let’s take the example above and print the current thread in upstream and downstream.

Results:

As you can see, both upstream and downstream run on the main thread. Let’s see how to make this asynchronous.

Using the same example, we add the following two calls to the chain:

  .subscribeOn(Schedulers.io())

  .observeOn(AndroidSchedulers.mainThread())


subscribeOn: specifies the thread on which upstream runs

observeOn: specifies the thread on which the Observer or Consumer runs

Results:

In the code, we specify that upstream runs on an IO thread and downstream runs on the main thread. With two simple lines of code, we get thread switching and asynchronous operation.

Of course, the IO scheduler is not the only choice; Schedulers provides other schedulers as well.
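The effect of the two calls can be approximated in plain Java to show what "upstream on one thread, downstream on another" means. This is only a sketch under stated assumptions: the two single-thread executors stand in for `Schedulers.io()` and the Android main thread (which in reality is a Looper thread, not an executor), and the names `ThreadSwitchSketch`, `io-thread`, and `main-like-thread` are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {
    // Returns {name of the upstream thread, name of the downstream thread}.
    static String[] run() {
        // Assumption: plain executors stand in for the IO scheduler and the main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like-thread"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    .supplyAsync(() -> {          // upstream work, in the spirit of subscribeOn
                        threads[0] = Thread.currentThread().getName();
                        return 100;
                    }, io)
                    .thenAcceptAsync(value ->     // downstream work, in the spirit of observeOn
                            threads[1] = Thread.currentThread().getName(), main)
                    .join();                      // wait so the demo can read both names
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("upstream ran on: " + threads[0]);
        System.out.println("downstream ran on: " + threads[1]);
    }
}
```

In RxJava the same hand-off is expressed declaratively by the two operators, so the emitting and receiving code does not need to know about executors at all.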

Let’s briefly look at the source of the Single scheduler:

Does that look familiar? Under the hood it is a thread pool (ThreadPoolExecutor).

Conclusion

There is a lot to RxJava. This article only walks through its basic concepts and simple usage. Other ways of creating observables, backpressure, the various operators, practical usage, and other tricks will continue in the next chapter.