Relevant concepts

Android Multithreaded programming principles:

  1. Don’t block the UI thread;
  2. Do not access UI components outside of the UI thread;

ReactiveX

  • Reactive Extensions (Rx)
  • ReactiveX is a programming interface that uses observable data streams for asynchronous programming. ReactiveX combines the best of observer mode, iterator mode and functional programming.

RxJava

  • Reactive Extensions for the JVM
  • Chain call based on event flow, time-consuming tasks, thread switching, its essence is an asynchronous operation library;

The principle of

  • Based on the observer pattern, the operation of one aspect depends on the state change of another aspect. When an object must notify other objects, it is expected that this object is loosely coupled with other notified objects.

Three elements

  • Observable, Subscriber, subscribe;

The observed

1. Observable
  • Events can be sent multiple times (onNext) until onComplete or onError is called to end the subscription;
  • Does not support the back pressure: when observed sending large amounts of data quickly, won’t do other downstream processing, even if a large number of data accumulation, the chain will not quote MissingBackpressureException, consumes too much memory will only OOM. (An official line of 1,000 events is used as a reference);
val observable = Observable.create(ObservableOnSubscribe<Int> { it.onNext(1) it.onNext(3) it.onNext(5) it.onComplete() Ljylogutil.d ("${thread.currentThread ().name}_subscribe")}) // Dispose () <String> {override fun onSubscribe(d: override fun onSubscribe) {disposable: disposable? Disposable) {// Call back to this method before sending data after subscription, Disposable Disposable = lJYLOgutil. d("${thread.currentThread ().name}_onSubscribe")} Override fun onNext(t: String) { LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t") } override fun onError(e: Throwable) { LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}") } override fun onComplete() { Ljylogutil.d ("${thread.currentThread ().name}_onComplete")}} //rxJava is a chain of observables, observables, and observables. // Each node in the middle of the chain is both a downstream Observable and an upstream observer Observable. // Only the top end is used for the origin Observable when subscribeOn() is called multiple times. / / reason subscribeOn bottom by new observables. SubscribeOn (Schedulers. IO ()). SubscribeOn (AndroidSchedulers. MainThread ()) //observeOn specifies the thread after which the operation is to be performed. The program implements multiple thread switches through multiple calls to observeOn. //observeOn is each observer below it. Schedulers.io()). Map {ljylogutil.d ("map: ${Thread.currentThread().name}") "num_$it" } .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer)Copy the code
2. Flowable:
  • Like An Observable, it supports reaction-Streams and back pressure.
  • The upstream observed responds to the downstream observer’s data request, and the downstream calls Request (n) to tell the upstream how many data to send. This prevents a lot of data from piling up on the call chain and keeps memory low;
var sub: Subscription? = null Flowable.create(FlowableOnSubscribe<Int> { it.onNext(1) it.onNext(3) it.onNext(5) it.onComplete() Ljylogutil. d("${thread.currentThread ().name}_subscribe") You need to specify the back pressure strategy}, BackpressureStrategy BUFFER) Flowable. Range (0, 5). The subscribe (object: Subscriber<Int> { override fun onSubscribe(s: Subscription?) {// This method is called first when subscribed, which is equivalent to onStart(), Ljylogutil.d ("${thread.currentThread ().name}_onSubscribe start") sub = s sub?.request(1) LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe end") } override fun onNext(t: Int?) { LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t") sub?.request(1) } override fun onError(t: Throwable?) { LjyLogUtil.d("${Thread.currentThread().name}_onError:${t?.message}") } override fun onComplete() { LjyLogUtil.d("${Thread.currentThread().name}_onComplete") } })Copy the code
3. Single
  • Single send event (onSuccess, onError), the subscription ends upon sending;
  • Always send only one value, or an error notification, instead of sending a series of values (no back pressure problem, of course);
  • Typically an interface is only requested once and returned once, so it makes more sense to use Single with Retrofit;
Single.just(1) .map { "num_$it" } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { num -> LjyLogUtil.d(num) } Single.create(SingleOnSubscribe<String> { emitter -> Ljylogutil.d ("${thread.currentThread ().name}_subscribe") emitter. OnSuccess ("str1").onSuccess("str2") }). Subscribe(object: SingleObserver<String> {override fun onSubscribe(d: override fun onSubscribe). Disposable) { LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe") } override fun onSuccess(t: String) { LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t") } override fun onError(e: Throwable) { LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}") } })Copy the code
4. Completable
  • Single send event (onError, onComplete), end subscription upon sending;
  • If your observer doesn’t even care about the onNext event, you can use Completable, which only has onComplete and onError events. To convert to other types of observers, you can also use toFlowable(), toObservable(), etc.
Completable.create { emitter -> LjyLogUtil.d("${Thread.currentThread().name}_subscribe") Emitter. OnComplete ()// onComplete or onError}. Subscribe(object: CompletableObserver {override fun onSubscribe(d: Disposable) { LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe") } override fun onComplete() { LjyLogUtil.d("${Thread.currentThread().name}_onComplete") } override fun onError(e: Throwable) { LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}") } })Copy the code
5. Maybe
  • Send a single event (onSuccess, onError, onComplete) to end the subscription upon completion. Equivalent to the combination of Completable and Single;
  • If it’s possible to send a number or not send any data, then you need Maybe, which is like a mix of Single and Completable;
  • OnSuccess and onComplete are mutually exclusive;
Maybe.create(MaybeOnSubscribe<String> {
    it.onSuccess("str1")
    it.onComplete()
}).subscribe(object : MaybeObserver<String> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }

})
Copy the code
  • The five can be observed through toObservable toFlowable, toSingle, toCompletable, toMaybe transformation;

The observer

val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onNext(t: Int) {
            LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }
    }
Copy the code

Thread Schedulers

  • Specifying Observable/Observer worker threads simplifies asynchronous operations.
  • Create its own thread by default;
  • Use it with the subscribeOn and observeOn operators;
AndroidSchedulers.mainThread()
  • Need to reference RxAndroid, switch to the UI thread (Android main thread), for Android development custom;
Schedulers.io()
  • Used for IO intensive tasks, such as reading and writing SD cards, querying databases, accessing networks, etc.
  • A CacheThreadScheduler is used by default;
Schedulers.newThread()
  • Create a new thread for each task;
  • There is no thread caching mechanism. Although Schedulers. IO can be used anywhere, Schedulers.
Schedulers.computation()
  • For CPU intensive computing tasks, that is, time-consuming operations that do not limit performance by I/O operations, such as parsing XML, JSON files, compression sampling of Bitmap images, etc., it has a fixed thread pool, the size of the NUMBER of CPU cores. Cannot be used for I/O operations because the wait time for I/O operations wastes CPU.
Schedulers.trampoline()
  • The current thread executes the task immediately. If the current thread has a task executing, it will suspend it. After the inserted task is finished, the unfinished task will continue to execute.
Schedulers.single()
  • Have a thread singleton, all tasks are executed in this thread, when there are tasks executed in this thread, other tasks will be executed in a first-in, first-out order;
Scheduler.from(executor)
  • Specifies a thread scheduler that controls the execution strategy of a task.

Back pressure (Backpressure)

  • Back pressure refers to a strategy that tells the upstream observer to slow down the sending speed when the event sending speed is much faster than the processing speed of the observer in asynchronous scenarios.
  • Flowable is the observed that supports back pressure.
  • Android is rarely used, unless online video streaming, live and other scenes, when the picture has been blocked data failure, need to be abandoned, etc.;
Back pressure strategy mode
BackpressureStrategy.MISSING
  • Under this policy, Flowable created by the Create method does not specify a backpressure policy. Data transmitted through onNext is not cached or discarded, and needs to be processed downstream by the backpressure operator
BackpressureStrategy.ERROR:
  • Under this strategy, if put into the Flowable asynchronous data in a buffer pool overrun, would be thrown MissingBackpressureException exception;
BackpressureStrategy.BUFFER:
  • A buffer pool SpscLinkedArrayQueue is maintained internally, and its size is unlimited. Under this policy, if Flowable’s default asynchronous buffer pool is full, it temporarily stores data from this buffer pool. Like Observable’s asynchronous buffer pool, it can add data to it without limit. Not throw MissingBackpressureException is unusual, but can lead to OOM;
  • When the cache size is full (default cache size = 128) and the observed continues to send the next event, set the cache size to infinite. The observed can send the event observer indefinitely, but it is actually stored in the cache, but pay attention to the memory condition to prevent OOM.
BackpressureStrategy.DROP
  • Under this policy, if Flowable’s asynchronous cache pool is full, data sent upstream will be discarded.
BackpressureStrategy.LATEST
  • Similar to the Drop policy, if the cache pool is full, the data that will be added to the cache pool will be discarded. However, regardless of the status of the cache pool, LATEST will be forced into the cache pool to ensure that the observer can receive the LATEST data sent by Flowable before receiving the completion notification.
  • That is, if 150 events are sent, 129 events (1-128th + 150th) are stored in the cache.
RxJava 2.0 internally provides methods that encapsulate the back pressure policy pattern
  • The default using BackpressureStrategy. ERROR model;
onBackpressureBuffer() onBackpressureDrop() onBackpressureLatest() Flowable.interval(1, Timeunit.milliseconds) // Add the method wrapped in backpressure policy. OnBackpressureBuffer ().observeon (schedulers.newthread ()).subscribe(subscriber)Copy the code

Hot and cold stream

Cold Observable

  • Subscribe will send data;
  • The common factory methods provide ColdObservable, including just(),fromXX,create(),interval(),defer();
  • When you have multiple Subscriber, their events are independent. The sample code is as follows:
val interval = Observable.interval(1, TimeUnit.SECONDS) var disposable1: Disposable? = null val observer1 = object : Observer<Long> { override fun onSubscribe(d: Disposable) { disposable1 = d } override fun onNext(t: Long) {ljylogutil. d(" observer 1: $t")} Override fun onError(e: Throwable) { } override fun onComplete() { } } var disposable2: Disposable? = null val observer2 = object : Observer<Long> { override fun onSubscribe(d: Disposable) { disposable1 = d } override fun onNext(t: Long) {ljylogutil. d(" observer 2: $t")} Override fun onError(e: Throwable) { } override fun onComplete() { } } findViewById<Button>(R.id.btn_subscribe1) .clicks().subscribe { interval.subscribe(observer1) } findViewById<Button>(R.id.btn_dispose1) .clicks().subscribe { disposable1?.dispose() } findViewById<Button>(R.id.btn_subscribe2) .clicks().subscribe { interval.subscribe(observer2) } findViewById<Button>(R.id.btn_dispose2) .clicks().subscribe { disposable2?.dispose() }Copy the code

Hot Observable

  • For all Hot Observable subscribers, they receive the same data at the same time.
  • The publish() operator is usually used to make ColdObservable Hot. Or Subjects are also Hot Observables;
  • If he starts to transfer data and you do not call stop (Dispose ()/cancel()), he will not stop and keep transmitting data even if he has no Subscriber left.
val interval = Observable.interval(1, TimeUnit.SECONDS).publish() var disposable: Disposable? = null findViewById<Button>(R.id.btn_connect) .clicks().subscribe { disposable = interval.connect() } findViewById<Button>(R.id.btn_dispose) .clicks().subscribe { disposable?.dispose() } Subscribe {interval.subscribe {ljylogutil.d (" viewByID <Button>(r.i.btn_subscribe3).clicks().subscribe {interval.subscribe {ljylogutil.d (" viewById <Button> $it")}} findViewById<Button>(r.i.btn_subscribe4).subscribe {interval.subscribe {ljylogutil.d (" observer 2: $it") } }Copy the code

The operator

  • There are a lot of operators, do not need to recite completely, browse, when know where to find (focus + collect text) can

thread

  • SubscribeOn: specifies the worker thread of the observed Observable.
  • ObserveOn: The observer worker thread specifying the observer;
observable //subscribeOn: // Only the top end is used for the origin Observable when subscribeOn() is called multiple times. / / reason subscribeOn bottom by new observables. SubscribeOn (Schedulers. IO ()). SubscribeOn (AndroidSchedulers. MainThread ()) //observeOn specifies the thread after which the operation is to be performed. The program implements multiple thread switches through multiple calls to observeOn. //observeOn is each observer below it. Schedulers.io()). Map {ljylogutil.d ("map: ${Thread.currentThread().name}") "num_$it" } .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer)Copy the code

The create operation

Basic create
create
  • Create an Observable from scratch by calling the observer method.
Observable.create(ObservableOnSubscribe<Int> {
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
})
Copy the code
Quickly create
just
  • Converts objects or collections of objects into an Observable that emits those objects.
Observable.just(1, 2, 3, 4)
Copy the code
fromArray & fromIterable
  • Converts other objects or data structures into Observables;
Observables. FromArray (arrayOf (1, 2, 3)) observables. FromIterable (listOf (4 and 6))Copy the code
never
  • The created object does not send any events, and the observer receives nothing.
Observable.never<Int>().subscribe(observer)
Copy the code
empty
  • The created object only sends a Complete event to indicate that it is completed. OnCompleted is called by the observer after receiving the completed event.
Observable.empty<Int>().subscribe(observer)
Copy the code
error
  • The created observed will only send an Error event to directly notify the exception, and the observer will call onError directly after receiving it.
Observable.error<Int>(RuntimeException()).subscribe(observer)
Copy the code
Create a delay
defer:
  • Instead of creating the Observable before the observer subscribes, create a new Observable for each observer.
var num=1
val observable=Observable.defer {Observable.just(num)}
num=2
observable.subscribe(observer)
Copy the code
timer
  • Creates an Observable that emits single data after a specified delay.
// After 3 SECONDS, send an integer 0 Observable. Timer (3, timeunit.seconds)Copy the code
interval & intervalRange
  • Create an Observable that emits a timed sequence of integers.
Observable.interval(1, 3, timeunit.seconds) // Initial delay is 2 SECONDS, then an integer starting from 10 is emitted every second. Stop Observable. IntervalRange (10, 5, 2, 1, timeUnit.seconds);Copy the code
range & rangeLong
  • Creates an Observable that emits a sequence of integers in the specified range.
Observable.range(0, 5)

Observable.rangeLong(0, 5)
Copy the code
Repeat
  • Create observables that repeatedly emit specific data or data sequences;
// Always repeat observable. fromArray(1, 2, 3, 4). Repeat () // Repeat 5 times observable. fromArray(1, 2, 3, 4) RepeatUntil (1, 2, 3, 4).repeatUntil (1, 2, 3, 4).repeatUntil {false} // Observable. RepeatWhen {it. FlatMap {obj -> if (obj is NumberFormatException) {Observable. Error ("repeatWhen terminate ")} else { Observable.just(5, 6, 7) } } }.subscribe(observer)Copy the code

2. Transformation operation:

map
  • Mapping transforms the data emitted by an Observable by applying a function to each item in the sequence. In essence, it performs a function on each item in the sequence. The parameter of the function is the data item.
Observable.just("1", "2", "3").map { it.toInt() }.subscribe(observer)
Copy the code
flatMap & concatMap
  • Flat mapping transforms the data emitted by Observables into Observables collection, and then flattens the data emitted by these Observables into a single Observable, which can be considered as a process of expanding nested data structures.
//concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的
Observable.just("A", "B", "C")
    .flatMap { x ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { y ->
            "($x,$y)"
        }
    }
Observable.just("A", "B", "C")
    .concatMap { m ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { n ->
            "($m,$n)"
        }
    }
Copy the code
groupBy
  • Grouping: the original observables are divided into a collection of observables, and the data transmitted by the original Observables are grouped by Key. Each Observable emits a group of different data.
Observable.just(
    "Tiger",
    "Elephant",
    "Cat",
    "Chameleon",
    "Frog",
    "Fish",
    "Turtle",
    "Flamingo"
)
    .groupBy { it[0].uppercaseChar() }
    .concatMapSingle { it.toList() }
    .subscribe(observer4)
Copy the code
scan
  • Scan, which applies a function to each piece of data emitted by an Observable and emits the values in sequence.
  • The transmitted data and the data transmitted in the previous round are functionally processed, and the returned data is used in the next round, continuing this process to generate the remaining data stream. Its application scenarios include simple accumulative calculation, judging the minimum value of all data, etc.
Observable.just(1, 2, 3, 4)
    .scan { t1, t2 -> t1 + t2 }
    .subscribe(observer)
Copy the code
buffer
  • Caches, which regularly collect data from observables into a collection and then bundle the collection instead of one at a time
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    .buffer(3)
    .subscribe(observer5)
Copy the code
Window
  • Window, which periodically splits the data from observables into several Observable Windows and launches these Windows instead of one at a time; Similar to buffers, buffers emit data, while Windows emit observables. Each Observable emits a subset of the original Observable’s data.
// The window operator caches a certain number of data items. The window operator eventually emits a new stream of events, called an integerObservable, The buffer operator emits a new stream of data, that is, the Window operator emits new items in the event stream, which can also be processed by other Rxjava operators. Observable.just(1, 2, 3, 4) .window(2, 1) .subscribe(observer6)Copy the code

3. Filtration operation:

Filter
  • Filter, filter out the data items that do not pass the predicate test, and send only those that pass the test;
Observable.just(1, 2, 3, 4, 5)
    .filter {
            it % 2 == 0
    }.subscribe(observer)
Copy the code
ofType
  • Filtering specific types of data;
Observable. Just (1, 2.1, "3", 4L, 5.0).subscribe(observer).ofType(Int::class.java).subscribe(observer)Copy the code
distinct & distinctUntilChanged
  • Deduplication, filtering out repeated data items;
DistinctUntilChanged ()// distinctUntilChanged()// distinctUntilChanged()// distinctUntilChanged()// distinctUntilChanged(). Subscribe (observer)Copy the code
skip & skipLast
Observable = observable. Just (1, 2, 3, 4, 5, 6, 7, 8); Observable.subscribe (observer) // Skip (4).subscribe(observer)Copy the code
take & takeLast
Val Observable = observable. Just (1, 2, 3, 4, 5, 6, 7, 8) //take: Observable (4).subscribe(observer) //takeLast (4).subscribe(observer)Copy the code
debounce
  • Data is transmitted only after a period of idle time. In layman’s terms, an operation is performed if no operation has been performed for a period of time.
val observable2 = Observable.create<Int> {
    it.onNext(1)
    Thread.sleep(400)
    it.onNext(2)
    Thread.sleep(1200)
    it.onNext(3)
    Thread.sleep(1000)
    it.onNext(4)
    Thread.sleep(800)
    it.onNext(5)
    Thread.sleep(2000)
    it.onNext(6)
}
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .debounce(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
Copy the code
sample
  • Sampling, periodically transmitting the latest data, which is equivalent to data sampling;
// Sample: The difference with Debounce is that sample is the latest data in a time cycle, second after second. IO ()). ObserveOn (schedulers.io ()). Sample (1, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer)Copy the code
throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
  • ThrottleFirst is the first data in a specified period. ThrottleLast is the same as sample. ThrottleWithTimeout is the same as debounce;
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .throttleFirst(1, TimeUnit.SECONDS)
//  .throttleLast(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
Copy the code
timeout
  • If the last data is not transmitted within the specified time after the first element is transmitted, a timeout exception will be returned.
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .timeout(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
Copy the code
elementAt
  • Value, takes the data item at a specific location;
Observable. Just (4, 3, 2, 1).elementat (1) //.elementatorError (1).subscribe(maybeObserver)Copy the code
first
  • First, only the first data that meets the condition is transmitted;
Observable.just(1, 2, 3, 4, 5)
//  .first(-1)
//  .firstOrError()
    .firstElement()
    .subscribe(maybeObserver)
Copy the code
last
  • The last item, only the last data is transmitted;
 Observable.just(1, 2, 3, 4, 5)
//  .last(-1)
//  .lastOrError()
    .lastElement()
    .subscribe(maybeObserver)
Copy the code
IgnoreElements
  • Ignore all data and keep only termination notifications (onError or onCompleted). IgnoreElements apply to Flowable and Observable. IgnoreElement for Maybe, Single;
Observable.just(1, 2, 3, 4, 5)
    .ignoreElements()
    .subscribe(object : CompletableObserver {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })
Copy the code

4. Combined operation:

concat & concatArray
  • Uninterleaving data from multiple Observables;
  • Multiple observers are combined to send data together, and the data is executed sequentially according to the sending sequence after merging.
val just1 = Observable.just(1, 2, 3) val just2 = Observable.just("A", "B", "C") val just3 = Observable.just(4, 5, Observable. Concat (just1, just2). Subscribe (observer3) // Concat group <=4 ConcatArray (Observable. Just (1, 2, 3), Observable. Just (4, 5, 6), Observable. 9) ).subscribe(observer3)Copy the code
merge & mergeArray
  • Multiple observers were combined to send data together, and then executed in parallel according to the timeline after merging.
  • The differences between Merge and concAT are as follows: Data items emitted after merge are in parallel disorder, while data items emitted after CONCAT are in serial order.
Observable.merge(just1, just2).subscribe(observer3)
//mergeWith
just1.mergeWith(just3).subscribe(observer3)
Copy the code
zip
  • Package, combine data emitted by multiple Observables with a specified function, and then transmit the result of this function as a single data.
Observable. Zip (just1, just2, {t1, {t1, {t1, {t1, {t1, {t1, {t1,}); t2 -> "${t1}_${t2}" }) .subscribe(observer3) //zipWith just1.zipWith(just2, { t1, t2 -> "${t1}_${t2}" }) .subscribe(observer3)Copy the code
startWith & startWithArray
  • Emits a specified data sequence or item before emitting the original Observable’s data sequence.
Observables. Just (1, 2, 3). StartWith (observables. Just (4 and 6)). StartWithArray,8,9 (7). The subscribe (observer3)Copy the code
join
  • Whenever an Observable emits a data item, the data emitted by the two Observables will be combined and emitted as long as the time window defined by the data item emitted by another Observable.
Join (just2, // Specify an expiration date for just2 {observable. timer(3, timeunit.seconds)}, // specify an expiration date for just1 {observable. timer(8, timeunit.seconds)} Timeunit.seconds)}, // specify the merge rule for just1 and just2 {t1, T2 -> "${t1}_${t2}"}). Subscribe (observer3)Copy the code
combineLatest
  • When either Observables emits a data, combine the latest data emitted by each Observable (two Observables) with a specified function and emit the result of that function.
Observable.combineLatest(just1, just2,
    { t1, t2 -> "${t1}_${t2}" }).subscribe(observer3)
Copy the code
switch
  • Transform an Observable that emits a sequence of observables into an Observable that emits the most recently emitted data one by one;
Observable.switchOnNext(ObservableSource<Observable<Int>> { }, 12)
Copy the code
collect
  • Collects data events sent by the Observable into a data structure.
Observable.just(1, 2, 3, 4, 5, 6) .collect( { ArrayList() }, BiConsumer<ArrayList<Int? >, Int> { t1, t2 -> t1.add(t2) } ).subscribe(Consumer { LjyLogUtil.d("num: $it") })Copy the code
count
  • Count the number of data emitted by an Observable and emit the result.
Observable.just(1, 2, 3)
    .count()
    .subscribe(object : SingleObserver<Long>{
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: Long) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

    })
Copy the code

5. Error handling:

cast
  • If a data element is converted to another type, an exception will be thrown if the transformation fails.
Observable. Just (1, 4.0, 3f, 7, 12, 4.6, 5). Cast (Int::class.java). Subscribe (observer)Copy the code
onErrorReturn
  • After calling the onError function of the data source, it will return to this function, which can handle the error, and then return the value, it will call the observer onNext() to continue the execution, and call onComplete() to end the firing of all events;
Observables. Just (1, 2, 3.2, 4) .map { it.toInt() } .onErrorReturn { if (it is NumberFormatException) { 0 } else { throw IllegalArgumentException() }  }.subscribe(observer)Copy the code
onErrorReturnItem
  • Similar to onErrorReturn, onErrorReturnItem does not handle errors and returns a value.
Observable. Just (1, 2, 3.2, 4).map {it.toint ()}.onErrorReturnitem (0).subscribe(observer)Copy the code
onErrorResumeNext & onExceptionResumeNext
  • Send a new Observable when it encounters an error.
Observable. Just (1, 2, 3.2, 4).map {it.toint ()}.onerrorresumenext {Observable. Just (5,6,7)}.subscribe(observer)Copy the code
retry
  • Retries. If an Observable fires an error notification, re-subscribe to it and expect it to terminate normally.
Observable. Just (1, 2, 3, 4).retry(3).subscribe(observer)Copy the code
retryUntil
  • When an exception occurs, the return value is false to continue execution (repeated transmission of data), true is no longer executed, but the onError method is called;
var temp = 0
Observable.just(1, 2, 3, 4)
    .map {
        temp = it
        it
    }
    .retryUntil {
        temp > 3
    }
    .subscribe(observer)
Copy the code
retryWhen
  • When it encounters an error, it passes the error to a new Observable and decides whether to re-subscribe to the original Observable & send events.
Observables. Just (1, 2, 3.2, 4) .map { it.toInt() } .retryWhen { it.flatMap { throwable -> if (throwable is NumberFormatException) { Observable. Error (Throwable("retryWhen terminating "))} else {observable. just(5, 6, 7)}}}. Subscribe (observer)Copy the code

6. Auxiliary operation:

delay
  • Delay transmitting result data for a period of time;
Observable.just(1, 2, 3)
    .delay(1,TimeUnit.SECONDS)
    .subscribe(observer)
Copy the code
do
  • Called during the life cycle of an event;
  • DoOnEach: calls the data source (Observable) every time it sends data.
  • DoOnNext: This method is called back each time the data source calls onNext();
  • DoOnError: This method is called back each time onError() is called by the data source;
  • DoOnComplete: This method is called back each time the data source calls onComplete();
  • DoOnSubscribe: this method is called back each time the data source calls onSubscribe();
  • DoOnDispose: The method is called back by the data source after each call of Dispose ();
Observable.just(1, 2, 3, 4)
    .observeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { LjyLogUtil.d("${Thread.currentThread().name}_doOnSubscribe") }
    .doOnEach { LjyLogUtil.d("${Thread.currentThread().name}_doOnEach:$it") }
    .doOnNext { LjyLogUtil.d("${Thread.currentThread().name}_doOnNext:$it") }
    .doOnError { LjyLogUtil.d("${Thread.currentThread().name}_doOnError:${it.localizedMessage}") }
    .doOnComplete { LjyLogUtil.d("${Thread.currentThread().name}_doOnComplete") }
    .doOnDispose { LjyLogUtil.d("${Thread.currentThread().name}_doOnDispose") }
    .subscribe(observer)
Copy the code

7. Conditional and Boolean operations:

all
  • Determine whether all data items emitted by Observable meet certain conditions;
Observable.just(1, -2, 3)
    .all {
        it > 0
    }.subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
Copy the code
takeWhile & skipWhile & takeUntil & skipUntil
Observable.interval(1, timeunit.seconds) // If the condition is met, TakeWhile {it<10} observable. interval(1, timeunit.seconds) Skip data. SkipWhile {it<10} observable. interval(1, timeunit.seconds) TakeUntil {it>10} Observable.interval(1, 1) Timeunit.seconds) // Wait until the incoming Observable starts sending data. SkipUntil (Observable.timer(5, timeunit.seconds))Copy the code
SequenceEqual:
  • Check whether two Observables follow the same data sequence.
Observable.sequenceEqual(
    Observable.just(4, 5, 6),
    Observable.just(4, 5, 6)
).subscribe { it ->
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
Copy the code
contains
  • Checks whether an Observable emits a specified data item.
Observable.just(1, -2, 3)
    .contains(2)
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
Copy the code
isEmpty
  • Determine whether the data sent is empty;
Observable.just(1, -2, 3)
    .isEmpty()
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
Copy the code
amb

– Given multiple Observables, only let the first one emit data emit all data;

val list: MutableList<ObservableSource<Int>> = ArrayList()
list.add(Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS))
list.add(Observable.just(4, 5, 6))
Observable.amb(list).subscribe{
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
Copy the code
defaultIfEmpty
  • Emits data from the original Observable. If the original Observable does not emit data, it emits a default data.
// Without sending the onNext event, Just send the onComplete event Observable. Empty <Int>().subscribe (-1).subscribe {it -> LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it") }Copy the code
  • SkipUntil: discards the data emitted by the original Observable until the second Observable emits a data, and then emits the remaining data of the original Observable;

In actual combat

Used in conjunction with RxBinding

  • RxBinding is an extension to Android View events that allows developers to use various RxJava operations on View events.
1. Add dependencies
/ / RxBinding implementation 'com. Jakewharton. Rxbinding4: RxBinding: 4.0.0' / / Google 'material' library bindings: Implementation 'com. Jakewharton. Rxbinding4: rxbinding - material: 4.0.0' / / AndroidX library bindings: Implementation 'com. Jakewharton. Rxbinding4: rxbinding - core: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - appcompat: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - drawerlayout: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - leanback: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - recyclerview: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - slidingpanelayout: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - swiperefreshlayout: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - viewpager: 4.0.0' implementation 'com. Jakewharton. Rxbinding4: rxbinding - viewpager2:4.0.0'Copy the code
2. Button anti-shake
val btn1 = findViewById<Button>(R.id.btn_1) btn1.clicks() .throttleFirst(2, TimeUnit. SECONDS). SubscribeOn (AndroidSchedulers mainThread ()). The subscribe {LjyLogUtil. D (" click on the button ")}Copy the code
3. EditText Input listener
Val et1 = findViewById<EditText>(r.i.ett_1) et1.textChanges().debounce(1, TimeUnit. SECONDS) / / skip the first request Because of the initial input box empty character state. The skip (1). ObserveOn (AndroidSchedulers. MainThread ()). The subscribe { LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it") }Copy the code
4. Union/form judgment
val etName = findViewById<EditText>(R.id.et_name) val etPwd = findViewById<EditText>(R.id.et_pwd) val obName = etName.textChanges() val obPwd = etPwd.textChanges() Observable.combineLatest( obName, obPwd, { name, The PWD - > name = = "ljy" && PWD = = "123"}) / / skip the first request Because the initial input box empty character state. The skip (1) observeOn (AndroidSchedulers. MainThread ()) .subscribe {isLogin -> ljylogutil. d(if (isLogin) "login successful" else "login failed ")}Copy the code
5. Timer tasks
val time = 10L val btnLogin = findViewById<Button>(R.id.btn_login) btnLogin.clicks() .throttleFirst(time, TimeUnit.SECONDS) .subscribeOn(AndroidSchedulers.mainThread()) .doOnNext { btnLogin.isEnabled = false } .subscribe { Observable. IntervalRange (0, time, 0, 1, timeUnit.seconds, AndroidSchedulers. MainThread ()). The subscribe ({btnLogin. Text = "remaining ${it} time - seconds"}, {LjyLogUtil. E (it. The message)}, {btnlogin. text = "Obtain verification code" btnlogin. isEnabled = true})}Copy the code

Resolve memory leaks with RxLifecycle

1. Add dependencies
/ / RxLifecycle implementation 'com. Trello. Rxlifecycle4: RxLifecycle: 4.0.2' / / If you want to bind to the Android - specific Lifecycles implementation 'com. Trello. Rxlifecycle4: rxlifecycle - android: 4.0.2' / / If you want the pre - written Activities and Fragments could you can ttf_subclass as will implementation 'com. Trello. Rxlifecycle4: rxlifecycle - components: 4.0.2' / / If you want pre-written support preference Fragments you can subclass as providers implementation 'com. Trello. Rxlifecycle4: rxlifecycle - components - preference: 4.0.2' / / If you want to use the Android Lifecycle for will Implementation 'com. Trello. Rxlifecycle4: rxlifecycle - android - lifecycle: 4.0.2' / / If you want to use Kotlin syntax Implementation 'com. Trello. Rxlifecycle4: rxlifecycle - kotlin: 4.0.2' / / If you want to use kotlin syntax with Android Lifecycle implementation 'com. Trello. Rxlifecycle4: rxlifecycle - android - Lifecycle - kotlin: 4.0.2'Copy the code
2. The binding
Observable.intervalRange(0, 60, 0, 1, Timeunit.seconds) // Use RxLifecycle to solve memory leaks // automatic unsubscribe example of bindToLifecycle //.compose(bindToLifecycle()) // Manually set in the activity Compose (this.bindUntilevent (activityevent.pause)).subscribe({ljylogutil. d(" remaining ${60 - it} seconds ")}, {ljylogutil.e (it. Message)}, {ljylogutil.d (" done ")})Copy the code

Network request

1. Network request nested callback
SearchRepo (emptyMap()) // Request a search list. ObserveOn (AndroidSchedulers. MainThread ()). DoOnNext} {/ / display list. The observeOn (Schedulers. IO ()). FlatMap { Observable. FromIterable (it.items).map {repo -> rebo.id}}.flatMap {apiservice.getitem (it)} ObserveOn (AndroidSchedulers. MainThread ()). The subscribe {/ / show details}Copy the code
2. Network request polling
// Every 60 seconds, Observable.interval(60, TimeUnit.SECONDS) .doOnNext { apiService.searchRepo(emptyMap()) .subscribe(Consumer { LjyLogUtil.d("accept: ${it.items}")})}.subscribe {ljylogutil. d(" $it ")} .repeatWhen {it. FlatMap {if (count > 3) {Observable. Error (Throwable(" end "))} else {ljylogutil.d (" end ")).repeatWhen {it. Observable.just(1).delay(60, TimeUnit.SECONDS) } } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ LjyLogUtil.d("accept: ${it.items}") count++ }, { LjyLogUtil.d(it.message) })Copy the code
3. The network request fails and is reconnected
Var currentRetryCount = 0 apiService. SearchRepo (emptyMap()).retryWhen {// maxConnectCount = 10 // Number of retries var currentRetryCount = 0 apiService. SearchRepo (emptyMap()). FlatMap {throwable -> // Retry if (throwable is IOException) {if (currentRetryCount < maxConnectCount) { CurrentRetryCount++ // the more exceptions encountered, Observable. Just (1). Delay (currentRetryCount * 60L, Timeunit.seconds)} else {Observable. Error (Throwable(" End of retry ")}} else { Observable. Error (Throwable(" non-network exception occurred "))}}}. .observeOn(AndroidSchedulers.mainThread()) .subscribe({ LjyLogUtil.d("accept: ${it.items}") }, { LjyLogUtil.d(it.message) })Copy the code
4. Memory, disk, network level 3 cache
var memoryCache: String? = null var diskCache: String? = null private fun test10() { val memory = Observable.create<String> { if (memoryCache ! = null) { it.onNext(memoryCache) } else { it.onComplete() } } val disk = Observable.create<String> { if (diskCache ! = null) {it.onNext(diskCache)} else {it.oncomplete ()}}.doonNext {memoryCache = "memory data"} val net = Observable. Just (" network data ").doonNext {// memoryCache=" memory data "diskCache =" disk data"} // Concat () merges memory, disk, and network Observable. Concat (memory, disk, net) // Via firstElement(), fetches the first valid event from the concatenated queue and sends the first valid event. FirstElement ().subscribe {ljylogutil. d(" Accept: $it")}}Copy the code
5. Merge data sources & simultaneous presentation
Var result =" "Val net = Observable. Just (" network ") val disk = Observable. Just (" disk ") Merge (net, disk). Subscribe ({result += "$it, "}, {}, {ljylogutil. d("result: $result")}) // Use zip to get data and show val repo1 = apiService. GetItem (1001).subscribeon (schedulers.io ()) val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io()) Observable.zip( repo1, repo2, { data1, data2 -> val repoList = ArrayList<RepoDetail>() repoList.add(data1) repoList.add(data2) repoList }) .observeOn(AndroidSchedulers.mainThread()) .subscribe { for (repoDetail in it) { LjyLogUtil.d("result: ${repoDetail.name}") } }Copy the code

The source code parsing

The entrance

  • With the following code as the source reading entry, Single is the simplest observed;
Single.just(1)
    .subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: String) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })
Copy the code

Creation of the observed

  • Let’s look at the creation method of the observed in the code above: single.just ();
public static <@NonNull T> Single<T> just(T item) { Objects.requireNonNull(item, "item is null"); // Void return rxJavaplugins. onAssembly(new SingleJust<>(item)); }Copy the code
  • Rxjavaplugins.onassembly is a hook method to add some additional operations, as shown below:
@Nullable static volatile Function<? super Single, ? extends Single> onSingleAssembly; public static <T> Single<T> onAssembly(@NonNull Single<T> source) { Function<? super Single, ? extends Single> f = onSingleAssembly; if (f ! = null) { return apply(f, source); } return source; }Copy the code
  • OnSingleAssembly is empty by default, so the actual default is to return source directly, which is SingleJust;
  • SingleJust creates an instance of the observed implementation SingleJust and returns it. Let’s look at the code for SingleJust.
public final class SingleJust<T> extends Single<T> { final T value; public SingleJust(T value) { this.value = value; } @Override protected void subscribeActual(SingleObserver<? super T> observer) { observer.onSubscribe(Disposable.disposed()); observer.onSuccess(value); }}Copy the code
  • Very simple, inherit Single, rewrite Single’s only abstract method subscribeActual, which has a generic variable value;
  • Single code is a lot of, basically is the implementation of the operator method introduced above, when you can point into the source code to have a look;
public abstract class Single<@NonNull T> implements SingleSource<T> { ... protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer); . public final void subscribe(@NonNull SingleObserver<? super T> observer) { ... }}Copy the code
  • Single implements the SingleSource interface and subscribe method;
public interface SingleSource<@NonNull T> {

    /**
     * Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
     * @param observer the {@code SingleObserver}, not {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    void subscribe(@NonNull SingleObserver<? super T> observer);
}
Copy the code

To subscribe to

  • Let’s look at the subscribe method in line 2 of the first example: single.subscribe ();
public final void subscribe(@NonNull SingleObserver<? {super T> observer) {// requireNonNull(observer, "observer is null"); / / hook method, the default or in ginseng SingleObserver observer = RxJavaPlugins. OnSubscribe (this, the observer). / / to null Objects. RequireNonNull (the observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); try { subscribeActual(observer); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); NullPointerException npe = new NullPointerException("subscribeActual failed"); npe.initCause(ex); throw npe; }}Copy the code
  • You can see that the most useful line in the subscribe method is to call subscribeActual, and the implementation of subscribeActual is the implementation of SingleJust above;
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}
Copy the code
  • You can see that subscribeActual in SingleJust directly calls the onSubscribe and onSuccess methods of the Observer. And the entry into onSubscribe becomes a disposed Disposable, which is a disposed Disposable, because just is delabinated, doesn’t need to be disposed or modified, and onError can’t be called, because sending a prompt prompt can’t prompt an error;

Operator implementation

map

  • Take map as an example. Extend the previous example by adding a map operator to convert data. Map understands the data, and the main idea of the other operators is the same.
Single.just(1)
    .map { "num_$it" }
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
Copy the code
  • So look at the map implementation:
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
Copy the code
  • Create a SingleMap and return it. The first argument is this, which is the SingleJust instance returned by just, and the second argument is the data conversion converter.
  • So let’s look at SingleMap:
public final class SingleMap<T, R> extends Single<R> { final SingleSource<? extends T> source; final Function<? super T, ? extends R> mapper; public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override protected void subscribeActual(final SingleObserver<? super R> t) { source.subscribe(new MapSingleObserver<T, R>(t, mapper)); }... // omit the inner class MapSingleObserver}Copy the code
  • You can see that there is only one line of code in subscribeActual of SingleMap, which is to call the subscribe method of source and pass in an observer instance MapSingleObserver and the data converter held by SingleMap. Source is the observed SingleJust instance of the previous step;
  • Subscribe to the SingleObserver instance, subscribe to the SingleObserver instance.
static final class MapSingleObserver<T, R> implements SingleObserver<T> { final SingleObserver<? super R> t; final Function<? super T, ? extends R> mapper; MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this.t = t; this.mapper = mapper; } @Override public void onSubscribe(Disposable d) { t.onSubscribe(d); } @Override public void onSuccess(T value) { R v; V = object.requirenonnull (mapper.apply(value), "The mapper function returned a null value."); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } t.onSuccess(v); } @Override public void onError(Throwable e) { t.onError(e); }}Copy the code
  • MapSingleObserver onSuccess calls the apply method of the data converter Mapper to convert the data.

The onSubscribe and onError methods call the SingleObserver methods directly and pass the converted data to the SingleObserver onSuccess constructor

summary
  • So the whole process before and after map in the sample code is strung together:
  1. Single.just creates an observed SingleJust that sends raw data.
  2. The map creates an observed SingleMap, and the constructor takes SingleJust and the data converter from the previous step, so it wraps, or takes over, SingleJust. SingleMap calls SingleJust’s subscribe method and passes it to MapSingleObserver, which says to SingleJust, “J, you’ve given this project to SingleMap. You don’t need to follow the following clients.” Just give the data in your hand to my little brother MapSingleObserver, and he will ask you for anything he needs in the future.
  3. Subscribe, subscribe to SingleMap, subscribe to SingleMap, subscribe to SingleMap, subscribe to SingleMap
  4. SingleMap received the order and handed it over to MapSingleObserver. Anyway, you also have the data of SingleJust in your hand
  5. MapSingleObserver takes the data from SingleJust, transforms it, and then calls the callback method of the client (the observer downstream subscribed to) to serve it
  • Ps:
    • MapSingleObserver: I am also too tired to be an observer. I am also responsible for receiving upstream data and processing it. After processing it, I have to feedback it to downstream customers.
    • SingleJust: Are you tired? Besides, SingleMap took away the project I worked so hard for and let you monitor (observe) me, which was digested internally. Moreover, when the project started, I gave my work number to the client (T. subscribe (D)). Then the client said to stop. I made a mistake here and you just turn around and tell the client (t.onError(e));
    • This is not RxJava, this is clearly the workplace ah;

subscribeOn & observeOn

  • To extend the previous example, add a thread switch, which uses the subscribeOn & observeOn operator;
Single.just(1)
    .map { "num_$it" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
Copy the code
subscribeOn
  • SubscribeOn schedulers.io ()
public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } static { SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); } static final class IOTask implements Supplier<Scheduler> { @Override public Scheduler get() { return IoHolder.DEFAULT;  } } static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); } public final class IoScheduler extends Scheduler { ... }Copy the code
  • IoScheduler, the implementation class of Scheduler, is returned.
  • Now look at the implementation of the subscribeOn method:
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
Copy the code
  • Once again nullates and hooks, a SingleSubscribeOn is created and returned with the upstream observed (this) and thread Scheduler Scheduler;
public final class SingleSubscribeOn<T> extends Single<T> { final SingleSource<? extends T> source; final Scheduler scheduler; public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? Super T> observer) {// Create an observer SubscribeOnObserver instance, Source final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source); OnSubscribe (parent); // Call onSubscribe(observer) to notify the downstream observer that it is SubscribeOnObserver observer.onsubscribe (parent); // Call IoScheduler's scheduleDirect method to switch the thread of Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); }... // omit the inner class SubscribeOnObserver}Copy the code
  • A SubscribeOnObserver is created in the above subscribeActual, and scheduler. ScheduleDirect is called to switch the thread.
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; Static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { ... DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this.decoratedRun = decoratedRun; this.w = w; } @Override public void run() { runner = Thread.currentThread(); try { try { decoratedRun.run(); } catch (Throwable ex) { // Exceptions.throwIfFatal(e); nowhere to go RxJavaPlugins.onError(ex); throw ex; } } finally { dispose(); runner = null; }}... }Copy the code
  • ScheduleDirect’s input parameter is a Runnable, and SubscribeOnObserver implements the Runnable interface and SingleObserver interface. Constructor input parameters are downstream observer and upstream observed. In its own onSuccess,onError directly to the downstream observer, in the run method to subscribe to the upstream observer;
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { ... SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) { this.downstream = actual; this.source = source; this.task = new SequentialDisposable(); }... @Override public void onSuccess(T value) { downstream.onSuccess(value); } @Override public void onError(Throwable e) { downstream.onError(e); }... @Override public void run() { source.subscribe(this); }}Copy the code
observeOn
  • Let’s look at the Android dedicated thread scheduler AndroidSchedulers. MainThread () :
Public final Class AndroidSchedulers {public static Scheduler mainThread() {public static Scheduler mainThread() { . / / hook return RxAndroidPlugins onMainThreadScheduler (MAIN_THREAD); } / / private static variables private static final Scheduler MAIN_THREAD = RxAndroidPlugins. InitMainThreadScheduler (() - > MainHolder.DEFAULT); Private static final class MainHolder {// This is a private static final class with a Handler. Looper.getmainlooper () switches to the UI thread static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), true); } // private constructor and throws an exception, since this is a singleton that creates the HandlerScheduler, Private AndroidSchedulers() {throw new AssertionError("No instances."); } } final class HandlerScheduler extends Scheduler { private final Handler handler; private final boolean async; HandlerScheduler(Handler handler, boolean async) { this.handler = handler; this.async = async; } public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); if (async) { message.setAsynchronous(true); } / / by the main thread of the handler sends the message handler. SendMessageDelayed (message, unit.and toMillis (delay)); return scheduled; } @Override public Worker createWorker() { return new HandlerWorker(handler, async); } private static final class HandlerWorker extends Worker { private final Handler handler; private final boolean async; private volatile boolean disposed; HandlerWorker(Handler handler, boolean async) { this.handler = handler; this.async = async; } public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ... if (disposed) { return Disposable.disposed(); }... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; . handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposable.disposed(); } return scheduled; } @Override public void dispose() { disposed = true; handler.removeCallbacksAndMessages(this /* token */); } @Override public boolean isDisposed() { return disposed; } } private static final class ScheduledRunnable implements Runnable, Disposable { private final Handler handler; private final Runnable delegate; private volatile boolean disposed; ScheduledRunnable(Handler handler, Runnable delegate) { this.handler = handler; this.delegate = delegate; } @Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } } @Override public void dispose() { handler.removeCallbacks(this); disposed = true; } @Override public boolean isDisposed() { return disposed; }}}Copy the code
  • Then look at the observeOn method:
public final Single<T> observeOn(@NonNull Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null");  return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler)); }Copy the code
  • Essentially creating an observed:
public final class SingleObserveOn<T> extends Single<T> { final SingleSource<T> source; final Scheduler scheduler; public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? Super T> observer) {// Call the upstream source subscribe method, passing in an observer ObserveOnSingleObserver, Source. Subscribe (new ObserveOnSingleObserver<>(observer, scheduler)); }... // omit inner class ObserveOnSingleObserver}Copy the code
  • The ObserveOnSingleObserver code reads as follows, calling onSubscribe of the downstream observer on its own onSubscribe, and switching threads on scheduler. ScheduleDirect on its own onSuccess and onError. The scheduleDirect parameter is Runnable, which calls its own run method. In the run method, onSuccess and onError of the downstream observer are called, so as to change the thread of the downstream observer.
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { ... ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { downstream.onSubscribe(this); } } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { this.error = e; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { Throwable ex = error; if (ex ! = null) { downstream.onError(ex); } else { downstream.onSuccess(value); }}... }Copy the code

Present: Single. Create

  • Single. Create: Single. Create: Single.
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
Copy the code
  • The same create method creates an observed SingleCreate and returns:
public final class SingleCreate<T> extends Single<T> { final SingleOnSubscribe<T> source; public SingleCreate(SingleOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(SingleObserver<? super T> observer) { Emitter<T> parent = new Emitter<>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}... // Omit the inner class Emitter}Copy the code
  • Just {} assigns value directly to create, and does not subscribe downstream. Create passes in an instance of SingleOnSubscribe via an anonymous inner class and generates data in the subscribe method implemented, In SingleCreate. Call the subscribeActual SingleOnSubscribe. Subscribe, SingleCreate. SubscribeActual in downstream will perform when the subscribe, That is, single.create does not start generating data until downstream subscriptions are made; So obviously, observable.just () is not good for encapsulating network data, because we usually don’t want to do network requests before we subscribe.

reference

  • The source code of Rxjava
  • RxJava for Android developers
  • Throw line. HenCoder -RxJava principle fully resolved
  • Carson takes you through Android: RxJava
  • Embracing RxJava (3) : Hot and cold Observable, common encapsulation methods and misunderstandings
  • Rxjava3 document-level tutorial

My name is Jinyang. If you want to learn more about jinyang, please pay attention to the wechat public number “Jinyang said” to receive my latest articles