RxJava2
A library that facilitates thread switching, similar to Handler, Kotlin’s coroutine, maintains a thread pool internally.
RxJava also benefits from brevity, but its brevity is distinguished by its ability to keep it simple as the program logic becomes more complex.
Introduction of depend on
implementation 'the IO. Reactivex. Rxjava2: rxkotlin: 2.2.0'
implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.0.1'
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.0.1'
api 'com. Squareup. Retrofit2: adapter - rxjava2:2.6.2'
Copy the code
Observer model
Observable, Observer, subscribe()
In fact, RxJava can be compared to a juice, there are many kinds of fruit at home (raw data to send), you want to squeeze some juice to drink, at this time you want to drink what kind of juice? If you want to drink avocado and pear lemon juice, you combine the three fruits and squeeze them together (using various operators to transform the data you want to send to the observer), then you can drink the juice you want (send the processed data to the observer).
fun main(a) {
// Create an observed
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(e: ObservableEmitter<String>) {
e.onNext("1")
e.onNext("2")
e.onNext("3")
e.onComplete()
}
// Subscribe to an observer
}).subscribe(object : Observer<String> {
override fun onComplete(a) {
println("onComplete")}override fun onSubscribe(d: Disposable) {
println("onSubscribe")}override fun onNext(t: String) {
println("onNext")}override fun onError(e: Throwable) {
println("onError")}}}Copy the code
Creates an observed Observable
Create: same as above
Just: No more than 10 events can be sent by just
fun main(a) {
// Create an observed
Observable.just("11"."33"."12").subscribe(object : Observer<String> {
override fun onComplete(a) {
println("onComplete")}override fun onSubscribe(d: Disposable) {
println("onSubscribe")}override fun onNext(t: String) {
println(t)
}
override fun onError(e: Throwable) {
println("onError")}}}Copy the code
FromArray: Similar to just, but fromArray can pass in more than 10 variables. It can also pass in arrays directly, preceded by *
val array = arrayOf("1"."3"."2")
// Note that we need to add an * to pass the array to Java...
Observable.fromArray(*array).subscribe(object : Observer<String> {
override fun onComplete(a) {
println("onComplete")}override fun onSubscribe(d: Disposable) {
println("onSubscribe")}override fun onNext(t: String) {
println(t)
}
override fun onError(e: Throwable) {
println("onError")}})Copy the code
FromIterable: Send a list collection directly to the observer
val list = listOf<String>("1"."3"."2")
Observable.fromIterable(list)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe ({ print(it) }, {print(it.message)})
Copy the code
Interval (initialDelay: Long, period: Long, unit: TimeUnit) : An event will be sent at regular intervals. The event starts from 0 and increases by 1. InitialDelay Indicates the delay for sending the event for the first time. More suitable for polling processing
val disposable = CompositeDisposable()
disposable.add(
Observable.interval(0.1, TimeUnit.SECONDS)
.retry(3)
.delay(4, TimeUnit.SECONDS)
.map { it.toString() }
.subscribe { (t -> Log.i(TAG, t)), {Log.i(TAG, it.message)} }
)
Copy the code
Subscribe observer
public final Disposable subscribe(a) {}
// Indicates no observer
public final Disposable subscribe(Consumer<? super T> onNext) {}
// Indicates that the observer responds only to the Next event sent by the observer
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// Indicates that the observer responds only to the Next & Error events sent by the observed
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// Indicates that the observer responds only to Next events, Error events, and Complete events sent by the observer
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// Indicates that the observer responds only to Next events, Error events, Complete events, and onSubscribe events sent by the observer
public final void subscribe(Observer<? super T> observer) {}
Copy the code
Dispose () cuts off the connection between the observer and the observed, but the observed can still send events
Transformation operator
Map: Can convert data types sent by observers to other types
FlatMap: Elements in the event sequence can be integrated and processed to return a new observed. A disorderly
data class Plan(val names: List<String>)
data class Person(val plans: List<Plan>)
fun main(a) {
val planList = listOf<Plan>(Plan(listOf("Zhang"."Bill")), Plan(listOf("Fifty"."ddddd")))
val personList = listOf<Person>(Person(planList), Person(planList))
Observable.fromIterable(personList)
.flatMap { Observable.fromIterable(it.plans) }
.flatMap { Observable.fromIterable(it.names) }
.subscribe ({
println(it)
}, {})
}
Copy the code
ConcatMap: Same as flatMap, but the events sent are ordered. The sequence of events generated by the new merge is ordered, that is, events are sent in exactly the same order as the old sequence
Merge/combine operators
Concat /concatArray: Connect multiple observers, concatArray has no limit, concAT can send up to four events. After merging, it is executed serially, not concurrently, in the order of sending
fun main(a) {
Observable.concat(
Observable.just(1.2),
Observable.just(4.3),
Observable.just(6.5)
).subscribe ({ println(it) }, {})
}
// 1 2 4 3 6 5
Copy the code
Merge /mergeArray: Similar to concat/concatArray, merge and execute in parallel timeline
private fun initRxDemo(a) {
val zip1 = Observable.just(listOf("A"."B"."C"."DDDD"))
val zip2 = Observable.just(listOf(1.2.3))
disposable.add(
Observable.merge(zip1, zip2)
/ / the List < Any > type
.concatMap { Observable.fromIterable(it) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ Log.i(TAG, it.toString()) }, { showError() })
)
}
Copy the code
Zip: used to send multiple network requests at the same time and combine the result data of multiple requests. Async similar to Kotlin coroutines
- Event combination = contraption merge in strict accordance with the original event sequence
- Number of events eventually merged = multiple observed (
Observable
)
private fun initRxDemo(a) {
val zip1 = Observable.fromIterable(listOf("A"."B"."C"."DDDD"))
val zip2 = Observable.fromIterable(listOf(1.2.3))
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
disposable.add(
Observable.zip(zip1, zip2, object : BiFunction<String, Int, String> {
override fun apply(t: String, u: Int): String {
return "$t $u"
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ Log.i("LIUJIANDD", it.toString()) }, { showError() })
)
}
}
//com.liujian.kotlindemo I/LIUJIANDD: A 1
//com.liujian.kotlindemo I/LIUJIANDD: B 2
//com.liujian.kotlindemo I/LIUJIANDD: C 3
Copy the code
Reduce: Serves the same functions as kotlin set reduce
**startWith(T item)** Inserts another data event before the data emitted by the source Observable (which is inserted before the data emitted by the source Observable). It is used as a ShowLoading event before sending network requests
private fun initRxDemo(a) {
val zip1 = Observable.fromIterable(listOf("A"."B"."C"."DDDD"))
zip1.startWith("0")
.startWithArray("1"."3"."2")
.subscribe { Log.i(TAG, it) }
}
// 1 3 2 0 A B C DDD
Copy the code
Functional operator
GroupBy: grouping
OnErrorReturn: Converts the error event into an object, if any
.onErrorReturn { Result.getFailData(it.message) }
Copy the code
Retry: All event sequences are retransmitted if an error event occurs. Times is the number of retransmissions
fun main(a) {
Observable.create<String> {
it.onNext("1")
it.onError(Exception("404"))
it.onComplete()
}.retry(2)
.subscribe(object : Observer<String> {
override fun onComplete(a){}override fun onSubscribe(d: Disposable){}override fun onNext(t: String) {
println(t)
}
override fun onError(e: Throwable) {
println(e.message)
}
})
}
// 1 1 1 404
Copy the code
Filter: Filters events sent by the observer. If true is returned, the event is sent; otherwise, no event is sent
Skip (count: Int) : count indicates the number of skipped events
Take (count: Int) : Gets the number of events received before count
DoOnNext: called before executing the Next event
fun main(a) {
Observable.fromIterable(listOf(3.2.3.4.5.7.9.10))
.doOnNext {
if (it > 5) {
println("doOnNext$it")
}
}
.subscribe({ println(it)}, {showError()})
}
Copy the code
DoAfterNext, doOnComplete
DoOnEach: called every time an Observable sends a data event
Compose: Extract some repetitive code from RxJava chain calls
private fun refreshToken(a) {
Observable.just(UserInfoManager.getRefreshToken())
.flatMap {
apiService.reFreshToken(mapOf("refreshToken" to it))
}
.compose(getResultTransfer())
.subscribe(object : Observer<RefreshToken> {
override fun onComplete(a){}override fun onSubscribe(d: Disposable){}override fun onNext(t: RefreshToken) {
updateToken(t)
}
override fun onError(e: Throwable) {
Logger.e(TAG, "refreshToken error: ${e.message}")}}}fun getResultTransfer(a) = ObservableTransformer<RefreshToken, RefreshToken> {
it.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
Copy the code
Thread scheduling
SubscribeOn: Specifies the observed thread
ObserveOn: Specifies the thread of the observer, which takes effect every time it executes
The following table summarizes the schedulers in RxJava:
The scheduler | role |
---|---|
Schedulers.com putation ( ) | Used to use computational tasks such as event loops and callback processing |
Schedulers. Immediate ( ) | The current thread |
Schedulers. IO ( ) | Used for IO intensive tasks if asynchronously blocking IO operations. |
Schedulers. NewThread ( ) | Create a new thread |
AndroidSchedulers.mainThread() | Android UI thread, used to manipulate the UI. |
A case in point
Bitmaps are processed in asynchronous threads and displayed in the main thread. Not written in RxJava
new Thread() {
@Override
public void run(a) {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run(a) { imageCollectorView.addImage(bitmap); }}); } } } } }.start();Copy the code
The processing with RxJava is as follows
fun main(a) {
val folders = listOf<File>()
Observable.fromIterable(folders)
.flatMap { Observable.fromArray(*it.listFiles()) }
.filter { it.name.endsWith(".png") }
.map { getBitmapFromFile(it) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe ({
imageCollectorView.addImage(bitmap)
}, {showError()})
}
Copy the code
CompositeDisposable
Rxjava, while easy to use, is known to be prone to layer memory leaks. If the activity or fragment is destroyed, it will still occupy the memory and cannot be released. And disposable is the subscription event that can be used to unsubscribe.
Create a BaseActivity CompositeDisposable to manage the disposable subscribe to events, and then at the time of the activity destroyed, call CompositeDisposable. The dispose () all subscribe to the event can be cut off, Prevent memory leaks.
abstract class BaseActivity : AppCompatActivity() {
var mDisposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?). {
super.onCreate(savedInstanceState)
window.addFlags(WindowManager.LayoutParams.FLAG_KEEP_SCREEN_ON)
initParameter()
setContentView(getLayoutResId())
}
override fun onDestroy(a) {
super.onDestroy()
// Disconnect all subscription events to prevent memory leaks
if(! mDisposable.isDisposed){ mDisposable.dispose() } } }Copy the code
btnEnsure.setOnClickListener {
mDisposable.add(viewModel.login(etAccount.text.toString(),
etPassword.text.toString()))
}
Copy the code
Back pressure
Back pressure is used to solve the problem that the observer sends events too fast, and the observer has no time to receive all the events, so the observer cannot process all the sent events in time, and eventually causes the cache overflow and event loss & OOM
Principle: Establish a cache area, and discard, retain, and report error events that exceed the size of the cache area (default: 128)
Back pressure concrete implementation: Flowable. Back pressure is no longer supported by Observables in 2.0
The basic usage of Flowable is very similar to that of an Observable
disposable.add(
Flowable.create<Int> ({for (i in (0 until 300)) {
it.onNext(i)
}
it.onComplete()
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.i(TAG, it.toString())
}) { Log.e(TAG, it.toString()) }
)
Copy the code
Back pressure strategy:
- BackpressureStrategy.ERROR: Directly throws an exception
MissingBackpressureException
- BackpressureStrategy.MISSING: Directly throws an exception
MissingBackpressureException
Queue is full - BackpressureStrategy. BUFFER, BUFFER size is set to infinite. Pay attention to memory and avoid OOM
- BackpressureStrategy. DROP: more than discarded event buffer size (128)
- BackpressureStrategy. LATEST: save the LATEST (last) events, only more than discarded event buffer size (128) (i.e. if sent 150 events, buffer zone will hold 129 events (1-128 + 128).
In addition, RxJava 2.0 internally provides methods that encapsulate the back pressure policy pattern
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
The default using BackpressureStrategy. The ERROR model
disposable.add(
Flowable.fromIterable(List<Int> (300) { it })
.onBackpressureLatest() // LATEST back pressure strategy
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ Log.i(TAG, it.toString()) }) {
Log.e(TAG, it.toString())
}
)
Copy the code
RxJava2 and Retrofit2
Retrofit supports the return of an Observable as a web request method
interface UserApiInterface {
@POST("api/passport/login/v3/account")
fun login(@Body body: RequestBody): Observable<Response<LoginResponse>>
}
inline fun <reified S> createService(serviceClass: Class<S>, baseUrl: String): S {
val retrofit = Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) // Set the network request in the child thread
.addCallAdapterFactory(DeferredCallAdapterFactory.create())
.client(mHttpClient)
.build()
val s: S = retrofit.create(serviceClass)
return s
}
Copy the code
Requirement: Polling network requests, error retry three times processing
val model = MarketTickerModel(mTickerIdList)
mDisposable.add(
// Timed polling for 5 seconds
Observable.interval(0.5, TimeUnit.SECONDS)
.flatMap {model.loadData3() }
.retry(3) // The request failed and retry three times
.map { it.map { MarketGlobalIndexMapConvertUtils.convertNetWorkData(it) } }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
showData(it)
}, { showLoading() })
)
Copy the code
Requirement: Process two network requests at the same time and get the result of the request
fun loadDataRxJava(composite: CompositeDisposable) {
composite.add(
Observable.interval(0.5, TimeUnit.SECONDS)
.flatMap {
Observable.zip(
model.loadData2(),
chartModel.loadData2(),
BiFunction<List<TickerBase>, List<ChartViewModel>, List<StockViewModel>> { t1, t2 ->
t1.map {
val tickerBase = it
val chartModel = t2.filter { it.tickerId == tickerBase.tickerId }.firstOrNull()
valentryList = chartModel? .entryList ? : arrayListOf<Entry>()valmaxLength = chartModel? .maxLength ? :0
StockViewModel(
it.tickerId,
it.type,
it.name ?: "--", CountyResManager.getDrawableResByCounty(it.regionCode), it.close ? :"", it.changeRatio ? :"",
entryList,
maxLength
)
}
})
}
.retry(3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { getPageView()? .sendUIEvent(ChartUIEvent.ShowTickerData(it,false))
}, {showError()}))
}
Copy the code