Preface
- RxJava is widely welcomed by Android developers for its chained calls based on event streams, its simple logic, and its ease of use.
If you are not familiar with RxJava, check out Android: a clear and easy-to-understand introduction to RxJava
- This article mainly explains the back pressure control strategy in RxJava. I hope you like it.
- This series of articles is based on RxJava 2.0
- In the coming days, I will continue to publish a series of articles on RxJava 2.0 in Android, covering principles, operators, application scenarios, back pressure, and more. If you are interested, please keep following Carson_Ho’s Android development notes!
Schematic diagram
All code demos in this article are stored at Carson_Ho’s GitHub address
Contents
Schematic diagram
1. Introduction
1.1 Background
- There are two kinds of subscription relationship between the observer and the observable: synchronous and asynchronous. Details are as follows:
Schematic diagram
- In an asynchronous subscription, the speed at which the observable sends events may not match the speed at which the observer receives them
- Send or receive rate = number of events sent or received per unit of time
- In most cases, the observable sends events faster than the observer receives them
1.2 The problem
- When the observable sends events too fast, the observer cannot receive them all and so cannot respond to or process every event in time, which eventually leads to cache overflow, lost events, and OOM
- For example, with a button click event: clicking the button 10 times in very quick succession only produces the effect of 2 clicks
- Explanation: the clicks arrive too fast for the button to respond to each one
Here is another example:
- The observable sends one event every 10 ms
- The observer receives one event every 5 s
That is, the send and receive rates are seriously mismatched
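    // Imports needed by this snippet (RxJava 2.x and RxAndroid)
    import android.util.Log;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;

    Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. Create the observable and send events
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; ; i++) {
                Log.d(TAG, "sent event " + i);
                Thread.sleep(10);
                // Send one event every 10 ms
                emitter.onNext(i);
            }
        }
    }).subscribeOn(Schedulers.io()) // run the observable on an IO thread
      .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
      .subscribe(new Observer<Integer>() {
          // 2. Create the observer and receive events
          @Override
          public void onSubscribe(Disposable d) {
              Log.d(TAG, "subscribe");
          }

          @Override
          public void onNext(Integer value) {
              try {
                  // Receive one event every 5 s
                  Thread.sleep(5000);
                  Log.d(TAG, "received event " + value);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }

          @Override
          public void onError(Throwable e) {
              Log.d(TAG, "response to Error event");
          }

          @Override
          public void onComplete() {
              Log.d(TAG, "response to Complete event");
          }
      });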
- Result
Because the observable sends events faster than the observer receives them, the rates do not match, which eventually results in OOM
Schematic diagram
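To make the mismatch in the example above concrete, the backlog of undelivered events can be estimated with simple arithmetic. The sketch below is plain Java, not RxJava code; the class and method names are hypothetical, and it assumes an idealized model in which both sides run at exactly the stated intervals.

```java
public class BacklogEstimate {

    /**
     * Events still waiting after elapsedMs, assuming one event is sent every
     * sendIntervalMs and one event is received every receiveIntervalMs.
     */
    public static long backlog(long elapsedMs, long sendIntervalMs, long receiveIntervalMs) {
        long sent = elapsedMs / sendIntervalMs;
        long received = elapsedMs / receiveIntervalMs;
        return sent - received;
    }

    public static void main(String[] args) {
        // 10 ms per sent event, 5 s per received event, measured after one minute:
        // 6000 sent - 12 received = 5988 events waiting
        System.out.println(backlog(60_000, 10, 5_000));
    }
}
```

Under these assumptions the backlog grows by roughly 100 events per second for as long as the stream runs, which is why an unbounded queue between the two threads eventually exhausts memory.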
1.3 Solution
Adopt a back pressure strategy.
Next, I will introduce the back pressure strategy.
2. Introduction to back pressure strategy
2.1 Definition
A strategy for controlling the rate of the event flow
2.2 Role
In an asynchronous subscription, control the speed at which events are sent and received
Note: the scope of back pressure = asynchronous subscriptions, i.e. the observable and the observer run on different threads
2.3 Problems solved
It solves the problem that the observer cannot respond to or process all events in time because the rate at which the observable sends events does not match the rate at which the observer receives them (generally the former is faster than the latter)
2.4 Application scenarios
- Scenarios where the rate at which the observable sends events does not match the rate at which the observer receives them
- The specific scenario depends on the type of event. Take network requests: when many requests are waiting to be executed but they cannot be executed that fast, a back pressure strategy is needed to control them.
3. Principle of back pressure strategy
- So how does RxJava implement the back pressure (Backpressure) strategy?
- The solution and approach are as follows:
Schematic diagram
- The schematic diagram is as follows
Schematic diagram
- Comparison with Observable, the old observable implementation from RxJava 1.0
Schematic diagram
- So what is Flowable in the RxJava 2.0 observer model shown in the figure above? It is a new implementation of the observable in RxJava 2.0, and it is also the carrier of the back pressure strategy
- Please continue with the next section: the concrete implementation of the back pressure strategy, Flowable
4. Concrete implementation of back pressure strategy: Flowable
In RxJava2.0, Flowable is used to implement the back pressure strategy
More precisely, it is a “non-blocking back pressure” strategy
4.1 Introduction to Flowable
- Definition: in RxJava 2.0, a new implementation of the observable (Observable)
Meanwhile, the old observable implementation from RxJava 1.0, Observable, is retained
- Purpose: To achieve non-blocking back pressure strategy
4.2 Characteristics of Flowable
- The characteristics of Flowable are as follows
Schematic diagram
- Here is another comparison of the observer models in RxJava 2.0 and RxJava 1.0
In fact, RxJava 2.0 also retains the Observable-Observer model; the figure is only for the sake of comparison
Schematic diagram
4.3 Relationship to Observable, the old observable implementation from RxJava 1.0
- The details are shown below.
Schematic diagram
- So why use the new implementation, Flowable, to achieve back pressure instead of the old Observable?
- Main reason: the old implementation, Observable, cannot handle the back pressure problem well.
Schematic diagram
4.4 Basic use of Flowable
- The basic use of Flowable is very similar to that of Observable
- Details are as follows

    // Imports needed by the Flowable snippets in this article (RxJava 2.x)
    import io.reactivex.BackpressureStrategy;
    import io.reactivex.Flowable;
    import io.reactivex.FlowableEmitter;
    import io.reactivex.FlowableOnSubscribe;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    /**
     * Step 1: Create the observable = Flowable
     */
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR);
    // A BackpressureStrategy argument must be passed in; it is described below.

    /**
     * Step 2: Create the observer = Subscriber
     */
    Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            // Difference from Observer: the parameter here is a Subscription, not a Disposable
            // Similarity: Subscription plays the same role as Disposable;
            //   Disposable.dispose() cuts the connection, and so does Subscription.cancel()
            // Difference: Subscription adds void request(long n)
            Log.d(TAG, "onSubscribe");
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    /**
     * Step 3: Establish the subscription
     */
    upstream.subscribe(downstream);
Schematic diagram
- A more elegant chained call

    // Step 1: Create the observable = Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            Log.d(TAG, "sending done");
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
        // Step 2: Create the observer = Subscriber and establish the subscription
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            s.request(3);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
- That completes the basic use of Flowable
- Further usage is explained together with the implementation of the back pressure strategy
5. Use of back pressure strategy
- In this section, I will combine the principle of the back pressure strategy with the use of Flowable to show how to implement back pressure with Flowable in RxJava 2.0
- Flowable differs from Observable in function mainly in that it adds back pressure support
- Next, following the implementation principle and solution described in section 3 (shown in the figure below), I will explain how to use the back pressure features of Flowable
Schematic diagram
Note:
- Since Section 2 stated that back pressure applies to asynchronous subscriptions, we will focus on asynchronous subscription scenarios, where the observable and the observer work on different threads
- However, since a flow mismatch can also occur in synchronous subscriptions, I will briefly cover the synchronous case after the asynchronous one for comparison
5.1 Control the rate at which the observer receives events
5.1.1 Asynchronous Subscription
- Introduction
Schematic diagram
- Detailed schematic
Schematic diagram
- Usage

    // 1. Create the observable, Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // The observable sends 4 events in total
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            Log.d(TAG, "sent event 4");
            emitter.onNext(4);
            Log.d(TAG, "sending done");
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            // Difference from Observer: the parameter here is a Subscription, not a Disposable
            // Similarity: Disposable.dispose() cuts the connection, and so does Subscription.cancel()
            // Difference: Subscription adds void request(long n)
            s.request(3);
            // Purpose: decides how many events the observer can receive
            // The officially recommended default is Long.MAX_VALUE, i.e. s.request(Long.MAX_VALUE)
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

- Result
Schematic diagram
- There are two conclusions that you should be aware of
Schematic diagram
The figure below shows the overflow error reported when the cache is full (128 events)
Schematic diagram
- Code demo 1: the observer does not receive events, while the observable keeps sending events, which are stored in the cache and taken out as needed

    /**
     * Step 1: Declare the variables
     */
    private static final String TAG = "Rxjava";
    private Button btn; // this button is used to call Subscription.request(long n)
    private Subscription mSubscription; // used to save the Subscription object

    /**
     * Step 2: Set the click listener = call Subscription.request(long n)
     */
    btn = (Button) findViewById(R.id.btn);
    btn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            mSubscription.request(2);
        }
    });

    /**
     * Step 3: Asynchronous subscription
     */
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            Log.d(TAG, "sent event 4");
            emitter.onNext(4);
            Log.d(TAG, "sending done");
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Save the Subscription so that request(2) can be called when the button is clicked
            mSubscription = s;
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
- Code demo 2: the observer does not receive events, and the observable keeps sending events until the cache size (128) is exceeded

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 129 events in total, one more than the cache can hold
            for (int i = 0; i < 129; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            // request(n) is deliberately not called, so the observer receives nothing
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
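The overflow in code demo 2 can be pictured as a bounded queue that refuses the 129th element. The sketch below is plain Java using `java.util.concurrent.ArrayBlockingQueue`, not RxJava's internal buffer; the class and method names are hypothetical, and the capacity of 128 is taken from the text above.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedCacheDemo {

    /**
     * Offers events 0..events-1 to a cache of the given capacity while nothing
     * is consumed. Returns the index of the first rejected event, or -1 if
     * every event fits.
     */
    public static int firstRejected(int capacity, int events) {
        ArrayBlockingQueue<Integer> cache = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < events; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!cache.offer(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstRejected(128, 129)); // 128, i.e. the 129th event
        System.out.println(firstRejected(128, 4));   // -1, all four events fit
    }
}
```

With `BackpressureStrategy.ERROR`, the point at which this sketch returns a rejected index corresponds to the point at which the article's demo reports the overflow error.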
5.1.2 Synchronous subscription
A synchronous subscription differs from an asynchronous one in that:
- In a synchronous subscription, the observable and the observer work on the same thread
- There is no cache in a synchronous subscription
Schematic diagram
- After sending an event, the observable must wait for the observer to receive it before sending the next one

    /**
     * Step 1: Create the observable = Flowable
     */
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 3 events
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR);

    /**
     * Step 2: Create the observer = Subscriber
     */
    Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            s.request(3);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    /**
     * Step 3: Establish the subscription
     */
    upstream.subscribe(downstream);
Schematic diagram
Therefore, in a synchronous subscription the observable never sends events faster than the observer receives them. However, the number of events sent by the observable can still exceed the number of events the observer is willing to receive.
- For example, the observer can accept only 3 events, but the observable sends 4, so there is a mismatch

    /**
     * Step 1: Create the observable = Flowable
     */
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // The observable sends 4 events
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            Log.d(TAG, "sent event 4");
            emitter.onNext(4);
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR);

    /**
     * Step 2: Create the observer = Subscriber
     */
    Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // The observer accepts only 3 events
            s.request(3);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    /**
     * Step 3: Establish the subscription
     */
    upstream.subscribe(downstream);
Schematic diagram
Therefore, in a synchronous subscription, which has no cache, controlling only the number of events the observer receives (reactive pull) is really one-sided, a kind of “unrequited love”: the observer limits how many events it receives, but a problem still arises if the observable needs to send 4 events.
This is addressed in 5.2, which covers controlling the speed at which the observable sends events.
- There is a special case to note
Schematic diagram
- Code demo

    /**
     * Step 1: Create the observable = Flowable
     */
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR);

    /**
     * Step 2: Create the observer = Subscriber
     */
    Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Subscription.request(long n) is deliberately not called
            // s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    /**
     * Step 3: Establish the subscription
     */
    upstream.subscribe(downstream);

As soon as the observable sends the first event, a MissingBackpressureException is thrown and the observer receives no events
Schematic diagram
5.2 Control the speed at which the observable sends events
- Introduction
Schematic diagram
- Introduction to requested() in the FlowableEmitter class

    public interface FlowableEmitter<T> extends Emitter<T> {
        // FlowableEmitter is an interface that extends Emitter
        // Emitter defines onNext(), onComplete() and onError()

        long requested();
        // Returns the value a passed to request(a) in the current thread

        // Only the key code is shown here
    }

- The return value of requested() in each thread = the value a passed to request(a) in that thread
- Schematics for the synchronous and asynchronous subscription cases
Schematic diagram
To help you understand how requested() is used in this strategy, this section explains synchronous subscriptions first and then asynchronous subscriptions
5.2.1 Synchronous subscription
- Principle
Schematic diagram
In a synchronous subscription, the observable calls FlowableEmitter.requested() to learn how many events the observer is able to receive and controls its sending speed accordingly, so that the observer in effect controls the observable in reverse
- Example: the observable sends only 10 events, based on the observer’s ability to receive events (10 events)

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Call emitter.requested() to get the number of events the observer can currently receive
            long n = emitter.requested();
            Log.d(TAG, "events the observer can receive = " + n);

            // Send events according to the value of emitter.requested()
            for (int i = 0; i < n; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Set the observer to accept 10 events
            s.request(10);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
- Pay special attention
When FlowableEmitter.requested() is used in a synchronous subscription, the following features need attention:
Schematic diagram
Case 1: Additivity
- That is, the observer can request events several times in a row, and the requested amounts are added together

    Subscription.request(a1);
    Subscription.request(a2);
    // FlowableEmitter.requested() returns a1 + a2

- Code demo

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Call emitter.requested() to get the number of events the observer can currently receive
            Log.d(TAG, "events the observer can receive = " + emitter.requested());
        }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Set the observer to accept 10 events
            s.request(10);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
Case 2: Real-time updates
- That is, after each event is sent, emitter.requested() is updated in real time to the number of events the observer can still accept
- For example, if the observer can receive 10 events at first, the value is updated to 9 as soon as one event has been sent
- Only Next events are counted; Complete and Error events are not.

    Subscription.request(10);
    // FlowableEmitter.requested() returns 10

    FlowableEmitter.onNext(1); // one event has been sent
    // FlowableEmitter.requested() returns 9

- Code demo

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // 1. Call emitter.requested() to get the number of events the observer can currently receive
            Log.d(TAG, "events the observer can receive = " + emitter.requested());

            // 2. After each event is sent, emitter.requested() is updated in real time
            //    i.e. after the first event it drops to 9
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "after event 1, events the observer can receive = " + emitter.requested());

            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "after event 2, events the observer can receive = " + emitter.requested());

            Log.d(TAG, "sent event 3");
            emitter.onNext(3);
            Log.d(TAG, "after event 3, events the observer can receive = " + emitter.requested());

            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Set the observer to accept 10 events
            s.request(10);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
Case 3: Exception
- When FlowableEmitter.requested() drops to 0, the observer can no longer receive events
- If the observable keeps sending events at that point, a MissingBackpressureException is thrown
For example, if the observer can receive only 1 event, the exception is thrown when the observable sends the second event

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // 1. Call emitter.requested() to get the number of events the observer can currently receive
            Log.d(TAG, "events the observer can receive = " + emitter.requested());

            // 2. After each event is sent, emitter.requested() is updated in real time
            //    i.e. after the first event it drops to 0
            Log.d(TAG, "sent event 1");
            emitter.onNext(1);
            Log.d(TAG, "after event 1, events the observer can receive = " + emitter.requested());

            // The observer can receive nothing more, so this second event triggers the exception
            Log.d(TAG, "sent event 2");
            emitter.onNext(2);
            Log.d(TAG, "after event 2, events the observer can receive = " + emitter.requested());

            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Set the observer to accept 1 event
            s.request(1);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
Additional note
- If the observer never sets how many events it can receive, i.e. never calls Subscription.request(),
- then the observable assumes by default that the observer can receive 0 events, i.e. FlowableEmitter.requested() returns 0
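The three cases above, together with the default of 0, amount to simple counter bookkeeping. The sketch below models that bookkeeping in plain Java; it is not RxJava's implementation, the class name `DemandCounter` is hypothetical, and it throws an `IllegalStateException` where RxJava would signal a MissingBackpressureException.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DemandCounter {

    // Outstanding demand; starts at 0, matching the default when request() is never called
    private final AtomicLong requested = new AtomicLong();

    /** Case 1: successive requests add up. */
    public void request(long n) {
        requested.addAndGet(n);
    }

    /** Number of events the observer can still receive. */
    public long requested() {
        return requested.get();
    }

    /** Case 2: each sent event lowers the demand by one. Case 3: fail at zero. */
    public void onNext(int value) {
        if (requested.get() == 0) {
            throw new IllegalStateException("no outstanding demand for event " + value);
        }
        requested.decrementAndGet();
    }

    public static void main(String[] args) {
        DemandCounter counter = new DemandCounter();
        System.out.println(counter.requested()); // 0
        counter.request(10);
        counter.request(20);
        System.out.println(counter.requested()); // 30
        counter.onNext(1);
        System.out.println(counter.requested()); // 29
    }
}
```

The check and the decrement in `onNext` are not atomic as a pair, which is acceptable here because the sketch models the synchronous, single-threaded case only.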
5.2.2 Asynchronous subscription
- Principle
Schematic diagram
As shown above, because the two run on different threads, the observable cannot use FlowableEmitter.requested() to learn how many events the observer can receive, and therefore cannot control its sending speed according to the observer’s capacity. See the following example for details

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Call emitter.requested() to get the number of events the observer can currently receive
            Log.d(TAG, "events the observer can receive = " + emitter.requested());
        }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            s.request(150);
            // This setting only affects requested in the observer thread;
            // it does not affect the return value of FlowableEmitter.requested() in the observable,
            // because that return value depends on the request(n) that RxJava calls internally
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
In an asynchronous subscription, reverse control works as follows: RxJava internally calls request(n) on the observable’s thread at fixed points, thereby controlling the observable’s sending speed in reverse
So when is request(n) called in the observable’s thread, and what is n? Keep reading.
- Usage
RxJava internally calls request(n) with n = 128, 96, or 0
Schematic diagram
That is, request(128), request(96), and request(0) are called
- Code demo
I’ll use an example to illustrate the logic of this principle

    // Observable: needs to send 500 events in total, but only really sends
    //   while FlowableEmitter.requested() returns a non-zero value
    // Observer: receives 48 events each time the button is clicked
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "events the observer can receive = " + emitter.requested());
            boolean flag; // makes sure the "stopped" message is logged only once per pause

            // The observable needs to send 500 events in total
            for (int i = 0; i < 500; i++) {
                flag = false;

                // While requested() == 0, do not send (this is a busy-wait loop)
                while (emitter.requested() == 0) {
                    if (!flag) {
                        Log.d(TAG, "no more sending");
                        flag = true;
                    }
                }

                // requested() != 0, so send the event
                Log.d(TAG, "sent event " + i + ", events the observer can receive = " + emitter.requested());
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            mSubscription = s;
            // Initial state = do not receive events; events are received by clicking the button
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

    // Click the button to receive 48 events
    btn = (Button) findViewById(R.id.btn);
    btn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            mSubscription.request(48);
        }
    });
The whole process & test results are shown below
Schematic diagram
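The interaction in this demo can be traced with a small single-threaded simulation. This is a sketch under stated assumptions rather than RxJava's scheduler code: it takes the values 128 and 96 from the text above, assumes the internal request(96) is issued each time 96 events have been consumed, and assumes the producer sends immediately whenever it sees demand. The class and method names are hypothetical.

```java
public class PrefetchSimulation {

    static final int INITIAL_REQUEST = 128; // first internal request(n), value from the text
    static final int REPLENISH_BATCH = 96;  // later internal request(n), value from the text

    private final int totalEvents;
    private long upstreamRequested = INITIAL_REQUEST;
    private int emitted;
    private int buffered;
    private int consumedSinceReplenish;

    public PrefetchSimulation(int totalEvents) {
        this.totalEvents = totalEvents;
        produce();
    }

    /** The producer sends for as long as it sees demand and has events left. */
    private void produce() {
        while (upstreamRequested > 0 && emitted < totalEvents) {
            emitted++;
            buffered++;
            upstreamRequested--;
        }
    }

    /** One button click: the observer takes up to n events out of the cache. */
    public void click(int n) {
        for (int k = 0; k < n && buffered > 0; k++) {
            buffered--;
            if (++consumedSinceReplenish == REPLENISH_BATCH) {
                // Assumed replenishment rule: ask upstream for another 96 events
                consumedSinceReplenish = 0;
                upstreamRequested += REPLENISH_BATCH;
                produce();
            }
        }
    }

    public int emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PrefetchSimulation sim = new PrefetchSimulation(500);
        System.out.println(sim.emitted()); // 128: the cache is filled once, then sending stops
        sim.click(48);
        System.out.println(sim.emitted()); // 128: only 48 consumed, no new demand yet
        sim.click(48);
        System.out.println(sim.emitted()); // 224: 96 consumed, so 96 more events are sent
    }
}
```

Under these assumptions the producer resumes only on every second click, because two clicks of 48 are needed to reach the 96-event threshold.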
5.3 Back pressure mode: BackpressureStrategy
5.3.1 Back pressure mode
When using Flowable, you are required to pass in a back pressure mode parameter
Schematic diagram
- Target: the cache
- Role: decides how to handle the situation where the cache is full and the observable keeps sending the next event
Cache full or overflowing = the result of sending events faster than they are received = the result of mismatched send and receive rates
5.3.2 Types of back pressure mode
Schematic diagram
I’ll explain each of these modes one by one.
Mode 1: BackpressureStrategy.ERROR
- Problem: events are sent faster than they are received, i.e. the flow rates do not match
Specifically: the cache is full (default cache size = 128) and the observable keeps sending the next event
- Handling: throw a MissingBackpressureException directly

    // Create the observable, Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 129 events
            for (int i = 0; i < 129; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.ERROR) // set the back pressure mode = BackpressureStrategy.ERROR
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
Mode 2: BackpressureStrategy.MISSING
- Problem: events are sent faster than they are received, i.e. the flow rates do not match
Specifically: the cache is full (default cache size = 128) and the observable keeps sending the next event
- Handling: a friendly hint that the cache is full

    // Create the observable, Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 129 events
            for (int i = 0; i < 129; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.MISSING) // set the back pressure mode = BackpressureStrategy.MISSING
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
Schematic diagram
Mode 3: BackpressureStrategy.BUFFER
- Problem: events are sent faster than they are received, i.e. the flow rates do not match
Specifically: the cache is full (default cache size = 128) and the observable keeps sending the next event
- Handling: make the cache size unlimited
- That is, the observable can send an unlimited number of events to the observer, but they are actually stored in the cache
- However, keep an eye on memory usage to prevent OOM

    // Create the observable, Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 129 events
            for (int i = 1; i < 130; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER) // set the back pressure mode = BackpressureStrategy.BUFFER
    .subscribeOn(Schedulers.io()) // run the observable on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // run the observer on the main thread
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "received event " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

More events than the original cache size (128) can now be sent without an overflow error
Schematic diagram
Pattern 4: BackpressureStrategy.DROP
- Problem: the event sending speed > the event receiving speed, that is, the flow rates do not match
Symptom: when the cache is full (default cache size = 128), the observed still continues to send the next event
- Handling method: discard the events that exceed the cache size (128)
If 150 events are sent, only the 1st to 128th events are kept, and the 129th to 150th events are discarded
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 150 events
            for (int i = 0; i < 150; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.DROP) // set the back pressure mode = BackpressureStrategy.DROP
            .subscribeOn(Schedulers.io()) // the observed runs on the IO thread
            .observeOn(AndroidSchedulers.mainThread()) // the observer runs on the main thread
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s; // keep the Subscription so the button can request events
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "received event " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    btn = (Button) findViewById(R.id.btn);
    btn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            mSubscription.request(128); // receive 128 events per click
        }
    });
The observed sends 150 events at once. The first click on the button receives 128 events; clicking again receives nothing, which shows that the events exceeding the cache size were discarded.
Schematic diagram
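The DROP behavior described above can be sketched as a small plain-Java model. It does not use RxJava; the names DropModel, emit, request and dropped are hypothetical, and the fixed-capacity queue is an assumption that stands in for the 128-slot cache rather than a copy of RxJava's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of BackpressureStrategy.DROP (not RxJava's real code):
// events that arrive while the cache is full are thrown away.
final class DropModel {
    private final int capacity;
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private int dropped = 0;

    DropModel(int capacity) {
        this.capacity = capacity;
    }

    // Sender side: keep the event only if the cache still has room.
    void emit(int event) {
        if (buffer.size() < capacity) {
            buffer.add(event);
        } else {
            dropped++;
        }
    }

    // Receiver side: take at most n events, similar in spirit to Subscription.request(n).
    List<Integer> request(int n) {
        List<Integer> received = new ArrayList<>();
        while (received.size() < n && !buffer.isEmpty()) {
            received.add(buffer.poll());
        }
        return received;
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropModel model = new DropModel(128);
        for (int i = 0; i < 150; i++) { // the same 150 events as in the example above
            model.emit(i);
        }
        List<Integer> firstClick = model.request(128);
        List<Integer> secondClick = model.request(128);
        System.out.println("first click received: " + firstClick.size());   // 128
        System.out.println("second click received: " + secondClick.size()); // 0
        System.out.println("dropped: " + model.dropped());                  // 22
    }
}
```

The two request calls play the role of the two button clicks: the first drains the cache, and the second finds nothing because the remaining 22 events were never stored.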
Pattern 5: BackpressureStrategy.LATEST
- Problem: the event sending speed > the event receiving speed, that is, the flow rates do not match
Symptom: when the cache is full (default cache size = 128), the observed still continues to send the next event
- Handling method: once the cache (128) is full, keep only the latest (last) event and discard the other events that exceed the cache size
That is, if 150 events are sent, 129 events are kept: the 1st to 128th, plus the 150th (the latest).
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            // Send 150 events
            for (int i = 0; i < 150; i++) {
                Log.d(TAG, "sent event " + i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.LATEST) // set the back pressure mode = BackpressureStrategy.LATEST
            .subscribeOn(Schedulers.io()) // the observed runs on the IO thread
            .observeOn(AndroidSchedulers.mainThread()) // the observer runs on the main thread
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s; // keep the Subscription so the button can request events
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "received event " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    btn = (Button) findViewById(R.id.btn);
    btn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            mSubscription.request(128); // receive 128 events per click
        }
    });
- The observed sends 150 events at once, and the first click on the button receives 128 events
- Clicking again receives 1 event (the 150th), which shows that, of the events exceeding the cache size, only the last one (the 150th) is kept
Schematic diagram
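The LATEST rule differs from DROP only in what happens to the overflow, which the following plain-Java model tries to show. It does not use RxJava; the names LatestModel, emit and request are hypothetical, and the "cache plus one latest slot" layout is an assumption made for illustration, not RxJava's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of BackpressureStrategy.LATEST (not RxJava's real code):
// when the cache is full, each new event overwrites a single "latest" slot.
final class LatestModel {
    private final int capacity;
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private Integer latest = null; // newest overflow event, if any

    LatestModel(int capacity) {
        this.capacity = capacity;
    }

    // Sender side: fill the cache first, then keep only the newest overflow event.
    void emit(int event) {
        if (buffer.size() < capacity) {
            buffer.add(event);
        } else {
            latest = event;
        }
    }

    // Receiver side: drain the cache first, then hand over the latest event once.
    List<Integer> request(int n) {
        List<Integer> received = new ArrayList<>();
        while (received.size() < n) {
            if (!buffer.isEmpty()) {
                received.add(buffer.poll());
            } else if (latest != null) {
                received.add(latest);
                latest = null;
            } else {
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        LatestModel model = new LatestModel(128);
        for (int i = 0; i < 150; i++) { // the same 150 events as in the example above
            model.emit(i);
        }
        System.out.println("first click received: " + model.request(128).size()); // 128
        System.out.println("second click received: " + model.request(128));       // [149]
    }
}
```

The value 149 is the 150th event because the loop starts at 0, which matches the result of the second button click described above.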
5.3.3 Special attention
When using the back pressure strategy modes, there is one situation that needs attention:
A. Background: a Flowable can be created manually (as in the examples above) or automatically by other means, such as the interval operator
An introduction to the interval operator
- Function: emits a number (of type Long) at a fixed interval, starting from 0 and incrementing by 1 each time, without end
- By default it runs on the computation scheduler (Schedulers.computation())
- Difference from the timer operator: the timer operator emits only once and then stops sending
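As a rough analogy for the difference between interval and timer, the sketch below uses only the JDK's ScheduledExecutorService instead of RxJava. The class IntervalSketch and the methods firstTicks and singleTick are hypothetical names; the code imitates the emission pattern (a counter at a fixed period versus a single delayed value) and is not how RxJava implements these operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only analogy for interval (periodic counter) and timer (one value).
final class IntervalSketch {

    // interval-like: emit 0, 1, 2, ... every periodMillis; stop after `count` ticks.
    static List<Long> firstTicks(int count, long periodMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> ticks = new CopyOnWriteArrayList<>();
        AtomicLong counter = new AtomicLong(0);
        CountDownLatch done = new CountDownLatch(count);
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) { // ignore ticks after the requested count
                ticks.add(counter.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        done.await();            // wait until `count` ticks were collected
        scheduler.shutdownNow(); // a real interval would keep going until cancelled
        return new ArrayList<>(ticks);
    }

    // timer-like: emit a single 0 after delayMillis, then finish.
    static long singleTick(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            return scheduler.schedule(() -> 0L, delayMillis, TimeUnit.MILLISECONDS).get();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("interval-like: " + firstTicks(3, 10)); // [0, 1, 2]
        System.out.println("timer-like: " + singleTick(10));       // 0
    }
}
```

The periodic source has no natural end, so a slow receiver will always fall behind it eventually; that is the situation the following parts deal with.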
B. Conflict
- When you create a Flowable manually, you can select the back pressure strategy (described above) by passing in the back pressure mode parameter
- However, when a Flowable is created automatically, there is no way to pass in the back pressure mode parameter. How, then, do you select the back pressure mode when the flow rates do not match?
    // interval automatically creates the observed Flowable
    // Every 1 ms, the current number (starting from 0) is incremented by 1 and sent
    Flowable.interval(1, TimeUnit.MILLISECONDS)
            .observeOn(Schedulers.newThread()) // the observer works on a new thread
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG, "onNext: " + aLong);
                    try {
                        // Delay 1 second before receiving each event.
                        // Sending takes 1 ms per event and receiving takes 1 s per event,
                        // so the sending and receiving speeds do not match.
                        // The cache is quickly filled with 128 events, and a
                        // MissingBackpressureException is thrown.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
Schematic diagram
C. Solution: RxJava 2.0 provides methods that encapsulate the back pressure strategy modes
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
By default, the BackpressureStrategy.ERROR mode is used
Specific use is as follows:
    Flowable.interval(1, TimeUnit.MILLISECONDS)
            .onBackpressureBuffer() // apply the encapsulated back pressure method
            .observeOn(Schedulers.newThread())
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG, "onNext: " + aLong);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
Thus, the speed mismatch between sending events and receiving events is solved.
Diagram of encapsulation method. GIF
The other methods work in the same way as the corresponding back pressure mode parameters, so they are not described again here.
Summary of back pressure strategy mode
Schematic diagram
- At this point, the back pressure modes of RxJava 2.0 have been fully explained
- All code demos are stored at Carson_Ho’s Github address
6. Summary
- This article explained the back pressure strategy modes of RxJava