RxJava Daily Use Summary (6): Utility Operators

This article covers RxJava's utility operators, such as delay, the doXxx family of event hooks, thread switching, and so on.

Delay operator

Shifts the emissions of an Observable forward in time by a specified delay. In other words, the source's events reach the observer later than they otherwise would.

Observable.fromIterable(mItems)
    .delay(1, TimeUnit.SECONDS)
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

Do operators

Registers an action to run on one of the Observable's lifecycle events.

  • The events emitted by an Observable can be handled in the observer passed to subscribe. But sometimes the Observable is supplied by other code, and you do not know where it will be subscribed. Writing the same handling into every subscribe call is impractical. This is where the do operators come in. They act as hooks on the event stream: the callback registered with a do operator is invoked when the corresponding event occurs.

DoOnEach operator

The doOnEach operator lets you register a callback that is called for every notification the Observable emits: each item, and also the terminal error or completion. The callback receives a Notification object describing the event.

Observable.fromIterable(mItems)
    .doOnEach(new Consumer<Notification<Object>>() {
        @Override
        public void accept(Notification<Object> notification) throws Exception {
            //...
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoOnNext operator

The doOnNext operator is similar to doOnEach, but its Consumer receives the emitted item itself rather than a Notification, and it is called only for onNext events.

Observable.fromIterable(mItems)
    .doOnNext(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            L.d(o.toString());
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoOnSubscribe operator

The doOnSubscribe operator registers an action that is called when an observer subscribes to the resulting Observable.

  • It is called at subscription time. For example, for an Observable that wraps an API request, you can show a loading dialog when it is subscribed.

Observable.fromIterable(mItems)
    .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            // About to start
            showLoading();
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoOnComplete operator

Registers a callback that is called when the Observable completes normally.

Observable.fromIterable(mItems)
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            // Completed...
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoOnError operator

Registers a callback that is called when the Observable terminates with an error.

Observable.fromIterable(mItems)
    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            // An error occurred
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoOnTerminate operator

The doOnTerminate operator registers an action that is called just before the resulting Observable terminates, either normally or with an error.

Observable.fromIterable(mItems)
    .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            // ... About to terminate
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

DoAfterTerminate operator

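The doAfterTerminate operator registers an action that is called after the resulting Observable has terminated, either normally or with an error. Unlike doOnTerminate, it runs once the terminal event has already been delivered downstream.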

  • For example, hide the loading dialog once an API request has finished, whether it succeeded or failed.

Observable.fromIterable(mItems)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            // Finished
            hideLoading();
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

ObserveOn and subscribeOn operators

observeOn: specifies the scheduler on which the observer receives the Observable's events. Put simply, it is the thread the callbacks run on once the work is done. On Android this is usually the main thread.

subscribeOn: specifies the scheduler on which the Observable itself executes. Put simply, it is the thread the time-consuming work runs on, usually an IO thread or a computation thread.

  • The main-thread Scheduler, which is built on Android's Handler, is provided by RxAndroid. For example:

Observable.fromIterable(mItems)
    // The Observable executes on a worker thread, so the iteration happens off the main thread
    .subscribeOn(Schedulers.io())
    // The callbacks run on the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
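
To make the split between "where the work runs" and "where the callback runs" concrete without depending on RxJava or Android, here is a minimal plain-JDK sketch. It is only an analogy: one executor stands in for Schedulers.io() and a single-threaded executor stands in for the main thread. The class name ThreadSwitchSketch and the thread names are made up for illustration, and this is not how RxJava implements its schedulers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {
    /** Runs the "work" on one thread and the "callback" on another; returns both thread names. */
    static String run() {
        // Stand-in for Schedulers.io(): where the time-consuming work happens.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Stand-in for AndroidSchedulers.mainThread(): where the result is consumed.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                    // Like subscribeOn: the source work runs on the io executor.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Like observeOn: the downstream callback runs on the main executor.
                    .thenApplyAsync(worker -> worker + " -> " + Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "io-thread -> main-thread"
    }
}
```

The first name is the thread that produced the value and the second is the thread that consumed it, which is the same division of labor that subscribeOn and observeOn set up in an RxJava chain.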

Timeout operator

Mirrors the source Observable, but sends an error notification if a specified period of time passes without any item being emitted.

When the time limit elapses, the Observable signals onError(). The exception object is a TimeoutException.

  • For example, an Observable that wraps a WebSocket signals onError() when no event arrives within the specified period, and retry() then resubscribes to reconnect the WebSocket.

Observable.create(new WebSocketOnSubscribe(url))
    // If the source emits no message within the given interval, a timeout exception is signaled
    .timeout(timeout, timeUnit)
    .retry()
    // Callbacks run on the main thread, where the UI is updated
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
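
The timeout-then-retry idea can be sketched with the plain JDK as well. The following is a rough analogy, not RxJava code: a BlockingQueue plays the role of the message source, nextOrTimeout throws a TimeoutException when the source stays silent, and describe tries again a bounded number of times (the original chain uses an unbounded retry()). All names here are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    /** Returns the next message, or throws TimeoutException if none arrives in time. */
    static <T> T nextOrTimeout(BlockingQueue<T> source, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        T item = source.poll(timeout, unit); // waits at most `timeout` for a message
        if (item == null) {
            throw new TimeoutException("no message within " + timeout + " " + unit);
        }
        return item;
    }

    /** Tries up to maxAttempts times; a bounded stand-in for timeout(...) followed by retry(). */
    static String describe(BlockingQueue<String> source, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return "got " + nextOrTimeout(source, 20, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // A real client would reconnect the WebSocket here before trying again.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        }
        return "gave up after " + maxAttempts + " attempts";
    }

    public static void main(String[] args) {
        BlockingQueue<String> silent = new LinkedBlockingQueue<>();
        System.out.println(describe(silent, 3)); // prints "gave up after 3 attempts"

        BlockingQueue<String> live = new LinkedBlockingQueue<>();
        live.add("pong");
        System.out.println(describe(live, 3));   // prints "got pong"
    }
}
```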

Timestamp operator

Attaches a timestamp to each item emitted by an Observable.

  • For example, in our WebSocket work the back end requires us to send a heartbeat message every so many seconds, and the message must carry a timestamp. The timestamp operator wraps each item in a Timed object. (Internally it uses the map operator to wrap each item in a Timed object whose time field holds the current timestamp.)
/** Send heartbeat messages. */
public Observable<Boolean> sendHeartBeatMsg() {
    return getRxWebSocket().heartBeat(getUrl(),
            AskTeacherConstant.CONSULTING_ROOM_PING_MSG_INTERVAL_TIME,
            TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
                @Override
                public String onGenerateHeartBeatMsg(long timestamp) {
                    return GsonUtil.toJson(new HeartBeatMsgRequestModel(
                            WssCommandTypeEnum.HEART_BEAT.getCode(),
                            String.valueOf(timestamp / 1000)));
                }
            });
}

@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
        HeartBeatGenerateCallback heartBeatGenerateCallback) {
    return Observable.interval(period, unit)
            // timestamp operator: attach a timestamp to each emitted item
            .timestamp()
            .retry()
            .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
                @Override
                public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
                    long timestamp = timed.time();
                    String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
                    Logger.d(TAG, "Send heartbeat message:" + heartBeatMsg);
                    return send(url, heartBeatMsg);
                }
            });
}
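
The "map each item into a value-plus-time wrapper" idea described above can be sketched in a few lines of plain Java. This is an illustration only: Stamped is a hypothetical stand-in for RxJava's Timed, and the clock is passed in as a parameter (an assumption made here so the behavior is easy to check), whereas the real operator reads the time from a scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

class TimestampSketch {
    /** Hypothetical stand-in for RxJava's Timed<T>: a value plus the time it was seen. */
    static final class Stamped<T> {
        final T value;
        final long timeMillis;

        Stamped(T value, long timeMillis) {
            this.value = value;
            this.timeMillis = timeMillis;
        }
    }

    /** Wraps every item with the clock reading taken when the item is processed. */
    static <T> List<Stamped<T>> timestamp(List<T> items, LongSupplier clock) {
        List<Stamped<T>> out = new ArrayList<>();
        for (T item : items) {
            out.add(new Stamped<>(item, clock.getAsLong()));
        }
        return out;
    }

    /** Converts the millisecond timestamp to a seconds string, as the heartbeat example does. */
    static String toSeconds(Stamped<?> stamped) {
        return String.valueOf(stamped.timeMillis / 1000);
    }

    public static void main(String[] args) {
        List<Stamped<Long>> stamped = timestamp(Arrays.asList(0L, 1L), System::currentTimeMillis);
        for (Stamped<Long> s : stamped) {
            System.out.println(s.value + " @ " + toSeconds(s) + "s");
        }
    }
}
```

Passing the clock as a LongSupplier keeps the sketch deterministic under test. In the heartbeat code, timed.time() plays the role of timeMillis.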

Serialize operator

Forces an Observable to make serialized calls and to be well-behaved.

An Observable may invoke its observer's methods from several threads at once, which can scramble the event stream: onNext(), onError(), and onComplete() calls may overlap or arrive out of order. The serialize operator makes these calls happen one at a time, so the stream follows the Observable contract.

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("wally");
        emitter.onNext("wally");
        emitter.onComplete();
        // Emitted after onComplete(), so it is never delivered
        emitter.onNext("wally");
    }
})
    .serialize()
    .subscribe();
// Events delivered downstream:
// wally
// wally
// onComplete
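
As a rough mental model of what a serializing wrapper guarantees, here is a minimal plain-Java sketch. SerializedSink is a hypothetical class written for this illustration: it lets only one call through at a time and ignores anything that arrives after the terminal event. RxJava's own implementation is more elaborate (it avoids blocking the calling threads), so treat this as a simplification rather than a description of the library's internals.

```java
import java.util.ArrayList;
import java.util.List;

class SerializeSketch {
    /** Hypothetical observer wrapper: one call at a time, nothing after the terminal event. */
    static final class SerializedSink {
        private final List<String> received = new ArrayList<>();
        private boolean done;

        synchronized void onNext(String item) {
            if (done) {
                return; // late items are dropped
            }
            received.add(item);
        }

        synchronized void onComplete() {
            if (done) {
                return;
            }
            done = true;
            received.add("onComplete");
        }

        synchronized List<String> snapshot() {
            return new ArrayList<>(received);
        }
    }

    /** Replays the emission order from the example above. */
    static List<String> replayExample() {
        SerializedSink sink = new SerializedSink();
        sink.onNext("wally");
        sink.onNext("wally");
        sink.onComplete();
        sink.onNext("wally"); // ignored: the stream has already completed
        return sink.snapshot();
    }

    /** Calls one sink from several threads; synchronization keeps the list consistent. */
    static int concurrentCount(int threads, int perThread) {
        SerializedSink sink = new SerializedSink();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.onNext("item");
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return sink.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println(replayExample());          // prints "[wally, wally, onComplete]"
        System.out.println(concurrentCount(4, 1000)); // prints "4000"
    }
}
```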