Android advanced series: notes on third-party libraries.

These notes are a summary, and organizing them is itself part of learning. If you spot a mistake, corrections are welcome.

RxJava2 (1): basic concepts and use

As described in the previous article, RxJava2 is an extension of the observer pattern. There we only used Observable to build an observer pair. In fact, RxJava2 provides five kinds of observer pairs.

1. Observable and Observer

Can emit 0 or N items and terminates with a completion or error event. This was illustrated in part 1 and is not explained in detail here.

2. Flowable and Subscriber

Can emit 0 or N items and terminates with a completion or error event. Supports back pressure, so the emission rate of the data source can be controlled.

The difference between Observable and Flowable is that the latter supports back pressure. What is back pressure?

2.1. What is back pressure

Back pressure is a phenomenon: in an asynchronous pipeline, the upstream sends data faster than the downstream can process it. Unprocessed events pile up in a buffer, and when the buffer overflows the result is blocked events, lost events, or an out-of-memory error (OOM).
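To make the imbalance concrete, here is a minimal sketch in plain Java (no RxJava involved). It is a deterministic model, not a real asynchronous pipeline: the class name BacklogDemo and the per-tick rates are made-up illustration values.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Deterministic model of a fast producer feeding a slow consumer through an
// unbounded queue. Tick counts and rates are made-up illustration values.
final class BacklogDemo {

    // Returns the queue size after `ticks` steps. In each step the producer
    // adds `producedPerTick` items and the consumer removes at most
    // `consumedPerTick` items.
    static int backlogAfter(int ticks, int producedPerTick, int consumedPerTick) {
        Deque<Integer> queue = new ArrayDeque<>();
        int next = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < producedPerTick; i++) {
                queue.addLast(next++);
            }
            for (int i = 0; i < consumedPerTick && !queue.isEmpty(); i++) {
                queue.removeFirst();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // 10 items in and 1 item out per tick: the backlog grows by 9 per tick.
        System.out.println(backlogAfter(100, 10, 1)); // prints 900
    }
}
```

With an unbounded queue the backlog grows without limit, which is the memory problem described above. A bounded queue instead forces a decision about what to do with the overflow.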

In RxJava1, back pressure was handled inside Observable itself, and when events piled up a MissingBackpressureException was thrown directly. RxJava2 separates the two cases: Observable no longer deals with back pressure, and Flowable is provided for streams that need it. A simple demo shows the difference.

A: The upstream sends data in a loop.

B: Thread switching, asynchronous operation.

C: The downstream obtains data every second.

We create an Observable to simulate the phenomenon of back pressure. We simulate sending data in an infinite loop upstream, and the downstream gets data after sleeping for a second each time. This will definitely cause the problem we mentioned earlier, that is, upstream sending is too fast and the downstream can’t handle it.

Looking at the log, the output stops at 13 and the program crashes. In RxJava2, Observable does not support back pressure, so when back pressure occurs no error is reported and no MissingBackpressureException is thrown. Memory usage simply keeps climbing until the process runs out of memory and dies.

RxJava2 provides Flowable to handle this problem.

Let's work through Flowable step by step, from simple to complex.

Let’s create a basic demo with Flowable:

       Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                emitter.onNext("Event One");

                LogUtil.d(TAG + "-- SUBSCRIBE send event one");

                emitter.onNext("Event 2");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 2");

                emitter.onNext("Event 3");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 3");

                emitter.onNext("Event 4");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 4");

                emitter.onComplete();

                LogUtil.d(TAG + "-- SUBSCRIBE completed");

            }

        }, BackpressureStrategy.ERROR) // We need to pass in the backpressure policy, similar to the saturation policy in the thread pool, when the cache is full

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread()) // Thread switch, asynchronous operation

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        // Determine how many events the observer can receive, and put the extra events into the cache

                        // The default buffer size of Flowable is 128, that is, it can hold up to 128 events

                        s.request(3);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });


As you can see, creating a Flowable is much like creating an Observable, except that create takes a second argument, here BackpressureStrategy.ERROR, which is the back pressure strategy. Strategies are covered later.

In the onSubscribe callback, the Subscription argument lets the observer set the number of events it wants to receive. If more events are sent than the observer has requested, the extra events are stored in the Flowable cache.

By default the Flowable cache queue holds at most 128 events. With the ERROR strategy, exceeding that limit raises an exception.

Results:

Four events are sent, but the observer calls Subscription.request(3), so only three are received downstream and the fourth stays in the Flowable cache.
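The bookkeeping behind this result can be sketched in plain Java. This is a simplified, single-threaded model and not RxJava's implementation: the class DemandModel and its methods are hypothetical names, and the real library does this work across threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model of request(n): events are buffered and
// handed to the receiver only while there is outstanding demand.
final class DemandModel {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<String> received = new ArrayList<>();
    private long requested = 0;

    // Upstream side: store the event, then deliver whatever demand allows.
    void emit(String event) {
        buffer.add(event);
        drain();
    }

    // Downstream side: add n to the outstanding demand.
    void request(long n) {
        requested += n;
        drain();
    }

    // Move buffered events to the receiver until demand or buffer runs out.
    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            received.add(buffer.poll());
            requested--;
        }
    }

    List<String> received() { return received; }

    int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        DemandModel model = new DemandModel();
        model.request(3);
        for (int i = 1; i <= 4; i++) {
            model.emit("Event " + i);
        }
        System.out.println(model.received() + " buffered=" + model.buffered());
        model.request(4); // a later request releases the remaining event
        System.out.println(model.received() + " buffered=" + model.buffered());
    }
}
```

The same model also covers an observer that requests nothing at first: every event waits in the buffer until some later request(n) call releases it.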

If our observer does not call Subscription.request(x), it receives no events, but the observed still sends them and they are stored in the cache. The observer can then call Subscription.request(x) later, on demand, to fetch events from the cache.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {



                for (int x = 0; x <= 10; x++) {

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                    emitter.onNext(x + "Event");

                }

            }

        }, BackpressureStrategy.ERROR) 

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        subscription = s;

                        // s.request(3); The number of events received by the observer is not specified

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Requesting events on demand:

findViewById(R.id.bt_get_event).setOnClickListener(new View.OnClickListener() {

            @Override

            public void onClick(View v) {

                if (subscription != null) {

                    LogUtil.d(TAG + "--onClick");

                    subscription.request(4);

                }

            }

        });

The observer does not request any events at first. Instead, a click handler requests them later. When the click fires, the observer receives the first four events waiting in the queue.

2.2. Back pressure strategy

As mentioned earlier, Flowable's default cache queue size is 128, so it can hold only 128 events sent from upstream. When the upstream sends more than that, a back pressure strategy decides what happens to the overflow. BackpressureStrategy provides five strategies.

The five strategies are as follows:

Strategy   Role
MISSING    When the cache is full (128) and the observed sends another event, a MissingBackpressureException is thrown, indicating that the cache is full.
ERROR      When the cache is full (default size 128) and the observed sends another event, a MissingBackpressureException is thrown directly.
BUFFER     When the cache is full (128) and the observed sends another event, the cache becomes unbounded: the observed can keep sending events indefinitely, and they are all stored in the cache.
DROP       When the cache is full and the observed sends another event, every event beyond the cache size (128) is discarded.
LATEST     When the cache is full and the observed sends another event, only the latest (last sent) event is kept, and all other events beyond the cache size (128) are discarded.
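The roles in the table can be modelled with a small bounded buffer in plain Java. This is a toy model under stated assumptions, not RxJava's implementation: OverflowPolicies, Policy and BoundedBuffer are made-up names, MISSING is treated the same as ERROR because the table describes the same outcome for both, and an IllegalStateException stands in for MissingBackpressureException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the overflow policies described in the table. This is NOT
// RxJava's implementation; all names here are made up for illustration.
final class OverflowPolicies {
    enum Policy { MISSING, ERROR, BUFFER, DROP, LATEST }

    static final class BoundedBuffer {
        private final int capacity;
        private final Policy policy;
        private final Deque<Integer> queue = new ArrayDeque<>();
        private Integer latest; // LATEST only: single slot for the newest overflow item

        BoundedBuffer(int capacity, Policy policy) {
            this.capacity = capacity;
            this.policy = policy;
        }

        // Accepts an item, applying the policy once the buffer is full.
        void offer(int item) {
            if (queue.size() < capacity || policy == Policy.BUFFER) {
                queue.addLast(item); // BUFFER never counts as full
                return;
            }
            switch (policy) {
                case MISSING:
                case ERROR:
                    // Stand-in for MissingBackpressureException.
                    throw new IllegalStateException("buffer full at item " + item);
                case DROP:
                    break; // discard the new item
                case LATEST:
                    latest = item; // overwrite the previously kept overflow item
                    break;
                default:
                    throw new AssertionError(policy);
            }
        }

        // Returns everything the consumer would eventually see, in order.
        List<Integer> drain() {
            List<Integer> out = new ArrayList<>(queue);
            if (latest != null) {
                out.add(latest);
            }
            queue.clear();
            latest = null;
            return out;
        }
    }

    // Offers the items 0..count-1 and returns what survives the policy.
    static List<Integer> run(Policy policy, int capacity, int count) {
        BoundedBuffer buffer = new BoundedBuffer(capacity, policy);
        for (int i = 0; i < count; i++) {
            buffer.offer(i);
        }
        return buffer.drain();
    }
}
```

For example, run(Policy.LATEST, 128, 150) keeps items 0 to 127 plus item 149, while run(Policy.DROP, 128, 150) keeps only items 0 to 127.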

2.2.1, MISSING

When the cache is full (128) and the observed continues to send the next event, a MissingBackpressureException is thrown, indicating that the cache is full.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 129 events to simulate outrunning the cache

                for (int x = 0; x < 129; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.MISSING) // Use the BackpressureStrategy.MISSING strategy

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

We use the BackpressureStrategy.MISSING strategy, and the observer calls request(Integer.MAX_VALUE), which is the value recommended here.

Results:

We see that after 128 events have been sent, sending the 129th throws a MissingBackpressureException, and the observer has not received any data. This shows that events are first stored in the cache queue and only then delivered: once the cache overflows and the exception is thrown, onNext() is never called. We can verify this by having the observed send exactly 128 events.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 128 events, exactly the cache size

                for (int x = 0; x < 128; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.MISSING)

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Based on the demo above, we changed only the number of events sent. The upstream now sends 128 events, exactly the size of the cache, and no exception is thrown.

Results:

We see that the program does not throw an exception and prints all 128 events (numbered from 0) from the cache, which confirms two points:

1. The cache size is indeed 128.

2. Events are first stored in the cache and then delivered (if an exception occurs, onNext is not called at all).

2.2.2, ERROR

When the cache is full (the default size is 128) and the observed continues to send the next event, a MissingBackpressureException is thrown directly.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 129 events to simulate outrunning the cache

                for (int x = 0; x < 129; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.ERROR)

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Here we use the BackpressureStrategy.ERROR strategy.

Results:

As with MISSING, a MissingBackpressureException is thrown directly and the downstream receives no data. Likewise, if the upstream sends 128 events or fewer, everything is sent and received normally.

2.2.3, BUFFER

When the cache size is full (128) and the observed continues to send the next event, the cache size is set to infinite, that is, the observed can send events indefinitely, but is actually stored in the cache.

       Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 129 events to simulate outrunning the cache

                for (int x = 0; x < 129; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.BUFFER)

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Here we use the BackpressureStrategy.BUFFER strategy, which makes the cache size unbounded.

Results:

As you can see, all 129 events we sent were sent and received.

2.2.4, DROP

When the cache is full and the observed continues to send the next event, all events exceeding the cache size (128) are discarded.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 129 events to simulate outrunning the cache

                for (int x = 0; x < 129; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.DROP)

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Here we use the BackpressureStrategy.DROP strategy, which discards the events that do not fit in the cache.

Results:

The result is clear: no exception is thrown and the output is normal, but the events that did not fit in the cache are discarded and never received.

2.2.5, LATEST

When the cache is full and the observed continues to send the next event, only the latest (last sent) event is kept, and all other events exceeding the cache size (128) are discarded.

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // Send 150 events

                for (int x = 0; x < 150; x++) {

                    emitter.onNext(x + "Event");

                    LogUtil.d(TAG + "-- SUBSCRIBE sent" + x + "An event");

                }

            }

        }, BackpressureStrategy.LATEST)

                // Thread switch, asynchronous operation

                .subscribeOn(Schedulers.io())

                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Subscriber<String>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        LogUtil.d(TAG + "--onSubscribe");

                        s.request(Integer.MAX_VALUE);

                    }



                    @Override

                    public void onNext(String s) {

                        LogUtil.d(TAG + "--onNext receives :" + s);

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.d(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.d(TAG + "--onComplete");

                    }

                });

Here we use the BackpressureStrategy.LATEST strategy and send 150 events.

Once the cache of 128 is full, only the latest event is kept in addition, so 129 events are received in total.

Results:

We can see that the observer receives 129 events: the 128 from the cache plus the latest (last) one. The events in between are discarded.

2.3. Flowable in the synchronous case

As mentioned earlier, back pressure arises only in asynchronous operation. In the synchronous case there is no buffering: upstream and downstream run on the same thread, so each event is processed downstream before the next one is sent, as follows:

       Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                LogUtil.d(TAG + "-- SUBSCRIBE send event one");

                emitter.onNext("Event One");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 2");

                emitter.onNext("Event 2");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 3");

                emitter.onNext("Event 3");

                LogUtil.d(TAG + "-- SUBSCRIBE completed");

                emitter.onComplete();

            }

        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {

            @Override

            public void onSubscribe(Subscription s) {

                LogUtil.d(TAG + "--onSubscribe");

                s.request(3);

            }



            @Override

            public void onNext(String s) {

                LogUtil.d(TAG + "--onNext receives :" + s);

            }



            @Override

            public void onError(Throwable t) {

                LogUtil.d(TAG + "--onError error=" + t);

            }



            @Override

            public void onComplete() {

                LogUtil.d(TAG + "--onComplete");

            }

        });


Results:

As you can see, events are executed sequentially, sending one, receiving one, and then executing the next.

But what happens if the upstream sends four events and the downstream requests only three? Let's change the demo as follows:

       Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                LogUtil.d(TAG + "-- SUBSCRIBE send event one");

                emitter.onNext("Event One");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 2");

                emitter.onNext("Event 2");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 3");

                emitter.onNext("Event 3");

                LogUtil.d(TAG + "-- SUBSCRIBE send event 4");

                emitter.onNext("Event 4");

                LogUtil.d(TAG + "-- SUBSCRIBE completed");

                emitter.onComplete();

            }

        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {

            @Override

            public void onSubscribe(Subscription s) {

                LogUtil.d(TAG + "--onSubscribe");

                s.request(3);



            }



            @Override

            public void onNext(String s) {

                LogUtil.d(TAG + "--onNext receives :" + s);

            }



            @Override

            public void onError(Throwable t) {

                LogUtil.d(TAG + "--onError error=" + t);

            }



            @Override

            public void onComplete() {

                LogUtil.d(TAG + "--onComplete");

            }

        });


As you can see, the observed sent four events, but the observer received only three.

Results:

As you can see, a MissingBackpressureException is thrown here as well.

The BUFFER strategy would handle this, but to show how the observer can control the observed in reverse, we use the following approach:

        Flowable.create(new FlowableOnSubscribe<String>() {

            @Override

            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // emitter.requested() returns the number of events the observer has requested

                long requested = emitter.requested();

                LogUtil.d(TAG + "--subscribe Observer sets the number of events received :" + requested);



                for (int x = 0; x < requested; x++) {

                    LogUtil.d(TAG + "-- SUBSCRIBE send event" + x);

                    emitter.onNext("Send event" + x);

                }

                LogUtil.d(TAG + "-- SUBSCRIBE completed");

                emitter.onComplete();

            }

        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {

            @Override

            public void onSubscribe(Subscription s) {

                LogUtil.d(TAG + "--onSubscribe");

                // Set the number of events the observer receives to 3

                s.request(3);



            }



            @Override

            public void onNext(String s) {

                LogUtil.d(TAG + "--onNext receives :" + s);

            }



            @Override

            public void onError(Throwable t) {

                LogUtil.e(TAG + "--onError error=" + t);

            }



            @Override

            public void onComplete() {

                LogUtil.d(TAG + "--onComplete");

            }

        });


In subscribe, we read the number of events the observer has requested from emitter.requested() and send only that many, which avoids the mismatch between upstream and downstream.
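The same idea, a producer that never sends more than was requested, can be sketched with the JDK's own java.util.concurrent.Flow interfaces (Java 9+) instead of RxJava. This is a simplified synchronous sketch: DemandDrivenRange, range and collect are hypothetical names, and the sketch skips parts of the Reactive Streams rules such as rejecting non-positive requests and handling re-entrant request calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Synchronous sketch using the JDK's java.util.concurrent.Flow interfaces.
// The publisher emits only as many items as the subscriber has requested.
final class DemandDrivenRange {

    // Publishes 0, 1, 2, ... up to `limit` items, never more than requested.
    static Flow.Publisher<Integer> range(int limit) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, stopping early at the limit.
                for (long i = 0; i < n && !done && next < limit; i++) {
                    subscriber.onNext(next++);
                }
                // Complete only once every item has been delivered.
                if (!done && next == limit) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a fixed demand and records every signal received.
    static List<String> collect(int limit, long demand) {
        List<String> log = new ArrayList<>();
        range(limit).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }

            @Override
            public void onNext(Integer item) {
                log.add("next " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                log.add("error " + throwable);
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }
}
```

With four items available and a demand of three, collect(4, 3) records three onNext signals and no error, because the producer stops at the requested count instead of overflowing.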


2.4. Back pressure processing when using operators

When we create a Flowable with create, we pass the appropriate back pressure strategy as the second argument. All Flowable operators support back pressure, but a Flowable created by an operator uses BackpressureStrategy.ERROR by default. The following three methods specify a different strategy:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

        Flowable.interval(1, TimeUnit.MILLISECONDS)

                .observeOn(Schedulers.io())

                .subscribe(new Subscriber<Long>() {

                    @Override

                    public void onSubscribe(Subscription s) {

                        Log.d(TAG, "onSubscribe");

                        subscription = s;

                        s.request(Long.MAX_VALUE); // Long.MAX_VALUE events can be received by default

                    }



                    @Override

                    public void onNext(Long aLong) {

                        LogUtil.i(TAG + "--onNext aLong=" + aLong);

                        try {

                            // Delay one second for receiving

                            Thread.sleep(1000);

                        } catch (InterruptedException e) {

                            e.printStackTrace();

                        }

                    }



                    @Override

                    public void onError(Throwable t) {

                        LogUtil.e(TAG + "--onError error=" + t);

                    }



                    @Override

                    public void onComplete() {

                        LogUtil.i(TAG + "--onComplete");

                    }

                });

The Flowable is created by interval. The upstream sends one item every millisecond, while the downstream processes one item per second. The upstream is clearly faster than the downstream, and the items that cannot be processed in time go into the cache. Because the default back pressure strategy here is BackpressureStrategy.ERROR, a MissingBackpressureException is raised once the cache is full.

We can specify the corresponding back pressure strategy through onBackpressureXXX.

Results:

With BUFFER specified as the back pressure strategy (onBackpressureBuffer()), no exception is thrown and the program keeps printing.

3. Single and SingleObserver

Emits either a single item or an error event.

        Single.create(new SingleOnSubscribe<String>() {

            @Override

            public void subscribe(SingleEmitter<String> emitter) throws Exception {

                // Only onSuccess or onError can be sent

                emitter.onSuccess("Success");

                emitter.onError(new NullPointerException(""));

            }

        }).subscribe(new SingleObserver<String>() {

            @Override

            public void onSubscribe(Disposable d) {

                LogUtil.d(TAG + "--onSubscribe");

            }



            @Override

            public void onSuccess(String s) {

                LogUtil.d(TAG + "--onSuccess s=" + s);

            }



            @Override

            public void onError(Throwable e) {

                LogUtil.e(TAG + "--onError error=" + e.getMessage());

            }

        });

SingleEmitter can send only one onSuccess or onError. If the emitter sends more than one, the observer receives only the first.

4. Completable and CompletableObserver

Emits no data. Only the onComplete and onError events are handled.

Only one of onComplete and onError can take effect: whichever is called first.

5. Maybe and MaybeObserver

Can emit zero or one item and ends in either success or failure, somewhat like Optional.

onSuccess can deliver a value only once.

Only one of onComplete and onError can take effect: whichever is called first.
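The "first call wins" rule shared by Single, Completable and Maybe can be sketched with a single flag in plain Java. This is a toy model, not RxJava's emitter: OneShotEmitter is a hypothetical name, and silently ignoring later calls is a simplification of what a real library may do with a late signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy emitter with Maybe-like rules: at most one of success, complete or
// error is delivered, and every later call is ignored.
final class OneShotEmitter<T> {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final List<String> log = new ArrayList<>();

    // Delivers the single value if nothing has been delivered yet.
    void onSuccess(T value) {
        if (terminated.compareAndSet(false, true)) {
            log.add("success " + value);
        }
    }

    // Signals "no value" if nothing has been delivered yet.
    void onComplete() {
        if (terminated.compareAndSet(false, true)) {
            log.add("complete");
        }
    }

    // Signals failure if nothing has been delivered yet.
    void onError(Throwable error) {
        if (terminated.compareAndSet(false, true)) {
            log.add("error " + error.getMessage());
        }
    }

    // Returns the signals that actually took effect.
    List<String> log() {
        return log;
    }
}
```

Calling onSuccess("a") followed by onError(...) and onComplete() leaves only "success a" in the log, because compareAndSet lets exactly one caller through.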