1. Preface
I have wanted to write up RxJava for a long time but never had the time. Having recently left my job, I finally do. I plan to document one framework per blog post, so to cover things clearly this post is fairly long (an introduction to RxJava, its usage, back pressure, internals, and so on). I hope you will read it carefully; you should get something out of it. I also use many diagrams to aid understanding. The post can also serve as a reference to come back to whenever you need it.
Back pressure and internals will be covered in follow-up posts, so stay tuned.
2. Introduction
What is RxJava? It is an event-driven programming model built on asynchronous data streams. The whole data stream is like a river: it can be observed (listened to), filtered, transformed, or merged with other data streams into a new one.
The three elements
- Observable (the observed)
- Observer
- Subscribe (links the two)
Sticking with the idea of explaining through diagrams, the introduction to the RxJava operators uses a large number of diagrams, taken from the official documentation. First, here is how to read them.
OK, let's get to the code.
3. Simple use
1. First add the dependencies to the build.gradle file
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
2. With the dependencies in place, let's write the simplest case, in 3 steps
- 2.1 Create the observable
// Create the observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello?");
        emitter.onNext("I love China");
        emitter.onNext("Wish our motherland prosperity and strength.");
        emitter.onComplete();
    }
});
- 2.2 Create the observer
// Create the observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("lybj", "Ready to listen.");
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", "error");
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "I'm done listening.");
    }
};
- 2.3 Subscribe (that is, associate the observable with the observer)
// Subscribe
observable.subscribe(observer);
That’s it. Let’s see what happens
Isn’t that easy? Just a few more concepts
- onNext(): when the observable emits data by calling onNext(), the observer's onNext() is called to receive it
- onError(): when the observable calls this function, the observer's onError() is called, and no further events are sent
- onComplete(): when the observable calls this function, the observer's onComplete() is called, and no further events are sent
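The rule in the list above (nothing is delivered after onError() or onComplete()) can be sketched in plain Java with no RxJava dependency. The class `TerminalContractModel` and its `deliver` method are hypothetical names used only for illustration; this models the rule and is not how RxJava implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the event contract; not RxJava code.
final class TerminalContractModel {

    // Returns the events an observer would actually see, given what the
    // source tries to send. "onError" and "onComplete" are terminal.
    static List<String> deliver(List<String> attempted) {
        List<String> seen = new ArrayList<>();
        for (String event : attempted) {
            seen.add(event);
            if (event.equals("onError") || event.equals("onComplete")) {
                break; // terminal event: everything after it is dropped
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> attempted = Arrays.asList(
                "onNext(1)", "onNext(2)", "onComplete", "onNext(3)");
        System.out.println(deliver(attempted)); // [onNext(1), onNext(2), onComplete]
    }
}
```

The same rule explains why, in later examples, an onNext() placed after onError() in the emitter never reaches the observer.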
As an analogy, RxJava is like a shower. The data stream is the water, and the Observable's operators are the shower head, which has various modes such as center spray, surrounding spray, and mist. Different operators reshape the data just as different shower-head modes reshape the flow of water. Next, let's look at the rich set of Observable operators.
Let’s look at the outline
4. Create operators
1.create()
1.1 What does it do?
Creates an observable
1.2 How to use it?
// Create the observable
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello?");
        emitter.onNext("I love China");
        emitter.onNext("Wish our motherland prosperity and strength.");
        emitter.onComplete();
    }
}).subscribe(new Observer<String>() { // Attach the observer
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("lybj", "Ready to listen.");
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", "error");
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "I'm done listening.");
    }
});
1.3 Results
The observer can be attached directly in a chained call
2.just()
2.1 What does it do?
As the diagram above shows, its main job is to create an observable and emit the given events; it accepts at most 10 events.
2.2 How to use it?
Observable.just("Xiao Ming", "Little red", "Xiao LAN").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("lybj", "Ready to listen.");
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s + "Come");
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", "Error");
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "Finished");
    }
});
2.3 Results
3.timer()
3.1 What does it do?
Sends the value 0 to the observer after the specified time. In a project you can use it for delayed processing, similar to a delayed post on a Handler
3.2 How to use it?
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i("lybj", aLong + "");
    }
});
3.3 Results
After a delay of 2 seconds, the result is sent to the observer. Consumer and Observer are two ways to create an observer; a Consumer's accept() is equivalent to the onNext() method of an Observer.
4.interval()
4.1 What does it do?
Sends an event at a fixed interval; the values start at 0 and increase by 1 each time. It works like a timer in a project
4.2 How to use it?
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i("lybj", aLong + "");
    }
});
4.3 Results
5.intervalRange()
5.1 What does it do?
Works like interval(), but you can specify the start value and the number of events to send.
5.2 How to use it?
Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i("lybj", aLong + "");
    }
});
5.3 Results
The parameters are, in order: the start value, the number of events to send, the initial delay, the interval between events, and the time unit
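To make the parameter order concrete, here is a small plain-Java sketch that computes which value is emitted at which time for a given set of arguments. It does not use RxJava; `IntervalRangeModel` and `schedule` are hypothetical names, and the sketch assumes events fire exactly on schedule.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the intervalRange() emission schedule; not RxJava code.
final class IntervalRangeModel {

    // Returns "value@time" pairs: the i-th value is start + i and is
    // emitted at initialDelay + i * period (times in seconds here).
    static List<String> schedule(long start, long count, long initialDelay, long period) {
        List<String> out = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            out.add((start + i) + "@" + (initialDelay + i * period) + "s");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same arguments as intervalRange(100, 4, 0, 10, TimeUnit.SECONDS)
        System.out.println(schedule(100, 4, 0, 10)); // [100@0s, 101@10s, 102@20s, 103@30s]
    }
}
```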
6.range()
6.1 What does it do?
Sends a range of consecutive integers immediately, with no delay between them.
6.2 How to use it?
Observable.range(0, 10).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i("lybj", integer + "");
    }
});
6.3 Results
7.rangeLong()
7.1 What does it do?
Works the same as range(), but the values are of type Long
7.2 How to use it?
Observable.rangeLong(0, 10).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i("lybj", aLong + "");
    }
});
7.3 Results
8.empty() & never() & error()
8.1 What does it do?
- never(): sends no events at all
- error(): sends the onError() event
- empty(): sends the onComplete() event directly
8.2 How to use it?
private void empty_never_error() {
    Observable.empty().subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.i("lybj", "Ready to listen.");
        }
        @Override
        public void onNext(Object o) {
            Log.i("lybj", o + "");
        }
        @Override
        public void onError(Throwable e) {
            Log.i("lybj", "onError");
        }
        @Override
        public void onComplete() {
            Log.i("lybj", "onComplete");
        }
    });
}
8.3 Results
With empty(): onSubscribe() is called, then onComplete().
With error(): onSubscribe() is called, then onError().
With never(): only onSubscribe() is called.
5. Conversion operators
1.map()
1.1 What does it do?
map() converts the data sent by the observable into another value or type
1.2 How to use it?
Observable.just("China", "Motherland", "Chinese soldier")
    .map(new Function<String, String>() {
        @Override
        public String apply(String s) throws Exception {
            return "I love " + s;
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("lybj", s);
        }
    });
1.3 Results
In simple terms, the emitted data is processed again before it is passed on to the observer
2.flatMap()
2.1 What does it do?
This method transforms each element of the event sequence and returns a new observable. flatMap() is similar to map(), but flatMap() returns an Observable while map() returns plain data. If you want to reuse the creation operators above while reprocessing elements, use flatMap() instead of map().
2.2 How to use it?
Observable.just("China", "Motherland", "Chinese soldier", "Corrupt officials")
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            if (s.equals("Corrupt officials")) {
                return Observable.error(new Exception("Corrupt officials cannot be liked."));
            }
            return Observable.just("I love " + s);
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("lybj", s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i("lybj", throwable.getMessage());
        }
    });
2.3 Results
The second Consumer receives the error sent by Observable.error()
3.concatMap()
3.1 What does it do?
concatMap() and flatMap() are basically the same, except that concatMap() forwards events in order, while flatMap() does not guarantee order.
3.2 How to use it?
Observable.just("China", "Motherland", "Chinese soldier", "Corrupt officials")
    .concatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            if (s.equals("Corrupt officials")) {
                return Observable.error(new Exception("Corrupt officials cannot be liked."));
            }
            return Observable.just("I love " + s);
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("lybj", s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i("lybj", throwable.getMessage());
        }
    });
3.3 Results
4.buffer()
4.1 What does it do?
Takes a number of events from the event sequence, collects them into a buffer, and sends them together as a list.
4.2 How to use it?
buffer() takes two arguments, count and skip. count is the maximum number of elements in a buffer. skip is how many elements to advance before the next buffer starts.
Observable.just("1", "2", "3", "4", "5")
    .buffer(2, 1)
    .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> strings) throws Exception {
            Log.d("lybj", "Buffer size: " + strings.size());
            for (String s : strings) {
                Log.d("lybj", s);
            }
        }
    });
4.3 Results
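How count and skip interact is easier to see on a plain list. The sketch below is a plain-Java model with no RxJava dependency; `BufferModel` and its `buffer` method are hypothetical names. It assumes the behavior of the count/skip overload as I understand it: a new buffer starts every skip items, each holds up to count items, and a partially filled buffer is still emitted when the source ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of buffer(count, skip) on a finite list; not RxJava code.
final class BufferModel {

    // A new buffer starts every `skip` items; each holds up to `count` items.
    // A partially filled buffer is still emitted when the source ends.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("1", "2", "3", "4", "5");
        System.out.println(buffer(source, 2, 1)); // [[1, 2], [2, 3], [3, 4], [4, 5], [5]]
        System.out.println(buffer(source, 2, 2)); // [[1, 2], [3, 4], [5]]
    }
}
```

With skip smaller than count the buffers overlap; with skip equal to count they tile the sequence without overlap.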
5.scan()
5.1 What does it do?
Each emitted item is combined with the previous result by a function; that result is then passed in as a parameter together with the next emitted item, and every intermediate result is sent in turn.
5.2 How to use it?
Observable.just(1, 2, 3, 4, 5)
    .scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.i("lybj", "integer01: " + integer + " integer02: " + integer2);
            return integer + integer2;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", "accept: " + integer);
        }
    });
5.3 Results
In simple terms, the first element, 1, is passed to the observer as is. Then 1 and 2 are added and the sum, 3, is passed to the observer. That sum becomes the first argument of the next call, while the second argument is always the next element in order, here 3, so 3 + 3 = 6 is passed to the observer. Likewise, 6 becomes the first argument, the second argument is 4, and 6 + 4 = 10 is passed to the observer, and so on.
The scan operator iterates over the items of the source Observable, applies the custom rule to them, and outputs the result of each calculation to the subscriber
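The running-accumulation idea can be reproduced on a plain list. This is a plain-Java model, not RxJava code; `ScanModel` and its `scan` method are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of scan() on a finite list; not RxJava code.
final class ScanModel {

    // Emits the first item unchanged, then accumulator(previousResult, nextItem)
    // for every following item.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), Integer::sum)); // [1, 3, 6, 10, 15]
    }
}
```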
6.window()
6.1 What does it do?
Regroups the events by count as they are sent. The count argument of window() is the group size. For example, if count is set to 2, every two items form one group.
The difference between window() and buffer() is that window() splits the data into Observables, while buffer() splits it into lists
6.2 How to use it?
Observable.just("Ban", "Sun Shangxiang", "The line", "Fire female", "Galen")
    .window(2)
    .subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            Log.i("lybj", "Group start.");
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.i("lybj", s);
                }
            });
        }
    });
6.3 Results
6. Combination operators
1.concat()
1.1 What does it do?
Joins multiple observables together and sends their events in order, one observable after another. Note that concat() accepts at most four observables this way.
1.2 How to use it?
private void concat() {
    Observable.concat(
            Observable.just(1, 2, 3),
            Observable.just(4, 5),
            Observable.just(6, 7),
            Observable.just(8, 9))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i("lybj", integer + "");
            }
        });
}
1.3 Results
2.concatArray()
2.1 What does it do?
Same as concat(), but concatArray() accepts more than four observables.
2.2 How to use it?
Observable.concatArray(Observable.just(1, 2, 3, 4),
        Observable.just(5, 6),
        Observable.just(7, 8, 9, 10),
        Observable.just(11, 12, 13),
        Observable.just(14, 15),
        Observable.just(16))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
2.3 Results
3.merge()
3.1 What does it do?
Basically the same as concat(), except that concat() sends the events of each observable in sequence, whereas merge() forwards events from all observables in parallel as they arrive. Like concat(), it accepts at most four observables this way.
3.2 How to use it?
Observable.merge(Observable.just(1, 2, 3, 4),
        Observable.just(5, 6),
        Observable.just(7, 8, 9, 10),
        Observable.just(11, 12, 13))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
3.3 Results
4.zip()
4.1 What does it do?
The zip operator merges multiple data sources into a new one. The new source pairs items strictly in the emission order of the original sources, and the number of items it emits equals that of the source that emitted the fewest items.
4.2 How to use it?
Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C", "D", "E"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer o1, String o2) throws Exception {
                return o1 + "_" + o2;
            }
        })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String o) throws Exception {
            Log.i("lybj", o);
        }
    });
4.3 Results
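The pairing rule (combine items position by position, stop at the shorter source) can be shown on plain lists. This is a plain-Java model with hypothetical names `ZipModel` and `zip`; it is not RxJava code and ignores timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of zip() on two finite lists; not RxJava code.
final class ZipModel {

    // Combines items pairwise by position and stops at the shorter list.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> zipper) {
        List<R> out = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            out.add(zipper.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList(1, 2, 3),
                Arrays.asList("A", "B", "C", "D", "E"),
                (number, letter) -> number + "_" + letter);
        System.out.println(zipped); // [1_A, 2_B, 3_C]
    }
}
```

"D" and "E" have no partner in the first list, so they never appear in the output.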
5.startWith() & startWithArray()
5.1 What does it do?
Prepends events to the sequence: startWith() prepends one event, and startWithArray() can prepend several. The prepended events are emitted first.
5.2 How to use it?
Observable.just(1, 2, 3)
    .startWithArray(4, 5)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
5.3 Results
6.count()
6.1 What does it do?
Returns the number of events sent by the observable.
6.2 How to use it?
Observable.just(2, 3, 4, 5, 6)
    .count()
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.i("lybj", "Number of events: " + aLong);
        }
    });
6.3 Results
7. Functional operators
1.delay()
1.1 What does it do?
Delays the sending of events by the given time.
1.2 How to use it?
Observable.just(1, 2, 3, 4)
    .delay(3, TimeUnit.SECONDS)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
1.3 Results
2.Lifecycle callbacks (the do operators)
2.1 What does it do?
doOnEach(): called back each time an event is sent, whether onNext(), onError(), or onComplete().
doOnNext(): called back before the Observable sends each onNext().
doAfterNext(): called back after the Observable sends each onNext().
doOnComplete(): called back before the Observable sends onComplete().
doOnError(): called back before the Observable sends onError().
doOnSubscribe(): called back when the Observable sends onSubscribe().
doOnDispose(): called back after Disposable.dispose() is called.
doOnTerminate(): called back before onError() or onComplete() is sent.
doAfterTerminate(): called back after onError() or onComplete() is sent.
doFinally(): called back once all events have been sent. If the subscription is disposed, doAfterTerminate() is not called back, but doFinally() is called back regardless, at the very end of the event sequence.
2.2 How to use it?
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).doOnEach(new Consumer<Notification<Integer>>() {
    @Override
    public void accept(Notification<Integer> integerNotification) throws Exception {
        Log.i("lybj", "doOnEach method executed, result: " + integerNotification.getValue());
    }
}).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i("lybj", "doOnNext method executed, result: " + integer);
    }
}).doAfterNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i("lybj", "doAfterNext method executed, result: " + integer);
    }
}).doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.i("lybj", "doOnComplete method executed.");
    }
}).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.i("lybj", "doOnError method executed.");
    }
}).doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.i("lybj", "doOnSubscribe method executed.");
    }
}).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.i("lybj", "doOnDispose method executed.");
    }
}).doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.i("lybj", "doOnTerminate method executed.");
    }
}).doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.i("lybj", "doAfterTerminate method executed.");
    }
}).doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.i("lybj", "doFinally method executed.");
    }
}).subscribe(new Observer<Integer>() {
    private Disposable disposable;
    @Override
    public void onSubscribe(Disposable d) {
        disposable = d;
        Log.i("lybj", "------ observer onSubscribe() executed");
    }
    @Override
    public void onNext(Integer integer) {
        Log.i("lybj", "------ observer onNext() executed: " + integer);
        if (integer == 2) {
            // disposable.dispose(); // Unsubscribe
        }
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", "------ observer onError() executed");
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "------ observer onComplete() executed");
    }
});
2.3 Results
3.onErrorReturn()
3.1 What does it do?
Called back when an onError() event is received. The value it returns is delivered to the observer's onNext(), and the event sequence then ends normally.
3.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Xiao Ming: Here.");
        emitter.onError(new IllegalStateException("error"));
        emitter.onNext("Xiao Fang: Here");
    }
}).onErrorReturn(new Function<Throwable, String>() {
    @Override
    public String apply(Throwable throwable) throws Exception {
        Log.i("lybj", "Xiao Hong is taking a day off.");
        return "Xiao Li: Here";
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", e.getMessage());
    }
    @Override
    public void onComplete() {
    }
});
3.3 Results
4.onErrorResumeNext()
4.1 What does it do?
When an onError() event is received, switches to a new Observable returned by the function, and the event sequence ends normally.
4.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Xiao Ming");
        emitter.onNext("Little party");
        emitter.onNext("Little red");
        emitter.onError(new NullPointerException("error"));
    }
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
        return Observable.just("1", "2", "3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("lybj", "Ready to listen.");
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", e.getMessage());
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "onComplete");
    }
});
4.3 Results
5.onExceptionResumeNext()
5.1 What does it do?
Similar to onErrorResumeNext(), but this method only handles Exception. Other Throwables, such as Error, are passed straight to the observer's onError().
5.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Xiao Ming");
        emitter.onNext("Little party");
        emitter.onNext("Little red");
        emitter.onError(new Error("error"));
    }
}).onExceptionResumeNext(new Observable<String>() {
    @Override
    protected void subscribeActual(Observer<? super String> observer) {
        observer.onNext("Zhang");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("lybj", "Ready to listen.");
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i("lybj", e.getMessage());
    }
    @Override
    public void onComplete() {
        Log.i("lybj", "onComplete");
    }
});
5.3 Results
6.retry()
6.1 What does it do?
If an error event occurs, the whole event sequence is sent again from the start. The times argument is the number of retries.
6.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
        emitter.onNext("2");
        // Give the exception a message so e.getMessage() below is not null
        emitter.onError(new IllegalStateException("error"));
    }
}).retry(2)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.i("lybj", "Ready to listen.");
        }
        @Override
        public void onNext(String s) {
            Log.i("lybj", s);
        }
        @Override
        public void onError(Throwable e) {
            Log.i("lybj", e.getMessage());
        }
        @Override
        public void onComplete() {
            Log.i("lybj", "onComplete");
        }
    });
6.3 Results
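What retry(2) delivers for a source that always fails can be worked out with a plain-Java model. `RetryModel` and its `retry` method are hypothetical names, the sketch assumes the source fails on every attempt after emitting the same items, and it is not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of retry(times) for a source that always fails; not RxJava code.
final class RetryModel {

    // The source is run once plus `times` retries. Each run replays its items
    // from the start; the error reaches the observer only after the last run.
    static List<String> retry(List<String> itemsBeforeError, int times) {
        List<String> seen = new ArrayList<>();
        for (int attempt = 0; attempt <= times; attempt++) {
            seen.addAll(itemsBeforeError);
        }
        seen.add("onError"); // retries exhausted
        return seen;
    }

    public static void main(String[] args) {
        // Mirrors the example above: emit "1", "2", then fail, with retry(2)
        System.out.println(retry(Arrays.asList("1", "2"), 2)); // [1, 2, 1, 2, 1, 2, onError]
    }
}
```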
7.retryUntil()
7.1 What does it do?
After an error event occurs, this method decides whether to keep retrying. Retrying stops once getAsBoolean() returns true.
7.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) {
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onError(new NullPointerException("error"));
        emitter.onNext("4");
        emitter.onNext("5");
    }
}).retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
        Log.i("lybj", "getAsBoolean");
        return true;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
        Log.i("lybj", s);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
7.3 Results
8.repeat()
8.1 What does it do?
Sends the observable's events repeatedly; the times argument is the number of times.
8.2 How to use it?
Observable.just(1, 2, 3)
    .repeat(2)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
8.3 Results
9.subscribeOn() & observeOn()
9.1 What does it do?
subscribeOn(): specifies the thread the observable runs on. If this method is called multiple times, only the first call takes effect.
observeOn(): specifies the thread the observer runs on
9.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) {
        emitter.onNext("1");
        Log.i("lybj", Thread.currentThread().getName());
    }
}).subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("lybj", s);
            Log.i("lybj", Thread.currentThread().getName());
        }
    });
9.3 Results
8. Filter operators
1.filter()
1.1 What does it do?
An event is sent on if the predicate returns true for it, otherwise it is dropped
1.2 How to use it?
Observable.just(1, 2, 3, 4, 5)
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 4;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
1.3 Results
2.ofType()
2.1 What does it do?
Filters out events that do not match the given type
2.2 How to use it?
Observable.just(1, 2, 3, "Xiao Ming", "Little party")
    .ofType(String.class)
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("lybj", s + "");
        }
    });
2.3 Results
3.skip()
3.1 What does it do?
Skips the first events of the sequence. The count argument is the number of events to skip
3.2 How to use it?
Observable.just(1, 2, 3, 4, 5, 6, 7)
    .skip(2)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
3.3 Results
4.distinct()
4.1 What does it do?
Filters out repeated events in the event sequence.
4.2 How to use it?
Observable.just(1, 2, 3, 1, 4, 1, 2)
    .distinct()
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
4.3 Results
5.distinctUntilChanged()
5.1 What does it do?
Filters out events that repeat the one immediately before them
5.2 How to use it?
Observable.just(1, 2, 3, 3, 1, 5, 6)
    .distinctUntilChanged()
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
5.3 Results
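The difference between distinct() and distinctUntilChanged() is easy to miss, so here is a plain-Java model of both on finite lists. `DistinctModel` and its methods are hypothetical names for illustration; this is not RxJava code, and it assumes non-null items compared with equals().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Plain-Java model of distinct() and distinctUntilChanged(); not RxJava code.
final class DistinctModel {

    // Keeps only the first occurrence of each value across the whole sequence.
    static <T> List<T> distinct(List<T> source) {
        return new ArrayList<T>(new LinkedHashSet<T>(source));
    }

    // Drops a value only when it equals the value immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (out.isEmpty() || !out.get(out.size() - 1).equals(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 2, 3, 1, 4, 1, 2)));             // [1, 2, 3, 4]
        System.out.println(distinctUntilChanged(Arrays.asList(1, 2, 3, 3, 1, 5, 6))); // [1, 2, 3, 1, 5, 6]
    }
}
```

Note that the second 1 survives distinctUntilChanged() because it does not directly follow another 1.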
6.take()
6.1 What does it do?
Limits the number of events the observer receives.
6.2 How to use it?
Observable.just(1, 2, 3, 4, 5, 6)
    .take(3)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
6.3 Results
7.debounce()
7.1 What does it do?
If two events arrive less than the configured interval apart, the earlier one is not sent to the observer. In short, it debounces, for example to guard a button against rapid repeated clicks.
7.2 How to use it?
Observable.just(1, 2, 3, 4, 5)
    .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer integer) throws Exception {
            Thread.sleep(900);
            return integer;
        }
    })
    .debounce(1, TimeUnit.SECONDS)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
7.3 Results
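The debounce rule can be worked through on timestamped events. This is a simplified plain-Java model, not RxJava code: `DebounceModel`, `debounce`, and the `arrivalMs` array are hypothetical, it assumes the source completes right after the last item (so the last item is always delivered), and it treats a gap exactly equal to the timeout as too short, which in real code would be a race.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified plain-Java model of debounce(timeout); not RxJava code.
final class DebounceModel {

    // arrivalMs[i] is the time items.get(i) arrives. The source is assumed
    // to complete right after the last item.
    static <T> List<T> debounce(List<T> items, long[] arrivalMs, long timeoutMs) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            boolean last = (i == items.size() - 1);
            // An item survives only if nothing newer arrives within the timeout.
            if (last || arrivalMs[i + 1] - arrivalMs[i] > timeoutMs) {
                out.add(items.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the example above: items 900 ms apart, 1000 ms timeout
        long[] arrivals = {900, 1800, 2700, 3600, 4500};
        System.out.println(debounce(Arrays.asList(1, 2, 3, 4, 5), arrivals, 1000)); // [5]
    }
}
```

Because every gap is shorter than the timeout, each item is replaced by its successor, and only the last one gets through.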
8.firstElement() & lastElement() & elementAt()
8.1 What does it do?
firstElement(): takes the first element of the event sequence.
lastElement(): takes the last element of the event sequence.
elementAt(): takes the element at the given index of the event sequence. If the index exceeds the number of events in the sequence, nothing is emitted.
8.2 How to use it?
Observable.just(1, 2, 3, 4)
    .firstElement()
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
8.3 Results
9. Conditional operators
1.all()
1.1 What does it do?
Checks whether every event in the sequence satisfies a condition; returns true if so, and false otherwise.
1.2 How to use it?
Observable.just(1, 2, 3, 4, 5)
    .all(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer <= 4;
        }
    }).subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.i("lybj", aBoolean + "");
        }
    });
1.3 Results
2.takeWhile() & takeUntil()
2.1 What does it do?
takeWhile(): starting from the left, takes elements while they meet the condition and stops at the first element that does not.
takeUntil(): takes elements until one meets the condition; that element is still sent, and the sequence then ends.
By contrast, filter() takes out all the data that meets the condition.
2.2 How to use it?
Observable.just(1, 2, 3, 4, 5)
    .takeWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 3;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
2.3 Results
3.skipWhile()
3.1 What does it do?
Starting from the left, skips elements while they meet the condition. Once an element fails the condition, it and all later elements are sent.
3.2 How to use it?
Observable.just(1, 2, 3, 4, 5, 3, 2, 1, 7)
    .skipWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 3;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i("lybj", integer + "");
        }
    });
3.3 Results
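To compare takeWhile(), skipWhile(), and filter() side by side, here is a plain-Java model that applies all three to the same list with the same condition. `WhileModel` and its methods are hypothetical names for illustration; this is not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of takeWhile(), skipWhile() and filter(); not RxJava code.
final class WhileModel {

    // Keeps leading items while the condition holds, then stops for good.
    static <T> List<T> takeWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Drops leading items while the condition holds, then keeps everything.
    static <T> List<T> skipWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : source) {
            if (skipping && condition.test(item)) {
                continue;
            }
            skipping = false;
            out.add(item);
        }
        return out;
    }

    // Keeps every item that meets the condition, wherever it appears.
    static <T> List<T> filter(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (condition.test(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 3, 2, 1, 7);
        System.out.println(takeWhile(source, x -> x < 3)); // [1, 2]
        System.out.println(skipWhile(source, x -> x < 3)); // [3, 4, 5, 3, 2, 1, 7]
        System.out.println(filter(source, x -> x < 3));    // [1, 2, 2, 1]
    }
}
```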
4.isEmpty() & defaultIfEmpty()
4.1 What does it do?
isEmpty(): determines whether the event sequence is empty.
defaultIfEmpty(): if the observable sends only an onComplete() event, this method can be used to send a default value.
4.2 How to use it?
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) {
        emitter.onComplete();
    }
}).isEmpty()
    .subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.i("lybj", aBoolean + "");
        }
    });
4.3 Results
5.contains()
5.1 What does it do?
Determines whether the event sequence contains a given element; returns true if it does, and false if it does not.
5.2 How to use it?
Observable.just(1, 2, 3, 4, 5, 6)
    .contains(2)
    .subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.i("lybj", aBoolean + "");
        }
    });
5.3 Results
6.sequenceEqual()
6.1 What does it do?
Checks whether two Observables send the same sequence of events.
6.2 How to use it?
Observable.sequenceEqual(Observable.just("Xiao Ming", "Little party", "Xiao li"),
        Observable.just("Xiao Ming", "Little party", "Xiao li", "Zhang"))
    .subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.i("lybj", aBoolean + "");
        }
    });
6.3 Results
10. The source code
Download the demo