RxJava Daily Use Summary (6): Auxiliary Operators
This article covers RxJava's auxiliary (utility) operators, such as delay, the doXxx family of event hooks, and thread switching.
Delay operator
Shifts the emissions of an Observable forward in time by a specified amount. In other words, each event the source Observable emits is delivered only after the given delay.
Observable
.fromIterable(mItems)
.delay(1, TimeUnit.SECONDS)
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
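To observe the delay without waiting in real time, here is a minimal sketch. It assumes RxJava 2.x (the `io.reactivex` package) is on the classpath and drives the operator with a `TestScheduler`, which acts as a virtual clock. The class name `DelayDemo` is made up for illustration, and the Android-specific lifecycle binding is left out.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class DelayDemo {
    /** Returns the number of items received just before and right at the 1-second mark. */
    static int[] run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, starts at 0
        TestObserver<String> observer = Observable.just("a", "b", "c")
                .delay(1, TimeUnit.SECONDS, scheduler)
                .test();

        scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS);
        int before = observer.valueCount(); // delay has not elapsed yet

        scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        int after = observer.valueCount();  // all delayed items have arrived

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```

Passing the scheduler explicitly is only a testing convenience; the two-argument `delay(1, TimeUnit.SECONDS)` used above runs on the computation scheduler instead.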
Do operators
Registers an action to run on a given lifecycle event of the source Observable.
- Events emitted by an Observable can be handled in the observer passed to subscribe. But sometimes the Observable is exposed to outside callers, and you do not know where it will be subscribed; writing the same handling into every subscribe call is impractical. This is where the do operators come in. They act as hooks for events: the registered callback is invoked when the corresponding event occurs.
doOnEach operator
The doOnEach operator registers a callback that is invoked for every notification the Observable emits: each data item as well as the error or completion event. The callback's parameter is a Notification.
Observable.fromIterable(mItems)
.doOnEach(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> notification) throws Exception {
//...
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
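The callback body above is left empty, so here is a minimal sketch of how the Notification can be inspected. It assumes RxJava 2.x on the classpath and Java 8 lambdas; the class name `DoOnEachDemo` and the log strings are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original project.
class DoOnEachDemo {
    /** Records which kind of notification doOnEach receives for a two-item source. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just(1, 2)
                .doOnEach(notification -> {
                    if (notification.isOnNext()) {
                        log.add("next:" + notification.getValue());
                    } else if (notification.isOnError()) {
                        log.add("error:" + notification.getError());
                    } else if (notification.isOnComplete()) {
                        log.add("complete");
                    }
                })
                .subscribe();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:2, complete]
    }
}
```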
doOnNext operator
The doOnNext operator is similar to doOnEach, but its callback receives the emitted data item instead of a Notification, and it is invoked only for onNext events.
Observable.fromIterable(mItems)
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
L.d(o.toString());
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doOnSubscribe operator
The doOnSubscribe operator registers an action that is called when an observer subscribes to the resulting Observable.
- Because it is called at subscription time, it suits cases such as an Observable that wraps an API request: show a loading dialog when the subscription starts.
Observable.fromIterable(mItems)
    .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            // About to start
            showLoading();
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
doOnComplete operator
Registers a callback that is invoked when the Observable completes normally.
Observable.fromIterable(mItems)
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            // Completed...
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
doOnError operator
Registers a callback that is invoked when the Observable terminates with an error.
Observable.fromIterable(mItems)
    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            // An error occurred
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
doOnTerminate operator
The doOnTerminate operator registers an action that is called just before the resulting Observable terminates, whether normally or with an error.
Observable.fromIterable(mItems)
    .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            // ... About to end
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
doAfterTerminate operator
The doAfterTerminate operator registers an action that is called after the resulting Observable has terminated, whether normally or with an error.
- For example, hide the loading dialog once an API request has succeeded or failed.
Observable.fromIterable(mItems)
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            // Finished
            hideLoading();
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
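The descriptions of doOnTerminate and doAfterTerminate read almost identically, so the order in which the hooks fire is easier to see in a run. The following is a minimal sketch, assuming RxJava 2.x on the classpath and Java 8 lambdas; the class name `HookOrderDemo` and the log strings are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original project.
class HookOrderDemo {
    /** Records the order in which the do-hooks and the observer callbacks run. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("item")
                .doOnSubscribe(d -> log.add("doOnSubscribe"))
                .doOnNext(v -> log.add("doOnNext"))
                .doOnComplete(() -> log.add("doOnComplete"))
                .doOnTerminate(() -> log.add("doOnTerminate"))
                .doAfterTerminate(() -> log.add("doAfterTerminate"))
                .subscribe(
                        v -> log.add("observer.onNext"),
                        e -> log.add("observer.onError"),
                        () -> log.add("observer.onComplete"));
        return log;
    }

    public static void main(String[] args) {
        // Prints one entry per line, in this order:
        // doOnSubscribe, doOnNext, observer.onNext, doOnComplete,
        // doOnTerminate, observer.onComplete, doAfterTerminate
        run().forEach(System.out::println);
    }
}
```

The point to take away is that doOnTerminate runs before the observer receives onComplete, while doAfterTerminate runs after it, which is why hiding a loading dialog fits the latter.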
observeOn and subscribeOn operators
observeOn: specifies the scheduler on which an observer observes the Observable. In practice this is the thread that receives the callbacks when the work finishes; on Android it is usually the main thread.
subscribeOn: specifies the scheduler on which the Observable itself runs. In practice this is the thread that performs the time-consuming work, usually an IO or computation thread.
- On Android, the main-thread scheduler is the Handler-based Scheduler provided by RxAndroid. For example:
Observable.fromIterable(mItems)
    // The Observable runs on a worker thread, so the iteration happens there
    .subscribeOn(Schedulers.io())
    // Callbacks are delivered on the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();
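To check which thread each side actually runs on, here is a minimal sketch that works on a plain JVM. It assumes RxJava 2.x on the classpath and substitutes `Schedulers.single()` for the Android main-thread scheduler, which is not available outside Android; the class name `SchedulerDemo` is made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical demo class, not part of the original project.
class SchedulerDemo {
    /** Returns {thread that produced the item, thread that observed it}. */
    static String[] run() {
        String[] threads = new String[2];
        Observable.fromCallable(() -> {
                    // Upstream work: runs on the scheduler given to subscribeOn
                    threads[0] = Thread.currentThread().getName();
                    return "result";
                })
                .subscribeOn(Schedulers.io())
                // Stand-in for AndroidSchedulers.mainThread() on a plain JVM
                .observeOn(Schedulers.single())
                // Everything below observeOn runs on the observing scheduler
                .doOnNext(v -> threads[1] = Thread.currentThread().getName())
                .blockingSubscribe(); // wait until the stream terminates
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on: " + threads[0]);
        System.out.println("observed on: " + threads[1]);
    }
}
```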
timeout operator
Mirrors the source Observable, but emits an error notification if a specified period of time elapses without any emitted item.
If the source emits nothing within that period, the resulting Observable calls onError() with a TimeoutException.
- For example, an Observable that wraps a WebSocket can emit onError() when no message arrives within the specified period, and retry can then resubscribe to reconnect the WebSocket.
Observable.create(new WebSocketOnSubscribe(url))
    // If the source sends no message within the interval, a timeout exception is emitted
    .timeout(timeout, timeUnit)
    // Resubscribe on error, which reconnects the WebSocket
    .retry()
    // Run on an IO thread; deliver the response callbacks on the main thread for UI handling
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
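The TimeoutException can be reproduced without a real WebSocket. The following is a minimal sketch, assuming RxJava 2.x on the classpath; it uses `Observable.never()` as a stand-in for a silent connection and a `TestScheduler` as a virtual clock, and the class name `TimeoutDemo` is made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original project.
class TimeoutDemo {
    /** Returns true if a source that never emits ends with a TimeoutException. */
    static boolean timesOut() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock
        TestObserver<Object> observer = Observable.never() // emits nothing, ever
                .timeout(5, TimeUnit.SECONDS, scheduler)
                .test();

        // Nothing has happened yet; let the 5-second window elapse.
        scheduler.advanceTimeBy(5, TimeUnit.SECONDS);

        return observer.errorCount() == 1
                && observer.errors().get(0) instanceof TimeoutException;
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut());
    }
}
```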
timestamp operator
Attaches a timestamp to each item emitted by an Observable.
- For example, in our WebSocket work the backend required a heartbeat message every few seconds, with a timestamp in the payload. The timestamp operator wraps each item in a Timed object. (Internally it is a map operator that wraps the data in a Timed object whose time field holds the current timestamp.)
/** Sends the heartbeat message. */
public Observable<Boolean> sendHeartBeatMsg() {
    return getRxWebSocket().heartBeat(getUrl(),
            AskTeacherConstant.CONSULTING_ROOM_PING_MSG_INTERVAL_TIME,
            TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
                @Override
                public String onGenerateHeartBeatMsg(long timestamp) {
                    // The timestamp arrives in milliseconds; the payload carries seconds
                    return GsonUtil.toJson(new HeartBeatMsgRequestModel(
                            WssCommandTypeEnum.HEART_BEAT.getCode(),
                            String.valueOf(timestamp / 1000)));
                }
            });
}

@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
        HeartBeatGenerateCallback heartBeatGenerateCallback) {
    return Observable.interval(period, unit)
            // timestamp operator: attaches a timestamp to each emitted item
            .timestamp()
            .retry()
            .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
                @Override
                public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
                    long timestamp = timed.time();
                    String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
                    Logger.d(TAG, "Send heartbeat message:" + heartBeatMsg);
                    return send(url, heartBeatMsg);
                }
            });
}
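The heartbeat code divides the timestamp by 1000 because the no-argument `timestamp()` reports milliseconds. As a minimal sketch of what a Timed object carries, assuming RxJava 2.x on the classpath, the example below asks for seconds directly through the `timestamp(TimeUnit, Scheduler)` overload and uses a `TestScheduler` so the clock value is predictable. The class name `TimestampDemo` and the `"ping"` payload are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.schedulers.Timed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class TimestampDemo {
    /** Wraps one item in a Timed object stamped by a virtual clock set to 42 seconds. */
    static Timed<String> run() {
        TestScheduler scheduler = new TestScheduler();
        scheduler.advanceTimeBy(42, TimeUnit.SECONDS); // virtual "now" = 42 s

        return Observable.just("ping")
                .timestamp(TimeUnit.SECONDS, scheduler) // stamp in seconds, not milliseconds
                .blockingFirst();
    }

    public static void main(String[] args) {
        Timed<String> timed = run();
        // prints: ping @ 42 SECONDS
        System.out.println(timed.value() + " @ " + timed.time() + " " + timed.unit());
    }
}
```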
serialize operator
Forces an Observable to make serialized calls and to be well-behaved.
An Observable may emit events from several threads asynchronously, which can interleave the calls so that onNext(), onError(), and onComplete() arrive concurrently or out of order. The serialize operator forces the events to be delivered one at a time, in a valid order.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("wally");
        emitter.onNext("wally");
        emitter.onComplete();
        // Dropped: nothing is delivered after onComplete()
        emitter.onNext("wally");
    }
})
.serialize()
.subscribe();
// Result:
// wally
// wally
// onComplete
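The example above emits from a single thread, so it does not exercise the multi-threaded case described earlier. The following is a minimal sketch of that case, assuming RxJava 2.x on the classpath: two producer threads call onNext() concurrently, and the observer counts how often it is entered while another call is still in progress. The class name `SerializeDemo`, the item counts, and the counters are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original project.
class SerializeDemo {
    /** Returns {items received, overlapping onNext calls detected}. */
    static int[] run() {
        AtomicInteger inFlight = new AtomicInteger(); // calls currently inside onNext
        AtomicInteger overlaps = new AtomicInteger(); // times two calls overlapped
        AtomicInteger received = new AtomicInteger();

        Observable.<Integer>create(emitter -> {
                    // Two threads push items into the same emitter concurrently.
                    Runnable producer = () -> {
                        for (int i = 0; i < 1000; i++) {
                            emitter.onNext(i);
                        }
                    };
                    Thread t1 = new Thread(producer);
                    Thread t2 = new Thread(producer);
                    t1.start();
                    t2.start();
                    t1.join();
                    t2.join();
                    emitter.onComplete();
                })
                .serialize() // downstream now sees one call at a time
                .subscribe(v -> {
                    if (inFlight.incrementAndGet() > 1) {
                        overlaps.incrementAndGet();
                    }
                    received.incrementAndGet();
                    inFlight.decrementAndGet();
                });

        return new int[] {received.get(), overlaps.get()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("received=" + result[0] + ", overlaps=" + result[1]);
    }
}
```

With serialize() in place the overlap counter should stay at zero. Removing the operator makes overlaps possible, although whether they show up in a given run depends on thread timing.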