RxJava constitute
The paper
RxJava = Observable + Operator + Scheduler + Observer?
Observable
.fromArray(1.2.3.4)
.map { it * 5 }
.filter { it -> it > 10 }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { println(it) }
Copy the code
Observable*
What are the characteristics of the observable data sources,
- Usually do work when you start or stop listening
- Synchronous or asynchronous
- Single item,many items,or empty
- Terminates with an error or succeeds to completion
- May never terminate!
- Just an implemention of the Observer pattern
What can be called observable data sources
- View event (touch, click..)
- Data returned by a network request
- Query the data returned by the database
- Mobile phone sensor notification data (GPS, power)
- Alarm timer notification…
How to create an Observable
- Observable.just(“Hello”)
- Observable.fromArray(“Hello”,”Hello2″)
- Observable.fromCallable{“Hello”}
- Observable.create{ it.onNext(“Hello”)}
- Observable.Interval(200)
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("Hello"); emitter.onComplete(); }})Copy the code
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello"; }});Copy the code
Meanwhile, in the Android usage scenario:
- RxView.clicks(view) : Observable
- login(@Body LoginReqParams) : Observable
- RxPermissions.request(READ_PHONE_STATE) : Observable
- Creating Observables : create,just,from
- Transforming Observables: Map, FlatMap
- Filtering Observables:Filter,Debouncee,Distinct,Skip
- Combining Observables : Merge,Zip,CombineLatest
- Error Handling Operators: Catch,Retry
- Observable Utility Operators: Delay,TimeInterval,ObserveOn,SubscribeOn
- Conditional and Boolean Operators : All,Contains,TkeWhile
- Mathematical and Aggregate Operators : Average,Count,Max
- Connectable Observable Operators : Connect,Replay
- Operators to Convert Observables : To
Operator Operator
introduce
Each language-specific implementation of ReactiveX implements a set of operators. It can be broadly divided into the following categories:
map
fliter
Scheduler Threads schedule
subscribeOn
Specifies the thread in which the subscription process runs and controls Observables and observers. Let’s take a look at a code run.
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1)}//.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
Copy the code
After subscribeOn is added, it is found that both the upstream and downstream running threads of subscribeOn are changed.
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1.5,main]
in next :Thread[RxNewThreadScheduler-1.5,main] 5
Copy the code
observeOn
Specifies the thread of the observer. Let’s take a look at a code run.
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1)}//.observeOn(Schedulers.newThread())
.subscribe { println("int next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
Copy the code
When observeOn is added, it is found that the threads running downstream of observeOn have changed.
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1.5,main] 5
Copy the code
The Observer observers
slightly
Application scenarios in APP
List the current scenarios where RxJava is used in our App.
Network request :Retrofit+RxJava
@POST("/content/user/info")
Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);
Copy the code
Here’s a piece of code where we use the operator to synchronize server data when our app initializes
// Synchronize server data during app initialization.
fun checkRequiredObservable(a): Observable<PersonRequiredInfo> {
// Personal information required later
var personRequiredInfo = PersonRequiredInfo()
val allObservables = arrayListOf(
...
// a series of separate requests
appTabSetting(),// Bottom TAB configuration
userInfoObservable(),// User information
queryBindConfig(),// The user binds the account information
syncBabyInfoObservable(userId) // Synchronize baby information...).var zipObservable = Observable.zip(allObservables) {
return@zip personRequiredInfo
}
return apolloAppConfig()// Get Apollo configuration information and save the configuration
.flatMap {zipObservable } // parallel the above series of requests
.onErrorReturn { personRequiredInfo }
.doOnNext {registerUserSuperProperties()}
.compose(RxHelper.io2MainThread())
}
private fun syncBabyInfoObservable(a): Observable<Any> {
return getBabyInfoObservable() // Get the server baby information first
.flatMap { updateBabyInfoObservable() } // Upload if required
.flatMap { getBabyInfoObservable().map { Any() } }// Get the synchronized baby information again
.onErrorReturn { Any()}
}
Copy the code
Given the current state of the interface, it’s hard to imagine how I would have combined these requests without RxJava.
Debounce and Throttle
debounce
Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.
throttleFirst
Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.
View repeat click
RxView.clicks(container)
.throttleFirst(800,TimeUnit.MILLISECONDS)
.subscribe { onclick() }
Copy the code
Search image stabilization
RxTextView.textChanges(etSearch)
.debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { goSearch(chars) }
Copy the code
Page refresh frequency control
class RefreshDebounce {
private var rxEmitter: ObservableEmitter<Long>? = null
private var observable = Observable
.create<Long> { rxEmitter = it }
.debounce(1000L, TimeUnit.MILLISECONDS)
constructor(consumer: (Long) - >Unit?). { observable.subscribe { consumer.invoke(it) } }fun callRefresh(a)= rxEmitter? .onNext(System.currentTimeMillis()) }/ / use
var refreshDebounce = RefreshDebounce { println("refresh:$it") }
refreshDebounce.callRefresh()
Copy the code
Timing/delay execution
private void startCountDown15Min() {
countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
subscribe(aLong -> Log.i(TAG, "The countdown is up."));
}
private void cancelCountDown15Min() {
if(countDown15MinDisposable ! =null&&! countDown15MinDisposable.isDisposed()) { countDown15MinDisposable.dispose(); }}Copy the code
System Permission Application
new RxPermissions(activity)
.request(Manifest.permission.READ_PHONE_STATE)
.subscribe(aBoolean -> Log.d(TAG,"result:"+aBoolean)
Copy the code