RxJava2

A library that facilitates thread switching, similar to Handler, Kotlin’s coroutine, maintains a thread pool internally.

RxJava also benefits from brevity, but its brevity is distinguished by its ability to keep it simple as the program logic becomes more complex.

Introduction of depend on

    implementation 'the IO. Reactivex. Rxjava2: rxkotlin: 2.2.0'
    implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.0.1'
    implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.0.1'
    api 'com. Squareup. Retrofit2: adapter - rxjava2:2.6.2'
Copy the code

Observer model

Observable, Observer, subscribe()

In fact, RxJava can be compared to a juice, there are many kinds of fruit at home (raw data to send), you want to squeeze some juice to drink, at this time you want to drink what kind of juice? If you want to drink avocado and pear lemon juice, you combine the three fruits and squeeze them together (using various operators to transform the data you want to send to the observer), then you can drink the juice you want (send the processed data to the observer).

fun main(a) {
    // Create an observed
    Observable.create(object : ObservableOnSubscribe<String> {
        override fun subscribe(e: ObservableEmitter<String>) {
            e.onNext("1")
            e.onNext("2")
            e.onNext("3")
            e.onComplete()
        }
        // Subscribe to an observer
    }).subscribe(object : Observer<String> {
        override fun onComplete(a) {
            println("onComplete")}override fun onSubscribe(d: Disposable) {
            println("onSubscribe")}override fun onNext(t: String) {
            println("onNext")}override fun onError(e: Throwable) {
            println("onError")}}}Copy the code

Creates an observed Observable

Create: same as above

Just: No more than 10 events can be sent by just

fun main(a) {
    // Create an observed
    Observable.just("11"."33"."12").subscribe(object : Observer<String> {
        override fun onComplete(a) {
            println("onComplete")}override fun onSubscribe(d: Disposable) {
            println("onSubscribe")}override fun onNext(t: String) {
            println(t)
        }

        override fun onError(e: Throwable) {
            println("onError")}}}Copy the code

FromArray: Similar to just, but fromArray can pass in more than 10 variables. It can also pass in arrays directly, preceded by *

    val array = arrayOf("1"."3"."2")
    // Note that we need to add an * to pass the array to Java...
    Observable.fromArray(*array).subscribe(object : Observer<String> {
        override fun onComplete(a) {
            println("onComplete")}override fun onSubscribe(d: Disposable) {
            println("onSubscribe")}override fun onNext(t: String) {
            println(t)
        }

        override fun onError(e: Throwable) {
            println("onError")}})Copy the code

FromIterable: Send a list collection directly to the observer

val list = listOf<String>("1"."3"."2")
Observable.fromIterable(list)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe ({ print(it) }, {print(it.message)})
Copy the code

Interval (initialDelay: Long, period: Long, unit: TimeUnit) : An event will be sent at regular intervals. The event starts from 0 and increases by 1. InitialDelay Indicates the delay for sending the event for the first time. More suitable for polling processing

       val disposable = CompositeDisposable() 
       disposable.add(
                Observable.interval(0.1, TimeUnit.SECONDS)
                        .retry(3)
                        .delay(4, TimeUnit.SECONDS)
                        .map { it.toString() }
                        .subscribe { (t -> Log.i(TAG, t)), {Log.i(TAG, it.message)} }
       )
Copy the code

Subscribe observer

    public final Disposable subscribe(a) {}
    // Indicates no observer
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // Indicates that the observer responds only to the Next event sent by the observer
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // Indicates that the observer responds only to the Next & Error events sent by the observed

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // Indicates that the observer responds only to Next events, Error events, and Complete events sent by the observer

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // Indicates that the observer responds only to Next events, Error events, Complete events, and onSubscribe events sent by the observer

    public final void subscribe(Observer<? super T> observer) {}
Copy the code

Dispose () cuts off the connection between the observer and the observed, but the observed can still send events

Transformation operator

Map: Can convert data types sent by observers to other types

FlatMap: Elements in the event sequence can be integrated and processed to return a new observed. A disorderly

data class Plan(val names: List<String>)
data class Person(val plans: List<Plan>)

fun main(a) {
    val planList = listOf<Plan>(Plan(listOf("Zhang"."Bill")), Plan(listOf("Fifty"."ddddd")))
    val personList = listOf<Person>(Person(planList), Person(planList))
    Observable.fromIterable(personList)
        .flatMap { Observable.fromIterable(it.plans) }
        .flatMap { Observable.fromIterable(it.names) }
        .subscribe ({
            println(it)
        }, {})
}
Copy the code

ConcatMap: Same as flatMap, but the events sent are ordered. The sequence of events generated by the new merge is ordered, that is, events are sent in exactly the same order as the old sequence

Merge/combine operators

Concat /concatArray: Connect multiple observers, concatArray has no limit, concAT can send up to four events. After merging, it is executed serially, not concurrently, in the order of sending

fun main(a) {
    Observable.concat(
        Observable.just(1.2),
        Observable.just(4.3),
        Observable.just(6.5)
    ).subscribe ({ println(it) }, {})
}
// 1 2 4 3 6 5
Copy the code

Merge /mergeArray: Similar to concat/concatArray, merge and execute in parallel timeline

private fun initRxDemo(a) {

   val zip1 = Observable.just(listOf("A"."B"."C"."DDDD"))
   val zip2 = Observable.just(listOf(1.2.3))

   disposable.add(
       Observable.merge(zip1, zip2)
                        / / the List < Any > type
                       .concatMap { Observable.fromIterable(it) }
                       .subscribeOn(Schedulers.io())
                       .observeOn(AndroidSchedulers.mainThread())
                       .subscribe({ Log.i(TAG, it.toString()) }, { showError() })
    )
}

Copy the code

Zip: used to send multiple network requests at the same time and combine the result data of multiple requests. Async similar to Kotlin coroutines

  1. Event combination = contraption merge in strict accordance with the original event sequence
  2. Number of events eventually merged = multiple observed (Observable)
private fun initRxDemo(a) {
        val zip1 = Observable.fromIterable(listOf("A"."B"."C"."DDDD"))
        val zip2 = Observable.fromIterable(listOf(1.2.3))

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            disposable.add(
                    Observable.zip(zip1, zip2, object : BiFunction<String, Int, String> {
                        override fun apply(t: String, u: Int): String {
                            return "$t $u"
                        }
                    })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe({ Log.i("LIUJIANDD", it.toString()) }, { showError() })
            )
        }
 }
//com.liujian.kotlindemo I/LIUJIANDD: A 1
//com.liujian.kotlindemo I/LIUJIANDD: B 2
//com.liujian.kotlindemo I/LIUJIANDD: C 3
Copy the code

Reduce: Serves the same functions as kotlin set reduce

**startWith(T item)** Inserts another data event before the data emitted by the source Observable (which is inserted before the data emitted by the source Observable). It is used as a ShowLoading event before sending network requests

    private fun initRxDemo(a) {
        val zip1 = Observable.fromIterable(listOf("A"."B"."C"."DDDD"))

        zip1.startWith("0")
                .startWithArray("1"."3"."2")
                .subscribe { Log.i(TAG, it) }
    }
    // 1 3 2 0 A B C DDD
Copy the code

Functional operator

GroupBy: grouping

OnErrorReturn: Converts the error event into an object, if any

.onErrorReturn { Result.getFailData(it.message) }
Copy the code

Retry: All event sequences are retransmitted if an error event occurs. Times is the number of retransmissions

fun main(a) {
    Observable.create<String> {
        it.onNext("1")
        it.onError(Exception("404"))
        it.onComplete()
    }.retry(2)
        .subscribe(object : Observer<String> {
            override fun onComplete(a){}override fun onSubscribe(d: Disposable){}override fun onNext(t: String) {
                println(t)
            }

            override fun onError(e: Throwable) {
                println(e.message)
            }

        })
}
// 1 1 1 404
Copy the code

Filter: Filters events sent by the observer. If true is returned, the event is sent; otherwise, no event is sent

Skip (count: Int) : count indicates the number of skipped events

Take (count: Int) : Gets the number of events received before count

DoOnNext: called before executing the Next event

fun main(a) {
    Observable.fromIterable(listOf(3.2.3.4.5.7.9.10))
        .doOnNext {
            if (it > 5) {
                println("doOnNext$it")
            }
        }
        .subscribe({ println(it)}, {showError()})
}
Copy the code

DoAfterNext, doOnComplete

DoOnEach: called every time an Observable sends a data event

Compose: Extract some repetitive code from RxJava chain calls

    private fun refreshToken(a) {
        Observable.just(UserInfoManager.getRefreshToken())
            .flatMap {
                apiService.reFreshToken(mapOf("refreshToken" to it))
            }
            .compose(getResultTransfer())
            .subscribe(object : Observer<RefreshToken> {
                override fun onComplete(a){}override fun onSubscribe(d: Disposable){}override fun onNext(t: RefreshToken) {
                    updateToken(t)
                }

                override fun onError(e: Throwable) {
                    Logger.e(TAG, "refreshToken error: ${e.message}")}}}fun getResultTransfer(a) = ObservableTransformer<RefreshToken, RefreshToken> {
        it.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
    }
Copy the code

Thread scheduling

SubscribeOn: Specifies the observed thread

ObserveOn: Specifies the thread of the observer, which takes effect every time it executes

The following table summarizes the schedulers in RxJava:

The scheduler role
Schedulers.com putation ( ) Used to use computational tasks such as event loops and callback processing
Schedulers. Immediate ( ) The current thread
Schedulers. IO ( ) Used for IO intensive tasks if asynchronously blocking IO operations.
Schedulers. NewThread ( ) Create a new thread
AndroidSchedulers.mainThread() Android UI thread, used to manipulate the UI.

A case in point

Bitmaps are processed in asynchronous threads and displayed in the main thread. Not written in RxJava

new Thread() {
    @Override
    public void run(a) {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run(a) { imageCollectorView.addImage(bitmap); }}); } } } } }.start();Copy the code

The processing with RxJava is as follows

fun main(a) {
    val folders = listOf<File>()
    Observable.fromIterable(folders)
        .flatMap { Observable.fromArray(*it.listFiles()) }
        .filter { it.name.endsWith(".png") }
        .map { getBitmapFromFile(it) }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe ({
            imageCollectorView.addImage(bitmap)
        }, {showError()})
}
Copy the code

CompositeDisposable

Rxjava, while easy to use, is known to be prone to layer memory leaks. If the activity or fragment is destroyed, it will still occupy the memory and cannot be released. And disposable is the subscription event that can be used to unsubscribe.

Create a BaseActivity CompositeDisposable to manage the disposable subscribe to events, and then at the time of the activity destroyed, call CompositeDisposable. The dispose () all subscribe to the event can be cut off, Prevent memory leaks.

abstract class BaseActivity : AppCompatActivity() {

    var mDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        window.addFlags(WindowManager.LayoutParams.FLAG_KEEP_SCREEN_ON)
        initParameter()
        setContentView(getLayoutResId())
    }

    override fun onDestroy(a) {
        super.onDestroy()
        // Disconnect all subscription events to prevent memory leaks
        if(! mDisposable.isDisposed){ mDisposable.dispose() } } }Copy the code
btnEnsure.setOnClickListener {
      mDisposable.add(viewModel.login(etAccount.text.toString(), 
                                      etPassword.text.toString()))
}
Copy the code

Back pressure

Back pressure is used to solve the problem that the observer sends events too fast, and the observer has no time to receive all the events, so the observer cannot process all the sent events in time, and eventually causes the cache overflow and event loss & OOM

Principle: Establish a cache area, and discard, retain, and report error events that exceed the size of the cache area (default: 128)

Back pressure concrete implementation: Flowable. Back pressure is no longer supported by Observables in 2.0

The basic usage of Flowable is very similar to that of an Observable

        disposable.add(
                Flowable.create<Int> ({for (i in (0 until 300)) {
                        it.onNext(i)
                    }
                    it.onComplete()
                    }, BackpressureStrategy.LATEST)
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({
                            Log.i(TAG, it.toString())
                        }) { Log.e(TAG, it.toString()) }
        )
Copy the code

Back pressure strategy:

  • BackpressureStrategy.ERROR: Directly throws an exceptionMissingBackpressureException
  • BackpressureStrategy.MISSING: Directly throws an exceptionMissingBackpressureExceptionQueue is full
  • BackpressureStrategy. BUFFER, BUFFER size is set to infinite. Pay attention to memory and avoid OOM
  • BackpressureStrategy. DROP: more than discarded event buffer size (128)
  • BackpressureStrategy. LATEST: save the LATEST (last) events, only more than discarded event buffer size (128) (i.e. if sent 150 events, buffer zone will hold 129 events (1-128 + 128).

In addition, RxJava 2.0 internally provides methods that encapsulate the back pressure policy pattern

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

The default using BackpressureStrategy. The ERROR model

        disposable.add(
                Flowable.fromIterable(List<Int> (300) { it })
                        .onBackpressureLatest() // LATEST back pressure strategy
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({ Log.i(TAG, it.toString()) }) {
                            Log.e(TAG, it.toString())
                        }
        )
Copy the code

RxJava2 and Retrofit2

Retrofit supports the return of an Observable as a web request method

interface UserApiInterface {
    @POST("api/passport/login/v3/account")
    fun login(@Body body: RequestBody): Observable<Response<LoginResponse>>
}

inline fun <reified S> createService(serviceClass: Class<S>, baseUrl: String): S {
 val retrofit = Retrofit.Builder()
            .baseUrl(baseUrl)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) // Set the network request in the child thread
            .addCallAdapterFactory(DeferredCallAdapterFactory.create())
            .client(mHttpClient)
            .build()
        val s: S = retrofit.create(serviceClass)
        return s
    }

Copy the code

Requirement: Polling network requests, error retry three times processing

val model = MarketTickerModel(mTickerIdList)
mDisposable.add(
   // Timed polling for 5 seconds
   Observable.interval(0.5, TimeUnit.SECONDS)
     .flatMap {model.loadData3() }
     .retry(3) // The request failed and retry three times
     .map { it.map { MarketGlobalIndexMapConvertUtils.convertNetWorkData(it) } }
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe({
          showData(it)
      }, { showLoading() })
)
Copy the code

Requirement: Process two network requests at the same time and get the result of the request

    fun loadDataRxJava(composite: CompositeDisposable) {
        composite.add(
            Observable.interval(0.5, TimeUnit.SECONDS)
                .flatMap {
                    Observable.zip(
                        model.loadData2(),
                        chartModel.loadData2(),
                        BiFunction<List<TickerBase>, List<ChartViewModel>, List<StockViewModel>> { t1, t2 ->
                            t1.map {
                                val tickerBase = it
                                val chartModel = t2.filter { it.tickerId == tickerBase.tickerId }.firstOrNull()
                                valentryList = chartModel? .entryList ? : arrayListOf<Entry>()valmaxLength = chartModel? .maxLength ? :0
                                StockViewModel(
                                    it.tickerId,
                                    it.type,
                                    it.name ?: "--", CountyResManager.getDrawableResByCounty(it.regionCode), it.close ? :"", it.changeRatio ? :"",
                                    entryList,
                                    maxLength
                                )
                            }
                        })
                }
                .retry(3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { getPageView()? .sendUIEvent(ChartUIEvent.ShowTickerData(it,false))
                }, {showError()}))
    }
Copy the code