Upgrading from RxJava 1 to RxJava 2: a summary of key points

Backpressure

What is Backpressure?

When the Observable and the Subscriber run on different threads and the Observable produces events faster than the Subscriber consumes them, the pending events are held in a queue. Once the queue exceeds its capacity, a MissingBackpressureException is thrown.

For example, the code below produces an event every 1 ms, but the Subscriber consumes only one event per second, so it fails with MissingBackpressureException almost immediately:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe({
        Thread.sleep(1000)
        System.out.println("onNext==$it")
    })

How large is the queue? Let's look directly at the RxJava 1 source code:

static {
    int defaultSize = 128;

    // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
    if (PlatformDependent.isAndroid()) {
        defaultSize = 16;
    }

    // possible system property for overriding
    String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer
    if (sizeFromProperty != null) {
        try {
            defaultSize = Integer.parseInt(sizeFromProperty);
        } catch (NumberFormatException e) {
            System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); // NOPMD
        }
    }

    SIZE = defaultSize;
}

On Android the size is 16; on any other platform it is 128. Either default can be overridden with the rx.ring-buffer.size system property.

How do I change the default size?

Let’s start by looking at a few overloaded methods of observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}
    
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}

observeOn lets you set the queue size at the same time as the consuming thread. The example above uses the first overload, so it gets the default size (16 on Android, 128 elsewhere); the second overload takes an explicit bufferSize.

RxJava 2 backpressure strategy: BackpressureStrategy

The default queue size in RxJava 2 is defined in the source code below:

public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public static int bufferSize() {
        return BUFFER_SIZE;
    }
    ...
}

Observable’s source code is as follows:

public abstract class Observable<T> implements ObservableSource<T> {

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public static int bufferSize() {
        return Flowable.bufferSize();
    }
    ...
}

It can be seen that Flowable and Observable in RxJava 2 both default to a buffer size of 128 (RxJava 1 uses 16 on Android and 128 elsewhere). However, Flowable, which is new in RxJava 2, supports BackpressureStrategy, whereas Observable does not.
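The default-size lookup in the Flowable source above can be tried out in isolation. The following is a minimal plain-Java sketch with no RxJava dependency; BufferSizeDemo and resolveBufferSize are hypothetical names used only for illustration, and only the expression inside resolveBufferSize is taken from the source shown.

```java
// Plain-Java sketch of the lookup in Flowable's static initializer.
// BufferSizeDemo and resolveBufferSize are hypothetical names, not RxJava API.
class BufferSizeDemo {

    // Mirrors: Math.max(1, Integer.getInteger("rx2.buffer-size", 128))
    static int resolveBufferSize() {
        return Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        System.clearProperty("rx2.buffer-size");
        System.out.println(resolveBufferSize()); // property unset: falls back to 128

        System.setProperty("rx2.buffer-size", "256");
        System.out.println(resolveBufferSize()); // property set: 256

        System.setProperty("rx2.buffer-size", "0");
        System.out.println(resolveBufferSize()); // clamped to a minimum of 1
    }
}
```

In RxJava 2 itself the value is read once, in a static initializer, so the property has to be set before the Flowable class is loaded, for example with -Drx2.buffer-size=256 on the JVM command line.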

Backpressure strategies (BackpressureStrategy)

  • MISSING: The cache size is fixed. If the downstream cannot keep up with the events produced upstream, a MissingBackpressureException is reported.
  • ERROR: The cache size is fixed. If the downstream cannot keep up with the events produced upstream, a MissingBackpressureException is reported.
  • BUFFER: The cache is unbounded. Upstream events keep accumulating while the downstream consumes them, so a persistently slow consumer can eventually exhaust memory and crash the program.
  • DROP: The cache size is fixed. If the downstream cannot keep up and the cache is full, newly produced events are discarded.
  • LATEST: The cache size is fixed. If the downstream cannot keep up, only the most recent event is kept, replacing the previously kept one, and that latest event is the one consumed next.

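MISSING and ERROR appear to behave the same here, since both end in MissingBackpressureException. According to the RxJava documentation, the difference is where the exception comes from: with ERROR the emitter itself signals it, while MISSING applies no strategy at all and leaves overflow handling to downstream operators.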
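The overflow behavior of the strategies listed above can be illustrated with a simplified plain-Java model. This is only a sketch under stated assumptions, not RxJava's implementation: the consumer is assumed to be completely stalled while the producer runs, IllegalStateException stands in for MissingBackpressureException, and OverflowDemo, Policy and run are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of overflow handling; not RxJava's implementation.
// OverflowDemo, Policy and run are hypothetical names used for illustration.
class OverflowDemo {

    enum Policy { ERROR, BUFFER, DROP, LATEST }

    // Pushes items 1..produced into a buffer of the given capacity while the
    // consumer is stalled, then returns what the consumer would finally see.
    static List<Integer> run(Policy policy, int capacity, int produced) {
        Deque<Integer> buffer = new ArrayDeque<>();
        Integer latest = null; // single overflow slot, used by LATEST only

        for (int item = 1; item <= produced; item++) {
            if (policy == Policy.BUFFER || buffer.size() < capacity) {
                buffer.addLast(item); // room available (BUFFER never fills up)
                continue;
            }
            switch (policy) {
                case ERROR:
                    // Stand-in for MissingBackpressureException.
                    throw new IllegalStateException("buffer full at item " + item);
                case DROP:
                    break; // discard the new item
                case LATEST:
                    latest = item; // overwrite the previously kept overflow item
                    break;
                default:
                    break;
            }
        }

        List<Integer> seen = new ArrayList<>(buffer);
        if (latest != null) {
            seen.add(latest);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(Policy.DROP, 3, 6));   // [1, 2, 3]
        System.out.println(run(Policy.LATEST, 3, 6)); // [1, 2, 3, 6]
        System.out.println(run(Policy.BUFFER, 3, 6)); // [1, 2, 3, 4, 5, 6]
    }
}
```

With a capacity of 3 and 6 produced items, DROP loses items 4 to 6, LATEST keeps item 6 in addition to the buffered ones, BUFFER keeps everything, and ERROR fails at item 4.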

Incidentally, retrofit2 uses the BackpressureStrategy.LATEST strategy.

RxJava2 Single, Completable, Maybe

Single

Single emits only a single item or an error event.

Single has only onSuccess and onError events. onSuccess is similar to the onNext() method of Observable or Flowable and is used to emit the produced item, but Single can emit only one item. For example:

Single.create(SingleOnSubscribe<Int> {
    try {
        it.onSuccess(1)
        it.onSuccess(2)
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onNext: $it")
}, {
    System.out.println("==onError==")
})

The print result is as follows:

==onNext: 1

You can see that the second item, 2, is not printed.

Note: if an exception is thrown inside the onSuccess callback, it is not routed to onError; the program crashes instead.

Completable

Completable has only onComplete and onError events. It does not send any data to the observer; it only tells the observer that the work has finished or that something went wrong.

Completable.create(CompletableOnSubscribe {
    try {
        //do something success
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onComplete==")
}, {
    System.out.println("==onError==")
})

Note: if an exception is thrown inside the onComplete callback, it is not routed to onError; the program crashes instead.

Maybe

Maybe can be seen as a combination of Single and Completable. It has onSuccess, onComplete and onError events, but they are mutually exclusive, and Maybe can emit at most one item.

If there is data, onSuccess is called to emit the data to the observer

If there is no data and no errors, onComplete is called to tell the observer that it is done

If something goes wrong, onError is called to tell the observer that something went wrong

Maybe.create(MaybeOnSubscribe<Int> {
    try {
        // if there is data
        it.onSuccess(1)
        // if there is no data
        // it.onComplete() // even if you have this method,
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onSuccess: $it")
}, {
    System.out.println("==onError==")
}, {
    System.out.println("==onComplete==")
})

Maybe is like Single: even if several items are emitted, only the first one is processed and the rest are ignored.

Note: if an exception is thrown inside the onSuccess or onComplete callback, it is not routed to onError; the program crashes instead.

How to handle RxJava 2 not supporting null

When emitting data with RxJava 2, an onError event is raised directly if the emitted value is null.

FlowableEmitter’s onNext implementation is shown below

@Override
public void onNext(T t) {
    ...
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    ...
}

What if you do want to emit null, but do not want it routed to onError because it is not an exception?

Send the data wrapped in Optional

1. Specify Optional when using the create method to create the Flowable

Flowable.create(FlowableOnSubscribe<Optional<Int>> {
    try {
        it.onNext(Optional.ofNullable(1))
        it.onNext(Optional.ofNullable(null))
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}, BackpressureStrategy.LATEST)
        .subscribe({
            if (it.isPresent) {
                System.out.println("==Next: ${it.get()}")
            } else {
                System.out.println("==Next: null")
            }
        }, {
            System.out.println("==onError==")
        }, {
            System.out.println("==onComplete==")
        })

2. Specify Optional when using other RxJava 2 operators

Take the map operator as an example:

Flowable.just(1).map {
    // Looking up a user by id may return null, but that should not trigger onError
    Optional.ofNullable(findUserById(it))
}.subscribe({
    if (it.isPresent) {
        System.out.println("==Next: ${it.get()}")
    } else {
        System.out.println("==Next: null")
    }
}, {
    System.out.println("==onError==")
}, {
    System.out.println("==onComplete==")
})

Note: do not call get() on an Optional without checking it first. If the wrapped value is null, get() throws an exception, so call isPresent first.
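The isPresent-before-get rule can be checked outside of RxJava. The following is a minimal plain-Java sketch using java.util.Optional; OptionalDemo, describe and this stand-in version of findUserById are hypothetical and exist only for illustration.

```java
import java.util.Optional;

// Plain-Java sketch of safe Optional access.
// OptionalDemo, describe and findUserById are hypothetical helpers.
class OptionalDemo {

    // Hypothetical lookup: returns null when no user matches the id.
    static String findUserById(int id) {
        return id == 1 ? "alice" : null;
    }

    // Wraps the nullable result, then unwraps it only after checking isPresent().
    static String describe(int id) {
        Optional<String> user = Optional.ofNullable(findUserById(id));
        if (user.isPresent()) {
            return "==Next: " + user.get(); // safe: a value is present
        }
        return "==Next: null";
    }

    public static void main(String[] args) {
        System.out.println(describe(1)); // ==Next: alice
        System.out.println(describe(2)); // ==Next: null

        // orElse is an alternative to the isPresent()/get() pair.
        System.out.println(Optional.ofNullable(findUserById(2)).orElse("unknown")); // unknown
    }
}
```

Calling get() on an empty java.util.Optional throws NoSuchElementException, which is why the check comes first.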

The onNext of RxJava 1 allows null to be passed. To keep that behavior, we can wrap the unwrapping logic in a BaseSubscribe class:

abstract class BaseSubscribe<T> : DisposableSubscriber<Optional<T>>() {

    override fun onComplete() {

    }

    override fun onNext(t: Optional<T>) {
        if (t.isPresent) {
            onOpNext(t.get())
        } else {
            onOpNext(null)
        }
    }

    abstract fun onOpNext(t: T?)

    override fun onError(t: Throwable) {

    }
}


We then use the BaseSubscribe class whenever we call the subscribe method:

Flowable.just(1).map {
    // May return null
    Optional.ofNullable(findUserById(it))
}.subscribe(object : BaseSubscribe<User>() {
    override fun onOpNext(t: User?) {
        System.out.println("==Next: ${t ?: null}")
    }
})

Besides wrapping with Optional, you can also define your own wrapper type, such as Result<T>:

class Result<T> {
    var data: T? = null
    ...
}
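For comparison, a wrapper of this kind could look as follows in plain Java. This is a hypothetical sketch: the text does not specify the wrapper's members beyond data, so only data is modeled, and ResultDemo, getData and hasData are names invented for illustration.

```java
// Hypothetical Java counterpart of a Result<T> wrapper; only the nullable
// data field from the text is modeled here.
class ResultDemo {

    static final class Result<T> {
        private final T data; // may be null

        Result(T data) {
            this.data = data;
        }

        T getData() {
            return data;
        }

        boolean hasData() {
            return data != null;
        }
    }

    public static void main(String[] args) {
        Result<String> found = new Result<>("alice");
        Result<String> missing = new Result<>(null);

        System.out.println(found.hasData());   // true
        System.out.println(missing.hasData()); // false
    }
}
```

Because the wrapper object itself is never null, it can be emitted through onNext even when the wrapped data is null.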
