Make writing a habit together! This is the fourth day of my participation in the “Gold Digging Day New Plan · April More text Challenge”. Click here for more details.

When errors occur in RxJava, the onError method of Subscriber is called to distribute errors, and then the errors are handed over to the Subscriber for its own processing. In this way, we need to process each Subscriber once, which increases the workload, so we need to use the error processing operator.

Error handling operator

1.catch

The Catch operator intercepts the onError notification of the original Observable and replaces it with another data item or sequence. Allows the resulting Observable to terminate normally or not at all. RxJava implements catch as the following three unused operators. OnErrorReturn: Emits a special Observable when it encounters an error and terminates normally.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("ex"));
        emitter.onNext(3);
        emitter.onComplete();
    }
    }).onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            return -1;
        }
    }).subscribe(new Observer<Integer>() {
        public void onSubscribe(Disposable d) {
        }
        public void onNext(Integer integer) {
            Log.i(TAG, "onNext: "+integer);
        }
        public void onError(Throwable e) {
            Log.i(TAG, "onError: "+e.getMessage());
        }
        public void onComplete() {
            Log.i(TAG, "onComplete: ");
        }
    });
Copy the code

The output is as follows

 I/TAG: onNext: 1
 I/TAG: onNext: 2
 I/TAG: onNext: -1
 I/TAG: onComplete: 
Copy the code

onErrorResumeNext: whenObservableFire another when an error is encounteredObservableData sequence of.

Final Observable o = observable. just(10,20,30); Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Exception("ex")); emitter.onNext(3); emitter.onComplete(); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { return o; } }).subscribe(new Observer<Integer>() { public void onSubscribe(Disposable d) { } public void onNext(Integer integer) {  Log.i(TAG, "onNext: "+integer); } public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getMessage()); } public void onComplete() { Log.i(TAG, "onComplete: "); }});Copy the code

The output is as follows

 I/TAG: onNext: 1
 I/TAG: onNext: 2
 I/TAG: onNext: 10
 I/TAG: onNext: 20
 I/TAG: onNext: 30
 I/TAG: onComplete: 
Copy the code

OnExceptionResumeNext: Similar to onErrorResumeNext, but when onError receives a Throwable instead of an Exception, it will pass the error to the observer’s onError method instead of using the alternate Observable.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("ex"));
        emitter.onNext(3);
        emitter.onComplete();
    }
    }).onExceptionResumeNext(new ObservableSource<Integer>() {
        @Override
        public void subscribe(Observer<? super Integer> observer) {
            observer.onNext(10000);
            observer.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        public void onSubscribe(Disposable d) {
        }
        public void onNext(Integer integer) {
            Log.i(TAG, "onNext: "+integer);
        }
        public void onError(Throwable e) {
            Log.i(TAG, "onError: "+e.getMessage());
        }
        public void onComplete() {
            Log.i(TAG, "onComplete: ");
        }
    });
Copy the code

The output is as follows

I/TAG: onNext: 1 I/TAG: onNext: 2 I/TAG: onNext: 10000// Error replacement message I/TAG: onComplete:Copy the code

If the Throwable received is not an Exception, onExceptionResumeNext cannot intercept Throwable

 I/TAG: onNext: 1
 I/TAG: onNext: 2
 I/TAG: onError: ex
Copy the code

2.retry

Instead of sending the onError of the original Observable to the observer, the Retry operator re-subscribes to it expecting it to terminate normally. Duplication of data items may occur due to re-subscription. Retry () continuously subscribes to and emits the original Observable until it terminates normally. Retry (count) Specifies the number of retries. If the number of times exceeded, the subscription will not be renewed. Instead, it sends a new onError to its observer.

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Throwable("ex")); emitter.onNext(3); emitter.onComplete(); } }).retry(1).subscribe(new Observer<Integer>() { public void onSubscribe(Disposable d) { } public void onNext(Integer integer) { Log.i(TAG, "onNext: "+integer); } public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getMessage()); } public void onComplete() { Log.i(TAG, "onComplete: "); }});Copy the code

The output is as follows

 I/TAG: onNext: 1
 I/TAG: onNext: 2
 I/TAG: onNext: 1
 I/TAG: onNext: 2
 I/TAG: onError: ex
Copy the code

Auxiliary operator

1.delay

The delay operator allows the original Observable to pause for a while before transmitting each data item, i.e., all data items emitted by an Observable move forward by an increment in time. Each time the original Observable sends an item, Delay starts a timer, and when the timer expires, the returned Observable sends the same item.

Note: Delay does not shift onError notifications. He immediately passes this notification to the subscriber, discarding the onNext notification.

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Throwable("Throwable")); emitter.onNext(3); } }).delay(10, Subscribe(new Observer<Integer>() {public void onSubscribe(Disposable d) {Log. I (TAG, "satrtTime: "+System.currentTimeMillis()); } public void onNext(Integer integer) { Log.i(TAG, "endTime-onNext: "+System.currentTimeMillis()+" "+integer); } public void onError(Throwable e) { Log.i(TAG, "endTime-onError: "+System.currentTimeMillis()+" "+e); } public void onComplete() { Log.i(TAG, "endTime-onComplete: "+System.currentTimeMillis()); }});Copy the code

The following output is displayed:

I/TAGTAG: satrtTime: 1524668052808 I/TAGTAG: endTime-onError: 1524668052812 java.lang.Throwable: Throwable //(endtime-satrtTime) small and 10sCopy the code

2.do

The DO operator registers callbacks for the original Observable life cycle events that are executed when an event is triggered. The do operator contains the following operators:

  • doOnEach ObservableEvery time I send a data item, I call him back, no matterobNext.onErrororonCompleted.
  • DoOnNext onlyonNextIt only calls him back when it’s executed.
  • DoOnSubscribe when an observer subscribesObservableIt’s going to be called.
  • DoOnComplete onlyonCompleteIt only calls him back when it’s executed.
  • DoOnError onlyonErrorIt only calls him back when it’s executed.
  • doOnTerminate ObservableCalled before termination (normal or abnormal).
  • doFinally ObservableCalled after termination (normal or abnormal).

doOnEach

doOnSubscribe

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Throwable("throw")); // emitter.onComplete(); } }).doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.i(TAG, "doOnEach: "+integerNotification); } }).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "doOnNext: "+integer); } }).doOnComplete(new Action() { @Override public void run() throws Exception { Log.i(TAG, "doOnComplete: "); } }).doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.i(TAG, "doOnError: "+throwable.getMessage()); } }).doOnTerminate(new Action() { @Override public void run() throws Exception { Log.i(TAG, "doOnTerminate: "); } }).doFinally(new Action() { @Override public void run() throws Exception { Log.i(TAG, "doFinally: "); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "subscribe: "+integer); }});Copy the code

The following output is displayed:

 I/TAGTAG: doOnEach: OnNextNotification[1]
 I/TAGTAG: doOnNext: 1
 I/TAGTAG: subscribe: 1
 I/TAGTAG: doOnEach: OnNextNotification[2]
 I/TAGTAG: doOnNext: 2
 I/TAGTAG: subscribe: 2
 I/TAGTAG: doOnEach: OnErrorNotification[java.lang.Throwable: throw]
 I/TAGTAG: doOnError: throw
 I/TAGTAG: doOnTerminate: 
Copy the code

It can be seen that doOnEach will trigger either onNext or onError, but doFinally is not executed here, which is wrong with the description and needs to be studied.

3.subscribeOn/observeOn

  • SubscribeOn is used to specify the thread from which the event is generated.
  • ObserveOn is used to specify the thread to consume the event.
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.i(TAG, "subscribeOn: " +Thread.currentThread().getName()); emitter.onNext(1); }}) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "observeOn: " +Thread.currentThread().getName()); }});Copy the code

The output is:

 I/TAGTAG: subscribeOn: RxCachedThreadScheduler-1
 I/TAGTAG: observeOn: main
Copy the code

4.timeout

If the original Observable passes the specified event without emitting any data. The timeout operator throws a TimeoutException and terminates the Observable with an onError notification.

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i=0; i<3; i++){ if(i == 1){ SystemClock.sleep(3000); }else { emitter.onNext(i); } } } }).timeout(2,TimeUnit.SECONDS) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } public void onNext(Integer integer) { Log.i(TAG, "onNext: "+integer); } public void onError(Throwable e) { Log.i(TAG, "onError: "); } public void onComplete() { } });Copy the code

The output is:

 I/TAGTAG: onNext: 0
 I/TAGTAG: onError: 
Copy the code