RxJava basics
RxJava create operator
Note: Dependencies need to be added before using Rxjava
dependencies {
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.0.2'
implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.2.0'
implementation 'the IO. Reactivex. Rxjava2: rxkotlin: 2.3.0'// Note: RxJava2 and RxJava1 cannot coexist, i.e. dependencies cannot exist at the same time}Copy the code
The operator | role |
---|---|
create() | Create an Observable from scratch using a function |
just() | Converts one or more objects into an Observable that emits this or these objects |
from() | Converts an Iterable, Future, or array into an Observable |
defer() | An Observable is created only when a subscriber subscribes, and a new Observable is created for each subscription |
range() | Creates an Observable that emits a sequence of integers in the specified range |
interval() | Creates an Observable that emits integer sequences at the given interval |
timer() | Create an Observable that emits single data after a given delay |
empty() | Create an Observable that does nothing and does nothing directly |
error() | Create an Observable that does nothing and notifies errors directly |
never() | Create an Observable that emits no data |
Create one.
Create an Observable from scratch using a function
Official document: reactivex. IO /documentati…
RxJava recommends that when passing the create method function we first check the isDisposed state of the observer to stop our Observable from emitting data and running expensive operations when no observer is available
- The practical application
fun testCreate() {observable. create<Int> {// Determine the isDisposed state of an Observableif(! it.isDisposed) { it.onNext(1) it.onNext(2) it.onNext(3) it.onComplete() } }.subscribeBy( onNext = { Log.e("TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete")})}Copy the code
The execution result
onNext: 1
onNext: 2
onNext: 3
onComplete
Copy the code
2. Just
Creates an Observable that emits the specified value
Official document: reactivex. IO /documentati…
Just takes one to ten arguments and returns an Observable that emits the data in list order
- The practical application
fun testJust(){Observable. Just (1,2,3,4,5,6,7,8,9,10)."TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete")})}Copy the code
The execution result
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
onComplete
Copy the code
If null is passed in just(), a null-pointer exception is thrown
Three.
Transform other kinds of objects and data types into Observables
- The practical application
fun testFrom(){val list = arrayListOf<Int>(1,2,3,4,5) list.toobservable ()."TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete")})}Copy the code
The execution result
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onComplete
Copy the code
For the Future, it emits the single data returned by the future.get () method
class MyCallable : Callable<String>{
override fun call(): String {
Log.e("TAG"."Simulate some time-consuming operations...")
Thread.sleep(5000)
return "OK"
}
}
fun testFromFuture(){
val executorService = Executors.newSingleThreadExecutor();
val future = executorService.submit(MyCallable())
Observable.fromFuture(future)
.subscribeBy(
onNext = {Log.e("TAG"."onNext: $it")})}Copy the code
The execution result
Simulate some time-consuming operations... onNext: OKCopy the code
The FROM method has a version that accepts two optional parameters, specifying the timeout length and the unit of time. If the Future doesn’t return after the specified time, the Observable emits an error notification and terminates
fun testFromFuture(){
val executorService = Executors.newSingleThreadExecutor();
val future = executorService.submit(MyCallable())
Observable.fromFuture(future,3,TimeUnit.SECONDS)
.subscribeBy(
onNext = {Log.e("TAG"."onNext: $it")})}Copy the code
The execution result
Simulate some time-consuming operations... AndroidRuntime: FATAL EXCEPTION: main io.reactivex.exceptions.OnErrorNotImplementedExceptionCopy the code
4. Repeat
Create an Observable that emits a particular data multiple times
Instead of creating an Observable, repeat repeats the data sequence of the original Observable, which can be infinite or can be specified by repeat(n)
fun testRepeat(){
Observable.just("hello"//.repeat() // infinite.repeat(3) // loop 3 times.subscribeby (onNext = {log.e ("TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete") },
onError = { it.printStackTrace() }
)
}
Copy the code
The execution result
onNext: hello
hello
onComplete
Copy the code
There are also two repeat related operators in rxJavA2.x: repeatWhen and repeatUntil
1. repeatWhen
RepeatWhen does not cache and repeat the data sequence of the original Observable, but re-subscribes and publishes the original Observable according to the specified conditions
fun testRepeatWhen(){observable.range (0,5).repeatwhen {observable.timer (10, timeunit.seconds)}."TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete") },
onError = { it.printStackTrace() }
)
}
Copy the code
The execution result
09-06 05:55:14. 901, 22472-22472 / com mufeng. Rxjavademo E/TAG: onNext: 0 onNext: 1 onNext: 2 onNext: 3 onNext: 4 09-06 05:55:24. 903, 22472-22505 / com mufeng. Rxjavademo E/TAG: onNext: 0 09-06 05:55:24. 904, 22472-22505 / com mufeng. Rxjavademo E/TAG: onNext: 1 onNext: 2 onNext: 3 onNext: 4 09-06 05:55:24. 911, 22472-22505 / com mufeng. Rxjavademo E/TAG: the onCompleteCopy the code
2. repeatUntil
RepeatUntil is a new operator in rxJavA2.x, indicating that data is not repeated until a certain condition
fun testRepeatUntil(){
val time = System.currentTimeMillis();
Observable.just("hello")
.repeatUntil {
System.currentTimeMillis() - time > 5000
}
.subscribeBy(
onNext = { Log.e("TAG"."onNext: ${it.toString()}") },
onComplete = { Log.e("TAG"."onComplete") },
onError = { it.printStackTrace() }
)
}
Copy the code
The execution result
09-06 06:02:15. 220, 22728-22728 / com mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 552, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 552, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 568, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 569, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 578, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 579, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 579, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 579, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 580, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 580, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 581, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 581, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 583, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 584, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 609, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:16. 609, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:18. 384, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:18. 384, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:20. 177, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: onNext: Hello 09-06 06:02:20. 178, 22728-22728 / com. Mufeng. Rxjavademo E/TAG: the onCompleteCopy the code
5. The Empty/Never/Error
1. empty()
Create an observer object and send only the onComplete event
Observable.empty<Int>()
.subscribeBy(
onNext = { Log.e("TAG"."Accept the onNext event ==$it")},
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}")},
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
2. error()
The observed object created by this method has the characteristics of sending events: only sending Error events, directly notifying exceptions can be customized exceptions
Observable.error<Int>(Throwable("Unknown exception"))
.subscribeBy(onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}")})
Copy the code
3. never()
No events are sent
Observable.never<Int>()
.subscribeBy(
onNext = { Log.e("TAG"."Accept the onNext event ==$it")},
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}")},
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
Six Defer.
Until an observer subscribes, an Observable is dynamically created and a new Observable is created for each observer
Every time you subscribe, you get a newly created Observable, which ensures that the data in the Observable is up to date
- Common operators
- ․ RxJava 2 x: defer
- RxKotlin: defer
Var I = 100 val observable = observable. defer {observable. just(I)} Then defer() is used to create the Observable object Observable. SubscribeBy (onNext = {log.e ()"TAG"."Accept the onNext event ==$it") },
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}") },
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
7. The Timer
Create an Observable that emits a special value after a given delay
Observable.timer(2, TimeUnit.SECONDS)
.subscribeBy(
onNext = { Log.e("TAG"."Accept the onNext event ==$it") },
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}") },
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
Eight Interval.
An Observable that emits integer sequences at a fixed time interval
- Common operators
- RxJava 2.X: interval
- RxKotlin: interval
/** * First argument: the first delay time * second argument: the interval between sending events * third argument: TimeUnit */ observable. interval(2,1, timeunit.seconds). SubscribeBy (onNext = {log.e ("TAG"."Accept the onNext event ==$it") },
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}") },
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
Nine Range.
Sends a sequence of events in succession, ranges can be specified
/** * range sends a sequence of integers without delay * first argument: the start of the sequence of events * second argument: / Observable. Range (3,10). SubscribeBy (onNext = {log.e)"TAG"."Accept the onNext event ==$it") },
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}") },
onComplete = { Log.e("TAG"."Response to Complete Event"}) /** * intervalRange: send sequence of integers, delay can be set * first argument: start of sequence of events * second argument: number of events * third argument: event interval * fourth argument: Incident unit * / observables. IntervalRange (3,10,2,1, TimeUnit. SECONDS). SubscribeBy (onNext = {Log. E ("TAG"."Accept the onNext event ==$it") },
onError = { Log.e("TAG"."In response to an Error event:${it.localizedMessage}") },
onComplete = { Log.e("TAG"."Response to Complete Event")})Copy the code
The above is the use of Observable creation operators. These operators can be used not only in Observables, but also in Flowable
The Demo address:
RxJavaDemo