topic
RxJava interview questions: RxJava interview questions: RxJava interview questions: . In the last article, we analyzed that subscribeOn called for many times is only effective in the first call, because the Observable returned by the initially called subscribeOn will overwrite the Observable returned by the later executed subscribeOn. Therefore, we perceive that only the first subscribeOn can take effect. In fact, every Observable in the middle still has a specified thread when it is generated, but the most upstream Observable only receives the thread specified by the first subscribeOn. So we can monitor that the middle Observable does have its own thread through doOnSubscribe. That’s the topic of this section.
In the previous section, we looked at the observer thread specified by observeOn that follows observeOn. If we call observeOn multiple times, it will be the last time observeOn effectively specifies the observer thread. We can use doOnNext to listen for every observeOn thread switch, which is the topic of this section.
If the observer thread is not specified and only the Observable thread is specified, the observer thread will go with the Observable thread. That is, the subscribeOn is only set and observeOn is not set. That’s the topic of this section.
In the last video we briefly talked about back pressure, so what is back pressure and how Flowable can control it are also topics for this section.
The topic of this section
- How does doOnSubscribe listen to the observable thread in the middle?
- How does doOnNext listen for every observeOn thread switch, and who controls the map apply thread?
- What does an Observer thread look like if you do not specify the subscribeOn thread and do not set the observeOn thread?
- What is the back pressure and how can Flowable control it?
DoOnSubscribe listening in
In the previous section, we introduced that subscribeOn controls which thread the upstream Observable executes in. How to control the upstream Observable can be seen in my previous articleRxJava surface after a, take it, thank you!, so when subscribeOn is executed for many times, the Observable receives the thread specified by the first subscribeOn, because each setting will be overwritten by the thread set by the next layer. The coverage here is for the most upstream Observable. Observable generated in the middle actually has thread switching. We can listen to the switchover of subscribeOn thread every time by doOnSubscribe. Let’s take an example:As we mentioned in the previous article, subscriptions go from the most downstream observer to the layer upon layer of observables, so our most downstream Observable starts subscribing, i.e. (1)subscribeOn
The generatedObservableSubscribeOn
The observer starts subscribing, and it adds a subscription to its upstream Observable (②) in the subscribing methoddoOnSubscribe
The generatedObservableDoOnLifecycle
The observer subscribes, then subscribes to ③ in its subscription, ③ adds a subscription to ④, and finally subscribes to the most upstream ObservableObservableOnSubscribe
The SUBSCRIBE method is called. This is the order of subscriptions from the bottom up, illustrated in a diagram below:
What about when doOnSubscribe’s internal Consumer accept method is called?
Look directly at the graph above, which is generated in the last Observable, doOnSubscribeObservableDoOnLifecycle
The interior decoration Observer (DisposableLambdaObserver
) when listening for a subscription. In this example, the upstream Observables of observables (no. ② and no. ④) generated by doOnSubscribe are generated by subscribeOn, and subscribeOn is eventually generatedObservableSubscribeOn
Observable, which adds a listener to the downstream observer directly in its subscription:
Therefore, it can be seen from the above that the print of doOnSubscribe at (2) is when the subscribeOn of (3) upstream subscribes, so it prints the result first, and then (4) prints the result again. Finally, it is the print of the subscription of the most upstream Observable. So what happens to the thread that’s received by Accept in every doOnSubscribe, and I’ll just say the conclusion, is that it’s consistent with the thread that’s assigned to it by the subscribeOn underneath it. So ② prints the thread specified at ①, ④ prints the thread specified at ③, and then we analyze doOnSubscribe.
In the previous section we saw that subscribeOn is the thread that assigns its upstream Observable subscriptions, and the doOnSubscribe operator ultimately generates an ObservableDoOnLifecycle Observable. So it can be said that the subscription to ObservableDoOnLifecycle takes place in a thread that is determined by the thread specified by the subscribeOn that follows it.
And in theObservableDoOnLifecycle
It directly subscribes to the upstream Observable, which is the second one in the above exampleobservable
.ObservableDoOnLifecycle
thesubscribeActual
The method is as follows:As described in the previous article, in the subscription method of each Observable, the decorator observer is first created and the downstream observer is transferred to the decorator observer. Then the callback of the subscription is added to the downstream observer, and the subscription is added to the upstream Observable. And this right hereObservableDoOnLifecycle
The subscription method first creates a Decorator Observer to DisposableLambdaObserver, and then adds a subscription to the upstream Observable. Add a subscription to the downstream Obserer, which is left to DisposableLambdaObserveronSubscribe
In the now.
Because above we generate the most downstream by doOnSubscribeobservable(ObservableDoOnLifecycle)
So its upstream Observable is also an IO thread. We haven’t finished analyzing the thread that doOnSubscribe passes to the Consumer accept method. This will require us to take a look at the decorator DisposableLambdaObserver created in the ObservableDoOnLifecycle subscription analyzed above:
The accept callback is performed on the incoming Consumer, and the downstream observer receives the subscribe callback.
Who causes the Observer to DisposableLambdaObserver to be launched, it must be the downstream Observer to be launched when the upstream Observable starts to subscribe. The upstream Observable subscribing thread is determined by the subscribeOn next to doOnSubscribe. The consumer in doOnSubscribe listens to the thread that occurs during the Observable subscription process upstream specified by subscribeOn.
Draw a picture below to help your brain:
DoOnSubscribe section
The above analysis is not easy to do without the source code. The whole point is that subscribeOn assigns its upstream Observable thread, which happens to be an Observable generated by doOnSubscribe. This Observable is ObservableDoOnLifecycle, and in its subscription directly subscribes to the observables upstream, So the upstream Observable thread of doOnSubscribe is also assigned by the subscribeOn thread that doOnSubscribe subscribeOn generates. It is ObservableSubscribeOn, in its subscription is directly to monitor the downstream of the observer to subscribe to the callback, This is the decorator DisposableLambdaObserver generated in the ObservableDoOnLifecycle subscription generated by doOnSubscribe, whose subscription listener calls the Accept method of the Consumer passed in by doOnSubscribe.
So that’s what happens when you call multiple subscribeOn calls and you can doOnSubscribe to do thread switch listening.
DoOnNext listens for the observeOn thread switch. Who controls the apply thread of the map?
To answer your question, let’s look at doOnNext:
DoOnNext actually makes a lot of sense because it emits data from an upstream Observable to a downstream Observable, and observeOn is the thread that assigns the downstream observer to emit data, which I talked about in the last post, What doOnNext actually generates is an ObservableDoOnEach Observable. In the subscription method, it generates an ornamental observer, the DoOnEachObserver, So observeOn actually controls the thread on which DoOnEachObserver sends data, and when it sends onNext, it calls the Consumer accept method passed in by onNext:
Observable also has onComplete and onError listeners, which are eventually generatedObservableDoOnEach
The observables
DoOnNext section
When an upstream Observable sends data, it executes the Accept method of doOnNext’s Consumer. Therefore, when the thread is specified by observeOn several times, You can get the switch thread via doOnNext.
So this is where multiple calls to observeOn can listen for thread switches through onNext.
In terms of who controls the thread of a Map’s Apply method, look directly at the Map Observable, which is an ObservableObservableMap
:As you can see, the map operator generatesObservableMap
In its subscribe method, it generates a decorated MapObserver and then adds a subscription to the upstream Observable.When a MapObserver receives onNext data sent by an upstream Observable, it calls the apply method of the function sent by the map. Therefore, the apply method is related to the thread from which the upstream Observable sends data, let’s look at the following example:We know that subscribeOn is the subscribing thread that specifies the upstream Observable. In the previous article, we talked about multiple subscribeOn threads that are only effective for the first time, which is for the most upstream Observable. So the upstream Observable emitting data thread keeps up with the IO thread specified after it, so it prints something like this:
The thread will not be changed when each subscribeOn sends data, so the map thread will keep the thread of the most upstream Observable, that is, the IO thread, so the printing will be as follows:
Since subscribeOn does not change the thread that sends data, and the map thread is not changed for multiple subscribeOn, it is only consistent with the thread that sends data from the most upstream Observable. How about inserting observeOn in the middle? Since observeOn changes the thread at which data is sent downstream, that is, the thread at which data is received in the downstream Observer, namely the onNext, onComplete, onError methods, So the thread specified by observeOn is propagated all the way to the onNext method of the downstream MapObserver, so the final apply method of the map function is the main thread, the print result is as follows:
In other cases you can try, you can specify the observeOn thread several times and see what the final thread of the map looks like
The map section
Map uses the function sent in as a monitor for the data transmitted by the upstream Observable. When an upstream Observable sends data, it executes the apply method of function to transform the data. So the apply method of the map function is related to the thread from which the upstream Observable emits data.
What does an Observer thread look like if you do not specify the subscribeOn thread and do not set the observeOn thread?
I feel that I have understood the whole subscription process. In fact, it is not difficult to understand this problem. Since subscribeOn is the thread that assigns the upstream Observable, the thread that sends data when the upstream Observable sends data will also be related to the thread specified by the subscribeOn next to it. ObserveOn specifies the thread of the downstream Observer. Observer threads are consistent with the thread of the upstream Observable that emits data.
What is the back pressure and how can Flowable control it?
It refers to the phenomenon that the upstream Observable sends data too fast for the downstream Observer to receive. In order to maintain water balance, the dam releases water to the downstream reservoir and receives water from the upstream. If the upstream water flow is large, the water level in the dam surges, while the dam has limited capacity to release water to the downstream, so the water in the dam overflows the dam.
RxJava1.0 back pressure
Note: When it comes to RxJava back pressure, it starts with RxJava1.0. The source code analyzed here is RxJava1.0 version analyzed in 1.3.8
In version 1.0 of RxJava, an Observable supports back pressure, but presents it to the user as an exception. Here’s an example of an Observable sending data upstream:
Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { int i = 0; while (true) { subscriber.onNext(i); i++; } } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d(TAG, "onError:" + e.getMessage()); e.printStackTrace(); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext:" + integer); }});Copy the code
In our log, there is no problem that the downstream processing data can not handle, and there is no abnormal information ah, this is not good, there is no back pressure, look, don’t worry, back pressure is a problem that occurs in multithreading, because in a single thread, send data and receive data are in the same thread. So every time you send data, you have to wait until the downstream process is complete before sending data.
Ok, with that said, let’s add multithreading, let’s try again:
Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; ; i++) { subscriber.onNext(i); } } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d(TAG, "onError:" + e.getMessage()); e.printStackTrace(); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext:" + integer); }});Copy the code
(I/o thread) (I/o thread) (I/o thread) (I/o thread)Quote isMissingBackpressureException
Exception information, in fact in RxJava1.0 this is the back pressure support strategy, directly through the exception information back to the user. In RxJava1.0, the maximum transmission data supported is 16, that is to say, when the transmission is greater than or equal to 17, there will be an exception, through the code verification below:
The data can be received normally. If we adjust the data to 17, will there be an exception?
RxJava1.0 backpressure is a mechanism for controlling the amount of data that can be sent upstream. We can find the size defined here:RxJava back pressure limits the number of data to be sent on the Android platform. If more than 16 data will be thrown directlyMissingBackpressureException
Abnormal,The underlying data sent by the upstream Observable is put into the queue, and 16 defines the capacity of the queue. Every time data is put into the queue, the next index to be put into the queue is first obtained. If the data in the index position is not empty, the queue is considered full, and onError message is directly returned when the data is full.
Such as we are sending a data, 17 when obtaining index is index is obtained by with docking capacity 16 facies, lust after get an index of less than 16, found after within the index and data, then send the 17th fail when the data in the queue, so direct selling onError information. The core source code is here: The process is pretty clear,OperatorObserveOn
isobserveOn
Methods toOnSubscribeLift
theOperator
In object,OperatorObserveOn
When onNext receives the data sent from upstream, it will judge whether the offer is successful. If not, it will directly throw onError. Then when the offer fails, it depends on whether the current sent data exceeds the capacity of the queue. If exceeded, the offer fails.
So this is the back pressure policy in RxJava1.0, which is achieved by setting the size of the receive queue for data sent upstream.
RxJava2.0 back pressure
Note: RxJava runs version 2.2.20
In RxJava2.0 no longer back pressure in the observables, but replaced by Flowable, that is to say the observables stopped by abnormal form tell users, also is not thrown MissingBackpressureException anomalies, RxJava2.0 sends data correctly:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Copy the code
Although not throw MissingBackpressureException anomalies, but the memory footprint is very bad, so for RxJava2.0 problem, we have the solution, everyone will say RxJava2.0 already support Flowable, isn’t it, use it directly, What should we do if we deal with it by ourselves? First of all, we analyze the causes of back pressure:
- Events sent by the upstream are too fast for the downstream to process
- The upstream sends too many events for the downstream to process
For the first type, we can make the upstream send slow. How slow? Let the IO thread wait for a while each time it sends:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(1000);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Copy the code
Image from RxJava back pressure
For the second, we can make the downstream observer receive less data:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; I ++) {// Emitters. OnNext (I); } } }).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 100 == 0; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); }});Copy the code
Image from RxJava back pressure
RxJava uses Flowable to support back pressure policies. Flowable. CreateFlowableCreate supports two parameters, the first of which is the upstream emission dataFlowableOnSubscribe
, the second parameter is several strategies of back pressure:
Public enum BackpressureStrategy {MISSING, / / if the rate of flow can't keep up, may throw MissingBackpressureException or an IllegalStateException. The ERROR, / / will throw MissingBackpressureException when the downstream couldn't keep up with speed. BUFFER,// the upstream stream keeps onNext requests until the downstream process is done, like an Observable, the BUFFER pool is infinite, and finally crashes DROP, which drops onNext values when the downstream can't keep up. LATEST// Retains the LATEST onNext value until it is consumed by the downstream. }Copy the code
With this in mind, let’s look at what Flowable looks like in a single thread:
In the error of the strategy of cases, and switch threads, not directly submitted to the IO. Reactivex. Exceptions. MissingBackpressureException: create: Could not emit value due to lack of requests for requests to the downstream observer could not emit value due to lack of requests for requests to the downstream observer Therefore, the upstream does not know the processing capability of the downstream and directly throws an error.
How to set the downstream processing capacity:
Add subscription (long.max_value) to the direct downstream receive subscription method. Add long.max_value (long.max_value).
What if the number of data sent upstream is greater than the number set downstream:
It can be seen that when sending the fourth data, the exception is thrown directly, because the downstream is set to handle three data. Each time after sending the fourth data, the downstream will “weaken” the ability to handle the data. When sending the fourth data, the downstream can no longer handle the exception.
The above are the results of the error policy, single thread, so what would the results look like in multithreading? There are several strategies to consider:
MISSING
In multithreading, the downstream processing power must be set because observeOn needs to know how much data the downstream can process when it sends data to the downstream. We demonstrated above is upstream to send 128 data, the results did not throw throw as MISSING strategy say MissingBackpressureException or an IllegalStateException exception information, This is because Flowable defaults to 128 data as the most data sent upstream, and we can find the number defined here:
RxJava1.0 has the same capacity as RxJava1.0, but the capacity of RxJava1.0 is 16, so when sending the 129th data, the queue will be full. Once the queue is full, the various policies defined in RxJava will occur. Let’s change the sent data to 129 to see what happens when MISSING:
See, directly behind the IO. Reactivex. Exceptions. MissingBackpressureException: the Queue is full? ! Exception information. MISSING inside the transmitter inside actually didn’t do anything, it sends exception is in FlowableObserveOn ObserveOnSubscriber onNext time of inner class, found that 128 after the queue is full size, send onError information to the downstream of the observer.
ERROR
Test code I will not paste, directly put the above aboveBackpressureStrategy.MISSING
Instead ofBackpressureStrategy.ERROR
In fact, the exception thrown by MISSING is the same, but the message of the exception is different. Internally, an AtomicLong counter is defined when sending data. After each data is sent to the downstream, the counter is reduced by one. When it reaches zero, the onError message is directly sent upstream to the downstream.
BUFFER
This is no different from an RxJava2.0 Observable. The output capacity is not limited, and it does not throw exceptions like RxJava1.0.
DROP
Drop means that after the first 128 data is obtained, the second time when the data is retrieved from the queue, the middle data that cannot keep up with the speed is discarded. When the downstream processes the first 128 data, the next 96 data can be received. This. Limit = prefetch – (prefetch >> 2); Prefetch is 128, you can calculate it yourself:
Here from 5118… 96 the number of… The 96th number from 5118
DROP means that when sending data to the downstream, it has a traffic limiting policy. AtomicLong loads a 128-size counter, and after sending one data, the counter will be reduced by one. Therefore, when sending 128 data, the counter will be reduced to 0, and when the downstream processes 128 data, Then the counter will be adjusted to 96, so there will be data loss in the middle. When the downstream process the previous 128 data, the upstream will send the data again without starting from 129, and the 96 numbers are random, because the time of the downstream process the previous 128 numbers is uncertain.
LATEST
Let’s look at latest and see what it looks like. In order to distinguish between drop and latest, let’s change the amount of data to 2000:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 2000; i++) { emitter.onNext(i); }}}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); subscription = s; } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); }});Copy the code
Here the result would look something like this:
Here’s how it receives data: 0-127…. 96 the number of… The final number, “latest”, is sent between 0 and 127, then 96 digits in between, where some data is lost, and finally the last number is sent downstream.
Instead of using a counter to limit the speed at which data can be sent, Latest defines an AtomicReference atom class for sending data, so it can store only one data at a time. By the time the first 128 are processed, The 128 numbers defined by AtomicLong will be reduced to 0, so when the downstream receives the first 128 numbers, the upstream can only send the next 96 numbers to the downstream, and when the last number is cached, only the last number can be sent to the downstream.
Well, there are only a few strategies for back pressure, and we can summarize them as follows:
- MISSION: There is no finite stream in the upstream. If the queue in the downstream is found to be full, send onError message to the downstream
io.reactivex.exceptions.MissingBackpressureException: Queue is full? !
.- ERROR: The upstream sends data to the downstream in the form of traffic limiting. When the number of data reaches 128, onError message is sent to the downstream
create: could not emit value due to lack of requests
.- DROP: The upstream sends data to the downstream in the form of stream limiting. When the amount of data reaches 128, it waits for the downstream to finish processing the 128 data, and then continues to process and comb the data. Therefore, data loss may occur during the waiting process.
- “LATEST” is the same as “DROP”, but the method is different. “LATEST” sends data downstream by storing only one data. At the beginning, the data is basically the same, but “LATEST” saves the last data because there is still one more data in the container.
- BUFFER: this is the same as RxJava2.0 sending data, it does not support back pressure, upstream sends as much data as downstream receives until OOM occurs.
conclusion
- DoOnSubscribe listens to the thread switch of subscribeOn every time
- DoOnNext listens for every observeOn thread switch, and the thread of the Map’s Apply method is determined by the upstream Observable that sends the data.
- If you do not specify the Observer thread and set the subscribeOn instead of the observeOn, the Observer thread goes along with the upstream Observable.
- Introduces the use of RxJava1.0 and RxJava2.0 back pressure and their differences.
Rxjava on the second article on the end of the introduction, if there is any do not understand the place can be direct message to ask me.
Thanks:
- Back pressure source code analysis
- About RxJava back pressure