This study notes mainly refers to the Android RxJava: the most basic operators in detail – create operators this blog to learn. Blogger article illustrated, explain the very carefully, you can click on the blue font or click here to see the original, if the link fails, here is the original connection: www.jianshu.com/p/e19f8ed86…
This study note focuses on each of the creation operators from the above article, including code examples for each operator, ObservableOnSubscribe (ObservableOnSubscribe).subscribe(Observer) ObservableOnSubscribe (ObservableOnSubscribe).subscribe(Observer)
The version of RxJava used in this study note:
Implementation 'IO. Reactivex. Rxjava2: rxjava: 2.2.21'Copy the code
RxJava create operator
Basic create
create()
In the previous tutorial, we learned how to create an observed using the create() method, as follows:
Observable.create(object : ObservableOnSubscribe<Int> {
@SuppressLint("CheckResult")
override fun subscribe(emitter: ObservableEmitter<Int>) {
for (i in 0 until 10)
emitter.onNext(i)
emitter.onComplete()
}
}).subscribe()
Copy the code
In the above code we create an observatee using the create(ObservableOnSubscribe) method, which passes an ObservableOnSubscribe instance. When the code executes to subscribe(), The subscribe() method in ObservableOnSubscribe is executed, calling the ObservableEmitter to emit data. Here’s a quick trace of the create() method’s source code:
-
Create (ObservableOnSubscribe) :
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } Copy the code
As you can see, the ObservableCreate(Source) method is created, and an ObservableCreate instance is created. And pass in the ObservableOnSubscribe instance we created earlier:
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }... Omit other source code... }Copy the code
ObservableCreate is an ObservableCreate that inherits from Observable.
-
RxJavaPlugins.onAssembly(ObservableCreate)
The rxJavaplugins.onsddembly (ObservableCreate) method is executed after the ObservableCreate is created.
@NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if(f ! =null) { return apply(f, source); } return source; } Copy the code
In this method, we first get the onOnservableAssembly variable, and then check whether the variable is null. If it is null, we return the parameters we passed in. Let’s look at the definition and assignment of the variable:
// The variable defaults to null @Nullable static volatile Function<? super Observable, ? extends Observable> onObservableAssembly; // The parameter is assigned only here public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; } // The above method is called only here public static void reset(a) {... Other code... setOnObservableAssembly(null); . Other code... }Copy the code
That is to say, if we don’t have to manually call RxJavaPlugins. SetOnObservableAssembly () method, onObservableAssembly is null, therefore, The rxJavaplugins.onAssembly () method returns the ObservableCreate that we passed in.
Now we know that when we call ObservableCreate (), we get an ObservableCreate instance that holds the ObservableOnSubscribe instance we passed.
-
ObservableCreate.subscribe()
After the observed is created, it will connect the observed and the observer through subscription, as shown below:
.subscribe{ Logs.e("observableWithCreate onNext:$it")}Copy the code
Here we pass only the body of the onNext() method. Here is the source code for this method:
// Call this method directly public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } // This method is called indirectly @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } Copy the code
Subscribe () actually calls the second method above, which nulls the argument and creates a LambdaObser object. All we need to know is that the LambdaObserver implements the Observer interface. Subscribe (Observer) = subscribe(Observer); subscribe(Observer) = subscribe(Observer);
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { // Other source code subscribeActual(observer); // Other source code } protected abstract void subscribeActual(Observer<? super T> observer); Copy the code
In this method, the subscribeActual(Observer) method is mainly called, which is an abstract method and needs to be implemented by subclasses. As we know from the above source code, the observed is the object of the class ObservableCreate. So let’s go into this class and look:
/ / the subscribeActual ObservableCreate () final ObservableOnSubscribe<T> source; @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
In this method, we create CreateEmitter, and then we find source.subscribe(parent). The source is the ObservableOnSubscribe object we pass in the ObservableOnSubscribe (ObservableOnSubscribe) method, Here is call our incoming ObservableOnSubscribe. Subscribe (emitter) method.
-
ObservableOnSubscribe.subscribe()
Through the above understanding, when we already know the call ObservableOnSubscribe. The subscribe () method, generally we are all here to write code to send events, and in the source code of the above, We know call ObservableOnSubscribe. Subscribe (ObservableEmitter) is passed in this way a CreateEmitter, which means we call emitter. OnNext (), CreateEmitter. OnComplete () these methods are all relative to CreateEmitter. CreateEmitter () onNext() is a function of CreateEmitter.
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } Copy the code
As you can see, here is the onNext() method in the Observer called directly.
ObservableOnSubscribe (ObservableOnSubscribe). Subscribe (Observer). ObservableOnSubscribe (ObservableOnSubscribe) Then create an ObservableCreate and pass in the ObservableCreate to save, When the observable. subscribe(Observer) method is called, a CreateEmitter is generated from the incoming Observer, and the data is emitted by calling the subscribe() method of the emitter and calling the relevant method in CreateEmitter.
Quickly create & send events
just()
This method can be used to quickly create an observed object. It can send up to 10 incoming events directly. The just() method has 10 overloaded functions, with 1 to 10 passable arguments:
Observable.just(1.2.3.4.5.6)
.subscribe {
Logs.e("observableWithJust onNext: $it")}Copy the code
Create an observed using just(). This code is equivalent to calling onNext(1),onNext(2),onNext(3),onNext(4),onNext(5),onNext(6). Of course, the source code is not implemented in this way, but after understanding the process of observable.create (), the call process of just() method is similar, except that the internal created Observables and Disposable are different, but the execution process is similar.
The running effect is as follows:
onNext: 1
observableWithJust onNext: 2
observableWithJust onNext: 3
onNext: 4
observableWithJust onNext: 5
observableWithJust onNext: 6
Copy the code
fromArray()
This method is used to quickly create an observed object, directly emitting the array data passed in. The inside of just() above is also fromArray(). The following code demonstrates the use of this method:
private fun observableWithFromArray(a) {
val intArray = arrayOf<Int> (1.2.3.4.5.6.7)
Observable.fromArray(*intArray)
.subscribe {
Logs.e("observableWithFromArray onNext:$it")}}Copy the code
Note that since Observable. FromArray (Vararg) needs to pass in a parameter of type Vararg, if we pass intArray as a parameter, Observable.fromarray () will assume that this is an IntArray parameter, so the argument passed here is * IntArray, which can be thought of as converting the array to vararg.
You can also iterate through an array using this method.
The running effect is as follows:
observableWithFromArray onNext:1
observableWithFromArray onNext:2
observableWithFromArray onNext:3
observableWithFromArray onNext:4
observableWithFromArray onNext:5
observableWithFromArray onNext:6
observableWithFromArray onNext:7
Copy the code
fromIterable()
Quickly create an observed object, send out the elements of the collection in turn, can be used to quickly create the observed object and need to send multiple data, can also be used to traverse the collection, the code is as follows:
val list = mutableListOf<String>("2321"."123123"."ASAs")
Observable.fromIterable(list)
.subscribe {
Logs.e("observableWithFromIterable onNext:$it")}Copy the code
The running results are as follows:
observableWithFromIterable onNext:2321
observableWithFromIterable onNext:123123
observableWithFromIterable onNext:ASAs
Copy the code
never()
.empty()
.error()
These three methods are mainly used for testing, and are seldom used at ordinary times. Among them:
never()
[Fixed] The created observed does not send any events.empty()
The created observed is only sentonComplete()
Events;error()
The created observed is only sentonError()
Event, you can customize the error type.
never()
private val mObserver = object: Observer<Any? > {override fun onSubscribe(d: Disposable) {
Logs.e("observableWithNever onSubscribe...")}override fun onError(e: Throwable) {
Logs.e("observableWithNever onError...")}override fun onComplete(a) {
Logs.e("observableWithNever onComplete...")}override fun onNext(t: Any) {
Logs.e("observableWithNever onNext...$t") } } Observable.never<Any? >() .subscribe(mObserver) }Copy the code
The code above runs as follows:
observableWithNever onSubscribe...
Copy the code
As you can see, no events are sent here.
empty()
Observable.empty<Any>()
.subscribe(mObserver)
Copy the code
The running results are as follows:
observableWithNever onSubscribe...
observableWithNever onComplete...
Copy the code
As you can see, empty() sends the completion event directly.
error()
Observable.error<IllegalArgumentException>(IllegalArgumentException("Just testing the error() method"))
.subscribe(mObserver)
Copy the code
The running results are as follows:
observableWithNever onSubscribe... observableWithNever onError... Java. Lang. IllegalArgumentException: just testing error () methodCopy the code
The onError() method is called directly, and the error message printed is the same as the one we set.
Create a delay
Operators created after delay are used to perform scheduled operations and periodic operations. Scheduled operations: perform y after x time, and periodic operations: perform Y after x time.
defer()
This operator does nothing to dynamically create an Observable and send an event until an Observer subscribes. This operator creates Observable objects using the Observable factory method. After each subscription, it gets a newly created Observable, which ensures that the data in the Observable is new.
// Start data
var startNum = 10
// Create the observed by defer, which returns an Observable, using just
var deferObservable = Observable.defer(object : Callable<ObservableSource<Int> > {override fun call(a): ObservableSource<Int> {
Logs.e("Create observables")
return Observable.just(startNum)
}
})
// Reassign the data
startNum = 100
// Create an observer and start subscribing
var observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
Logs.e("ObservableWithDefer starts the subscription :onSubscribe():$d")}override fun onNext(t: Int) {
Logs.e("ObservableWithDefer gets the data$t")}override fun onError(e: Throwable) {
Logs.e("observableWithDefer onError:$e")}override fun onComplete(a) {
Logs.e("observableWithDefer onComplete....")
}
}
Logs.e("Start subscribing...")
// Start subscribing
deferObservable.subscribe(observer)
}
Copy the code
The program run output log is as follows:
Start subscribing... ObservableWithDefer creates Observable observableWithDefer to subscribe :onSubscribe():0 observableWithDefer gets the data 100 observableWithDefer onComplete....Copy the code
Observables are created after the observable. subscribe(Observer) method is executed, so the data sent by the Observer is the modified data.
timer()
This method creates an observer quickly and sends a value of 0(type Long) after the specified delay time is reached.
Observable.timer(3, TimeUnit.SECONDS)
.subscribe({
Logs.e("observableWithTimer onNext:$it")
}, {
Logs.e("observableWithTimer onError:$it")
}, {
Logs.e("observableWithTimer onComplete...")
}, {
Logs.e("ObservableWithTimer starts the subscription:$it")})Copy the code
Parameter Description:
- The first parameter “3” indicates the time to delay
- Second parameter
TimeUnit.SECONDS
Represents a unit of time. The first two parameters together form the specific time to be delayed - The third parameter is in
timer()
And we can specify oneScheduler
, which is used by defaultScheduler.computation()
The print result is as follows:
The 2021-03-25 16:01:42. 892, 22372-22372 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithTimer began to subscribe to: null 16:01:45 2021-03-25. 897. 22372-22422 / com project. Mystudyproject E/com.project.mystudyproject: ObservableWithTimer onNext: 0 2021-03-25 16:01:45. 900, 22372-22422 / com. Project. Mystudyproject E/com.project.mystudyproject: observableWithTimer onComplete...Copy the code
As can be seen from the timeline in front of the log, onSubscribe(Disposable) will be called immediately after the subscription. After 3 seconds, it will get the data 0 and print onComplete… . Null does not mean that Disposable is empty. This Disposable is not empty and can be used directly.
interval()
This method quickly creates an Observable that sends events at specified intervals. The test code is as follows:
var disposable: Disposable? = null
var num: Long = -1
Observable.interval(3.1, TimeUnit.SECONDS)
.subscribe({
num = it
Logs.e("observableWithInterval onNext: $it")
if (num > 20) disposable? .dispose() }, { Logs.e("observableWithInterval onError: $it")
}, {
Logs.e("observableWithInterval onComplete...")
}, {
disposable = it
Logs.e("observableWithInterval onSubscribe:$it")})}Copy the code
Parameter Description:
- The first parameter
3
Indicates the delay time for sending the event for the first time. If this parameter is not specified, take the interval set by the second parameter - Second parameter
1
The interval for sending subsequent events must be specified - The third parameter
TimeUnit.SECONDS
Time unit - It can also be specified in overloaded methods
Scheduler
, do not specify the default isSchedulers.computation()
The running results are as follows:
The 2021-03-25 16:25:08. 595, 23896-23896 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithInterval onSubscribe: null 16:25:11 2021-03-25. 599. 23896-23951 / com project. Mystudyproject E/com.project.mystudyproject: observableWithInterval onNext: 0 2021-03-25 16:25:12. 598, 23896-23951 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithInterval onNext: 1 2021-03-25 16:25:13. 599, 23896-23951 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithInterval onNext: 2 2021-03-25 16:25:14. 599, 23896-23951 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithInterval onNext: 3Copy the code
According to the printed logs, after the subscription, the first event is sent at an interval of 3 seconds, and subsequent events are sent at an interval of 1 second.
Modify the code not to specify the first delay time:
var disposable: Disposable? = null
var num: Long = -1
Observable.interval( 3, TimeUnit.SECONDS)
.subscribe({
num = it
Logs.e("observableWithInterval onNext: $it")
if (num > 20) disposable? .dispose() }, { Logs.e("observableWithInterval onError: $it")
}, {
Logs.e("observableWithInterval onComplete...")
}, {
disposable = it
Logs.e("observableWithInterval onSubscribe:$it")})}Copy the code
In the above code, the delay time of sending the event for the first time is cancelled, and only the interval time of sending the event is set as 3s. The final printed data is as follows:
The 2021-03-25 16:27:31. 292, 24066-24066 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithInterval onSubscribe: null 16:27:34 2021-03-25. 298. 24066-24139 / com project. Mystudyproject E/com.project.mystudyproject: observableWithInterval onNext: 0 2021-03-25 16:27:37. 296, 24066-24139 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithInterval onNext: 1 2021-03-25 16:27:40. 297, 24066-24139 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithInterval onNext: 2Copy the code
It can be seen that the first event will be sent 3s after the subscription, and the subsequent event will be sent every 3s.
intervalRange()
This method is characterized by: quickly create an observed object, every specified time to send events, you can specify the number of events sent, the sequence of events sent from 0, incremented by 1 sequence. The main difference from interval() is that you can specify the number of events to send.
Observable.intervalRange(10.6.3.1, TimeUnit.SECONDS)
.subscribe({
Logs.e("observableWithIntervalRange onNext: $it")
}, {
Logs.e("observableWithIntervalRange onError:$it")
}, {
Logs.e("observableWithIntervalRange onComplete...")
}, {
Logs.e("observableWithIntervalRange onSubscribe:$it")})Copy the code
Parameter Description:
- The first parameter
10
Indicates the data from which the event is sent10
start - Second parameter
6
Indicates that six events are sent from10
to15
- The third parameter
3
Indicates the delay for sending the event for the first time - The fourth parameter
1
Indicates the interval for sending subsequent events - The fifth parameter
TimeUnit.SECONDS
Indicates the unit of time. It is used with two parameters - It can also be specified in overloaded methods
Scheduler
Parameter, used when no parameter is specifiedSchedulers.computation()
The running results are as follows:
The 2021-03-25 16:40:28. 233, 24420-24420 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithIntervalRange onSubscribe: null 16:40:31 2021-03-25. 237. 24420-24681 / com project. Mystudyproject E/com.project.mystudyproject: observableWithIntervalRange onNext: 10 2021-03-25 16:40:32. 236, 24420-24681 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithIntervalRange onNext: 11 2021-03-25 16:40:33. 236, 24420-24681 / com. Project. Mystudyproject E/com. Project. Mystudyproject: observableWithIntervalRange onNext: 12 2021-03-25 16:40:34. 236, 24420-24681 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithIntervalRange onNext: 13 2021-03-25 16:40:35. 236, 24420-24681 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithIntervalRange onNext: 14 16:40:36 2021-03-25. 236. 24420-24681 / com project. Mystudyproject E/com. Project. Mystudyproject: observableWithIntervalRange onNext: 15 16:40:36 2021-03-25. 236. 24420-24681 / com project. Mystudyproject E/com. Project. Mystudyproject: observableWithIntervalRange onComplete...Copy the code
As you can see, after the subscription is successful, the event delay for the first time is 3s, and the subsequent events are sent every 1s, a total of 6 times, and then onComplete is called to complete.
range()
The characteristics of this method are: quickly create an observed object, send a sequence of events, can specify a range. The sequence of events sent also starts at 0 and increments by 1 until the specified number of sent events is reached. The difference from intervalRange() is that there is no delay or interval.
Observable.range(100.10)
.subscribe({
Logs.e("observableWithRange onNext: $it")
}, {
Logs.e("observableWithRange onError:$it")
}, {
Logs.e("observableWithRange onComplete...")
}, {
Logs.e("observableWithRange onSubscribe:$it")})Copy the code
Parameter Description:
- The first parameter
100
The data that starts sending the event, which is the data of the first event100
- Second parameter
10
Sends a total of 10 data, in this case from100
to109
The running results are as follows:
The 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithRange onSubscribe: 0 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com.project.mystudyproject: observableWithRange onNext: 100 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 101 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 102 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 103 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 104 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 105 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 106 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 107 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 108 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onNext: 109 2021-03-25 16:52:05. 379, 25010-25010 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRange onComplete...Copy the code
As you can see, events are sent immediately after the subscription is successful, without delay or interval.
rangeLong()
This method does the same thing as the range() method, but is of type Long.
Observable.rangeLong(100.10)
.subscribe({
Logs.e("observableWithRangeLong onNext: $it,type is${it::class.java.name}")
}, {
Logs.e("observableWithRangeLong onError:$it")
}, {
Logs.e("observableWithRangeLong onComplete...")
}, {
Logs.e("observableWithRangeLong onSubscribe:$it")})Copy the code
The running results are as follows:
The 2021-03-25 17:06:41. 221, 25745-25745 / com. Project. Mystudyproject E/com. Project. Mystudyproject: ObservableWithRangeLong onSubscribe: 0 2021-03-25 17:06:41. 221, 25745-25745 / com. Project. Mystudyproject E/com.project.mystudyproject: observableWithRangeLong onNext: 100, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 101, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 102, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 103, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 104, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 105, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 106, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 107, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 108, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onNext: 109, type isjava. Lang. Long 17:06:41. 2021-03-25, 221, 25745-25745 / com. Project. Mystudyproject E/com project. Mystudyproject: observableWithRangeLong onComplete...Copy the code
You can see that the data in onNext() is indeed of type Long.