Upgrading from RxJava 1 to RxJava 2: a summary of key points
Backpressure
What is Backpressure?
When the Observable and the Subscriber run on different threads and the Observable produces events faster than the Subscriber consumes them, the events are buffered in a queue. When the queue grows beyond its capacity, a MissingBackpressureException is thrown.
For example, the Observable in the following code produces an event every 1 ms, but the Subscriber consumes only one event per second, so running it fails with MissingBackpressureException almost immediately:
Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe({
        Thread.sleep(1000)
        System.out.println("onNext==$it")
    })
How large is the queue? Let's look directly at the RxJava 1 source code:
static {
    int defaultSize = 128;
    // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
    if (PlatformDependent.isAndroid()) {
        defaultSize = 16;
    }
    // possible system property for overriding
    String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer
    if (sizeFromProperty != null) {
        try {
            defaultSize = Integer.parseInt(sizeFromProperty);
        } catch (NumberFormatException e) {
            System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); // NOPMD
        }
    }
    SIZE = defaultSize;
}
On Android the size is 16; on any other platform it is 128. Either default can be overridden with the rx.ring-buffer.size system property.
How do I change the default size?
Let’s start by looking at a few overloaded methods of observeOn
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}
The queue size can be set at the point where the consuming thread is specified. The earlier example uses the first overload, so it gets the default size (16 on Android); the second overload lets us pass the queue size explicitly.
RxJava 2 backpressure strategy: BackpressureStrategy
The default queue size in RxJava 2 is defined in the source below:
public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public static int bufferSize() {
        return BUFFER_SIZE;
    }
    ...
}
Observable’s source code is as follows:
public abstract class Observable<T> implements ObservableSource<T> {
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public static int bufferSize() {
        return Flowable.bufferSize();
    }
    ...
}
As the source shows, both Flowable and Observable in RxJava 2 default to a buffer size of 128 (RxJava 1 defaults to 16 on Android). The difference is that Flowable, which is new in RxJava 2, supports BackpressureStrategy, while Observable does not.
The BackpressureStrategy values
- MISSING: The cache size is fixed. If downstream consumption cannot keep up with upstream production, a MissingBackpressureException is reported.
- ERROR: The cache size is fixed. If downstream consumption cannot keep up with upstream production, a MissingBackpressureException is reported.
- BUFFER: The cache is unbounded. Upstream keeps producing events and downstream keeps consuming them; if downstream cannot keep up, the cache grows until the program runs out of memory and crashes.
- DROP: The cache size is fixed. If downstream consumption cannot keep up with upstream production and the cache is full, any newly produced event is discarded.
- LATEST: The cache size is fixed. If downstream consumption cannot keep up with upstream production, the most recently cached event is replaced by the new one, so the consumer always receives the latest event.
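In practice I see no difference between MISSING and ERROR: both end in a MissingBackpressureException. What differs is where the exception comes from. With ERROR the emitter itself signals it, whereas with MISSING the emitter applies no backpressure handling at all and a downstream operator such as observeOn reports the overflow.
Incidentally, Retrofit 2 uses the BackpressureStrategy.LATEST strategy.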
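To make the overflow behavior described in the list above concrete, here is a minimal plain-JDK simulation. It is only a sketch under stated assumptions: it does not use RxJava, the names OverflowPolicyDemo, Policy and run are hypothetical, and IllegalStateException stands in for MissingBackpressureException. The consumer is assumed to be completely stalled while the producer emits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-JDK simulation of overflow policies; not RxJava's implementation. */
public class OverflowPolicyDemo {

    /** Hypothetical policy names that mirror three of the strategies above. */
    enum Policy { ERROR, DROP, LATEST }

    /**
     * Offers the values 1..produced to a buffer of the given capacity while
     * the consumer is stalled, then returns what the consumer would drain.
     */
    static List<Integer> run(Policy policy, int capacity, int produced) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int value = 1; value <= produced; value++) {
            if (buffer.size() < capacity) {
                buffer.addLast(value);      // room left: just enqueue
                continue;
            }
            switch (policy) {
                case ERROR:
                    // Stands in for MissingBackpressureException.
                    throw new IllegalStateException("buffer full at value " + value);
                case DROP:
                    break;                  // discard the new value
                case LATEST:
                    buffer.removeLast();    // replace the most recently cached value
                    buffer.addLast(value);
                    break;
            }
        }
        return new ArrayList<>(buffer);     // consumer drains in FIFO order
    }

    public static void main(String[] args) {
        System.out.println("DROP   -> " + run(Policy.DROP, 3, 6));   // [1, 2, 3]
        System.out.println("LATEST -> " + run(Policy.LATEST, 3, 6)); // [1, 2, 6]
        try {
            run(Policy.ERROR, 3, 6);
        } catch (IllegalStateException e) {
            System.out.println("ERROR  -> " + e.getMessage());
        }
    }
}
```

With a capacity of 3 and six produced values, DROP keeps the first three values, while LATEST keeps the first two plus the newest one.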
RxJava 2: Single, Completable, Maybe
Single emits either a single item or an error event
Single has only onSuccess and onError events. onSuccess plays the role of the onNext() method of Observable or Flowable: it emits the produced data. A Single can emit only one item. For example:
Single.create(SingleOnSubscribe<Int> {
    try {
        it.onSuccess(1)
        it.onSuccess(2)
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onNext: $it")
}, {
    System.out.println("==onError==")
})
The print result is as follows:
==onNext: 1
You can see that the value “2” is not printed.
Note: if the onSuccess callback throws an exception, the exception is not routed to onError; the program crashes instead.
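The reason the second onSuccess call has no effect can be pictured with a one-shot emitter that remembers whether it has already terminated. The sketch below is plain JDK code, not RxJava's actual implementation; OneShotEmitterDemo, OneShotEmitter and emitTwice are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Minimal one-shot emitter sketch; names are hypothetical, not RxJava API. */
public class OneShotEmitterDemo {

    static final class OneShotEmitter<T> {
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        private final Consumer<T> observer;

        OneShotEmitter(Consumer<T> observer) {
            this.observer = observer;
        }

        /** Delivers the value only if nothing was delivered before. */
        void onSuccess(T value) {
            if (terminated.compareAndSet(false, true)) {
                observer.accept(value);
            }
            // Any later call is silently ignored.
        }
    }

    /** Emits 1 and then 2 and returns what the observer actually received. */
    static List<Integer> emitTwice() {
        List<Integer> received = new ArrayList<>();
        OneShotEmitter<Integer> emitter = new OneShotEmitter<>(received::add);
        emitter.onSuccess(1);
        emitter.onSuccess(2); // ignored: the emitter has already terminated
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitTwice()); // prints [1]
    }
}
```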
Completable
Completable has only onComplete and onError events. It does not send any data to the observer; it only tells the observer that the work has completed or that something went wrong.
Completable.create(CompletableOnSubscribe {
    try {
        // do something success
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onComplete==")
}, {
    System.out.println("==onError==")
})
Note: if the onComplete callback throws an exception, the exception is not routed to onError; the program crashes instead.
Maybe
Maybe can be seen as a combination of Single and Completable. It has onSuccess, onComplete and onError events, but they are mutually exclusive: only one of them is ever delivered. Like Single, Maybe can emit at most one item.
- If there is data, onSuccess is called to emit the data to the observer.
- If there is no data and no error, onComplete is called to tell the observer that the work is done.
- If something goes wrong, onError is called to tell the observer about the error.
Maybe.create(MaybeOnSubscribe<Int> {
    try {
        // if there is data
        it.onSuccess(1)
        // if there is no data
        // it.onComplete()
        // even if you have this method,
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onSuccess: $it")
}, {
    System.out.println("==onError==")
}, {
    System.out.println("==onComplete==")
})
Maybe behaves like Single here: even if several items are emitted, only the first is delivered and the rest are ignored.
Note: if the onSuccess or onComplete callback throws an exception, the exception is not routed to onError; the program crashes instead.
How to handle the fact that RxJava 2 does not support null
In RxJava 2, emitting null triggers the onError event directly.
FlowableEmitter's onNext implementation is shown below:
@Override
public void onNext(T t) {
    ...
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    ...
}
What if you really do want to emit null, and you do not want onError to be called because null is not an error in your case?
Emit the data wrapped in Optional
1. Specify Optional as the element type when creating the Flowable with the create method
Flowable.create(FlowableOnSubscribe<Optional<Int>> {
    try {
        it.onNext(Optional.ofNullable(1))
        it.onNext(Optional.ofNullable(null))
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}, BackpressureStrategy.LATEST)
    .subscribe({
        if (it.isPresent) {
            System.out.println("==Next: ${it.get()}")
        } else {
            System.out.println("==Next: null")
        }
    }, {
        System.out.println("==onError==")
    }, {
        System.out.println("==onComplete==")
    })
2. Wrap the value in Optional when using other RxJava 2 operators
Take the map operator as an example:
Flowable.just(1).map {
    // Looking up a user by id may return null, which should not go to onError
    Optional.ofNullable(findUserById(it))
}.subscribe({
    if (it.isPresent) {
        System.out.println("==Next: ${it.get()}")
    } else {
        System.out.println("==Next: null")
    }
}, {
    System.out.println("==onError==")
}, {
    System.out.println("==onComplete==")
})
Note: do not call get() on an Optional directly. If the wrapped value is null, get() throws an exception, so check isPresent first.
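The note above can be illustrated with java.util.Optional alone. The following sketch assumes the Optional in the examples is java.util.Optional; OptionalAccessDemo and describe are hypothetical names. It shows a way to read the value without an explicit isPresent check, and what happens when get() is called on an empty Optional.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

/** Sketch of safe access to java.util.Optional; class and method names are hypothetical. */
public class OptionalAccessDemo {

    /** Formats the value like the subscribers above, without calling get() blindly. */
    static String describe(Optional<Integer> value) {
        // map() runs only when a value is present; orElse() supplies the fallback.
        return "==Next: " + value.map(v -> String.valueOf(v)).orElse("null");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.ofNullable(1)));    // ==Next: 1
        System.out.println(describe(Optional.ofNullable(null))); // ==Next: null
        try {
            Optional.ofNullable(null).get(); // empty Optional: get() throws
        } catch (NoSuchElementException e) {
            System.out.println("get() on an empty Optional throws " + e.getClass().getSimpleName());
        }
    }
}
```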
RxJava 1's onNext accepts null. To keep that behavior for callers after the upgrade, we can put the unwrapping logic into a BaseSubscribe class:
abstract class BaseSubscribe<T> : DisposableSubscriber<Optional<T>>() {
    override fun onComplete() {
    }
    override fun onNext(t: Optional<T>) {
        if (t.isPresent) {
            onOpNext(t.get())
        } else {
            onOpNext(null)
        }
    }
    abstract fun onOpNext(t: T?)
    override fun onError(t: Throwable) {
    }
}
We then pass a BaseSubscribe instance whenever we call the subscribe method:
Flowable.just(1).map {
    // may return null
    Optional.ofNullable(findUserById(it))
}.subscribe(object : BaseSubscribe<User>() {
    override fun onOpNext(t: User?) {
        System.out.println("==Next: ${t ?: null}")
    }
})
Besides wrapping the data in Optional, you can also define your own wrapper type, such as Result<T>:
class Result<T> {
    var data: T? = null
    ...
}
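As a rough illustration of the wrapper idea, here is a self-contained Java sketch. Only the nullable data field comes from the snippet above; the of, hasData and getDataOrDefault methods are assumptions added for the example, and the rest of the real class is not shown in this article.

```java
/** Sketch of a Result<T> wrapper; everything beyond the data field is hypothetical. */
public class ResultDemo {

    /** Carries a payload that may be null, so null never travels through onNext itself. */
    static final class Result<T> {
        private final T data; // may be null

        private Result(T data) {
            this.data = data;
        }

        static <T> Result<T> of(T data) {
            return new Result<>(data);
        }

        boolean hasData() {
            return data != null;
        }

        T getDataOrDefault(T fallback) {
            return data != null ? data : fallback;
        }
    }

    public static void main(String[] args) {
        Result<String> found = Result.of("alice");
        Result<String> missing = Result.of(null);
        System.out.println(found.getDataOrDefault("<none>"));   // alice
        System.out.println(missing.getDataOrDefault("<none>")); // <none>
    }
}
```

Because the wrapper object itself is never null, it can be emitted through onNext even when its payload is absent.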