Introduction
What is RxJava? RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for writing asynchronous and event-based programs using observable sequences.
As I understand it, RxJava's main strengths are running asynchronous tasks and acting as an event bus.
GitHub address for RxJava
Github.com/ReactiveX/R…
Usage
This article focuses on how to use RxJava in practice.
Creating an observer
There are two ways to create an observer: implement Observer or implement Subscriber.
The Observer:
Observer<String> observer = new Observer<String>() {
    @Override
    public void onError(Throwable e) {
        Log.i("test", "onError");
    }
    @Override
    public void onComplete() {
        Log.i("test", "onComplete");
    }
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.i("test", "onSubscribe");
    }
    @Override
    public void onNext(String s) {
        Log.i("test", "onNext----->" + s);
    }
};
The Subscriber:
Subscriber<Object> subscriber = new Subscriber<Object>() {
    @Override
    public void onSubscribe(Subscription s) {
        Log.i("test", "onSubscribe");
        // A Subscriber has to request items, otherwise the source emits nothing.
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        Log.i("test", "onNext");
    }
    @Override
    public void onError(Throwable t) {
        Log.i("test", "onError");
    }
    @Override
    public void onComplete() {
        Log.i("test", "onComplete");
    }
};
Next, create an Observable (the observed) and attach the observer to it. In RxJava this is done with the Observable's subscribe method.
An Observable can be created in three common ways:
1) Use Observable.create
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello");
    }
}).subscribe(observer); // subscribe
In the subscribe callback, you call the observer's onNext, onError, and onComplete yourself through the emitter, while onSubscribe is called automatically.
2) Use Observable.just
Observable.just("Hello", "hello world").subscribe(observer);
An Observable created with Observable.just calls the observer's methods automatically on subscription: onSubscribe, then onNext for each item, then onComplete (or onError if something fails).
3) Use Observable.fromArray
String[] quotations = {"Love my country.", "Love the people"};
Observable.fromArray(quotations).subscribe(observer);
Like Observable.just, an Observable created with Observable.fromArray calls the observer's methods automatically on subscription.
Observer methods
Above we created an Observable and an observer with four methods: onError, onComplete, onSubscribe, and onNext.
So what do these methods mean?
onSubscribe: called when the observer is subscribed to the Observable, before any other event.
onComplete: the event queue has finished. RxJava treats events not only individually but also as a queue, and it specifies that onComplete must be fired as the signal that no further onNext will be issued.
onError: the event queue failed. When an exception occurs during event processing, onError is fired and the queue terminates automatically; no more events may be emitted.
onNext: an ordinary event; this is where the business logic goes.
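The call order of these four methods can be sketched in plain Java, with no RxJava on the classpath. The MiniObserver interface and the emitAll and record helpers are hypothetical names made up for this illustration. They only mimic the contract (onSubscribe first, then zero or more onNext, then exactly one of onComplete or onError) and are not how RxJava is implemented. Treating a null item as an error is also an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ObserverContractSketch {

    /** Hypothetical stand-in for RxJava's Observer; illustration only. */
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T item);
        void onError(Throwable e);
        void onComplete();
    }

    /** Pushes every item to the observer in the order the contract prescribes. */
    static <T> void emitAll(List<T> items, MiniObserver<T> observer) {
        observer.onSubscribe();                  // always the first callback
        try {
            for (T item : items) {
                if (item == null) {
                    // assumption of this sketch: a null item counts as a failure
                    throw new NullPointerException("null item");
                }
                observer.onNext(item);           // one call per item
            }
        } catch (RuntimeException e) {
            observer.onError(e);                 // terminal: nothing is emitted afterwards
            return;
        }
        observer.onComplete();                   // terminal: no more onNext will follow
    }

    /** Records the callbacks as strings so their order can be inspected. */
    static List<String> record(List<String> items) {
        final List<String> log = new ArrayList<>();
        emitAll(items, new MiniObserver<String>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(String s) { log.add("onNext:" + s); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("Hello", "hello world")));
    }
}
```

Running main prints the callbacks in order: onSubscribe, one onNext per item, then onComplete. If an item fails, onError replaces onComplete and the remaining items are never delivered.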
Operators
Operators fall into these groups: creation operators, transformation operators, filter operators, combining operators, auxiliary operators, error-handling operators, conditional operators, Boolean operators, and conversion operators.
Creation operators include interval, repeat, intervalRange, and so on.
For example:
Observable.intervalRange(0, 6, 0, 3, TimeUnit.SECONDS)
        .map(new Function<Long, String>() {
            @Override
            public String apply(@NonNull Long tick) throws Exception {
                return "666"; // the observer expects a String, so map each tick to a payload
            }
        })
        .subscribe(observer);
intervalRange: emits a range of values after an initial delay and then at a fixed period. Here it emits six ticks, the first immediately and the rest every 3 seconds.
Transformation operators include map, flatMap, cast, concatMap, and so on.
map: applies the given Function to each item the Observable emits and emits the results as a new Observable.
Observable.just("Hello", "hello world").map(new Function<String, String>() {
    @Override
    public String apply(@NonNull String s) throws Exception {
        return s; // identity mapping; put the real transformation here
    }
}).subscribe(observer);
flatMap, cast:
Observable.just("Hello", "hello world").flatMap(new Function<String, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(@NonNull String s) throws Exception {
        return Observable.just(s);
    }
}).cast(String.class).subscribe(observer);
flatMap transforms each item emitted by an Observable into an Observable of its own, then flattens the items emitted by those Observables into a single Observable. cast force-converts every item emitted by an Observable to the specified type.
buffer: collects emitted items into lists and emits each list as a single item. It can be used in two ways:
1. Collect a fixed number of items into a list at a time; each list is handed to the subscriber and then cleared, until the source has been fully consumed.
2. Collect items into a list periodically; again, each list is handed to the subscriber and then cleared, until the source has been fully consumed.
Observable
        .range(0, 5)
        .buffer(2)
        .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull List<Integer> integers) {
                Log.i("test", "----------------->onNext:" + integers);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

Observable
        .just("Hello", "hello world", "I love my family.")
        .buffer(3)
        .subscribe(new Observer<List<String>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull List<String> strings) {
                Log.i("test", "" + strings);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
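To see which lists the count-based buffer hands to onNext, here is a plain-Java model of the chunking rule. It is a sketch rather than RxJava's implementation: BufferSketch and its buffer method are hypothetical names, and a List stands in for the Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferSketch {

    /** Splits items into consecutive lists of at most count elements, the way buffer(count) groups them. */
    static <T> List<List<T>> buffer(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == count) {   // chunk is full: hand it over and start a new one
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {            // on completion the partial chunk is handed over too
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(0, 1, 2, 3, 4), 2));
    }
}
```

For the range(0, 5).buffer(2) example this gives three lists: [0, 1], [2, 3], and the leftover [4]. The three strings with buffer(3) arrive as one list.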
In addition to the above transformation operators, there is also the groupBy operator for grouping operations.
Filter operators
Filter operators include filter, skip, take, elementAt, and so on.
filter: tests each item emitted by an Observable against a custom rule; only items that satisfy it are passed on to subscribers.
Observable
        .just("Hello", "hello world", "I love my family.")
        .filter(new Predicate<String>() {
            @Override
            public boolean test(@NonNull String s) throws Exception {
                Log.i("test", "" + s);
                return s.equals("Hello");
            }
        }).subscribe(observer);
distinct: removes duplicate items.
Observable
        .just("Hello", "hello world", "I love my family.", "I love my family.")
        .distinct()
        .subscribe(observer);
skip: drops the first n items.
Observable
        .just("Hello", "hello world", "I love my family.", "I love my family.")
        .skip(2)
        .subscribe(observer);
take: keeps only the first n items.
Observable
        .just("Hello", "hello world", "I love my family.", "I love my family.")
        .take(2)
        .subscribe(observer);
throttleWithTimeout: each time the source Observable emits an item, a timer starts. If the source emits another item before the timer expires, the earlier item is discarded and the timer restarts. In the extreme case where every item arrives before the timer expires, only the last item is emitted.
Observable
        .just("Hello", "hello world", "I love my family.", "I love my family.")
        .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
        .subscribe(observer);
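The timer rule is easier to follow with concrete timestamps. The sketch below is a plain-Java model, not RxJava code: DebounceSketch, debounce, timesMs, and timeoutMs are hypothetical names, and the emission times are passed in as an array instead of coming from a real clock. It assumes the source completes right after the last item, which flushes that item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DebounceSketch {

    /**
     * timesMs[i] is the emission time of items.get(i), in ascending order.
     * An item survives only if the next item arrives more than timeoutMs later.
     * The last item always survives because it is flushed when the source completes.
     */
    static <T> List<T> debounce(List<T> items, long[] timesMs, long timeoutMs) {
        if (items.size() != timesMs.length) {
            throw new IllegalArgumentException("one timestamp per item is required");
        }
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            boolean isLast = i == items.size() - 1;
            if (isLast || timesMs[i + 1] - timesMs[i] > timeoutMs) {
                out.add(items.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A", "B", "C", "D");
        // A and C are each followed by another item within 200 ms, so they are dropped.
        System.out.println(debounce(items, new long[] {0, 100, 500, 550}, 200));
        // Everything arrives at once, as with Observable.just: only the last item survives.
        System.out.println(debounce(items, new long[] {0, 0, 0, 0}, 200));
    }
}
```

The second call mirrors the Observable.just example: the four strings are emitted back to back, so only the last one reaches the observer.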
Combining operators
Combining operators include merge, startWith, concat, join, switch, zip, and more.
merge: merges multiple Observables into one Observable and emits their items.
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.i("test", "onSubscribe");
    }
    @Override
    public void onNext(@NonNull String s) {
        Log.i("test", "onNext--->" + s);
    }
    @Override
    public void onError(@NonNull Throwable e) {
        Log.i("test", "onError");
    }
    @Override
    public void onComplete() {
        Log.i("test", "onComplete");
    }
};
Observable<String> observable1 = Observable.just("Hello", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.merge(observable1, observable2).subscribe(observer);
concat: merges the items emitted by multiple Observables in strict order, like a queue: the next Observable's items are not emitted until the previous Observable has finished emitting.
Observable<String> observable1 = Observable.just("Hello", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).subscribe(observer);
In addition to the combining operators above, there are zip, combineLatest, and others.
zip: combines the items emitted by two or more Observables pairwise using the specified function and emits the result of each combination as a new value.
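Since zip has no example in this article, here is a plain-Java model of its pairing rule, with Lists standing in for the two Observables. ZipSketch and its zip method are hypothetical names for this illustration. In RxJava itself the corresponding call is Observable.zip with the two sources and a combining function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    /** Combines the i-th item of each source; stops as soon as the shorter source runs out. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList("Hello", "hello World"),
                Arrays.asList("new obj", "mergeobj", "unpaired"),
                (a, b) -> a + "+" + b);
        System.out.println(zipped); // "unpaired" has no partner, so it is never combined
    }
}
```

Unlike merge and concat, which pass items through unchanged, zip emits one combined value per pair, so the result is only as long as the shorter source.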
Auxiliary operators
Auxiliary operators include do, delay, observeOn, timeout, timeInterval, timestamp, subscribeOn, materialize, and to.
delay: delays the emission of items.
Observable<String> observable1 = Observable.just("Hello", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).delay(5, TimeUnit.SECONDS).subscribe(observer);
subscribeOn: specifies the thread on which the Observable itself runs.
observeOn: specifies the thread on which the items emitted by the Observable are delivered to the observer.
Other operators are left to the reader.
Error-handling operators
In RxJava, error-handling operators include catch and retry.
catch intercepts the original Observable's onError notification and replaces it with other items or another sequence, so that the resulting Observable can terminate normally or not terminate at all. In RxJava, catch is implemented as three distinct operators:
1. onErrorReturn: returns an Observable that mirrors the original but ignores the original's onError call, i.e. it does not pass the error to the observer; instead it emits a special item and then calls the observer's onComplete.
2. onErrorResumeNext: like onErrorReturn, returns an Observable that mirrors the original and does not pass the original's onError to the observer; instead it switches to a backup Observable and emits that Observable's items.
3. onExceptionResumeNext: like onErrorResumeNext, except that if the Throwable received by onError is not an Exception, it passes the error to the observer's onError and does not use the backup Observable.
retry: instead of passing the original Observable's onError notification to the observer, it resubscribes to the Observable, giving it another chance to complete its sequence without error. onNext notifications are always passed to the observer, so resubscribing can produce duplicate items. If the maximum number of resubscriptions is exceeded, it stops resubscribing and passes the latest onError notification to the observer.
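The difference between falling back and trying again can be modelled in plain Java. This is a sketch under simplifying assumptions, not RxJava's implementation: ErrorHandlingSketch and both helper methods are hypothetical names, and a Supplier that returns one value stands in for the Observable, so the duplicate-item effect of retry does not show up here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

final class ErrorHandlingSketch {

    /** Models onErrorReturn: on failure, hand back a substitute value instead of the error. */
    static <T> T onErrorReturn(Supplier<? extends T> source,
                               Function<? super Throwable, ? extends T> fallback) {
        try {
            return source.get();
        } catch (RuntimeException e) {
            return fallback.apply(e);        // the error never reaches the caller
        }
    }

    /** Models retry(n): run the source again after a failure, at most maxRetries extra times. */
    static <T> T retry(Supplier<? extends T> source, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        RuntimeException latest = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return source.get();         // success: stop retrying
            } catch (RuntimeException e) {
                latest = e;                  // remember the failure and try again
            }
        }
        throw latest;                        // retries exhausted: the latest error is passed on
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky source: fails on the first two calls, succeeds on the third.
        String value = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        }, 2);
        System.out.println(value + " after " + calls.get() + " calls");
    }
}
```

With two retries allowed, the flaky source gets three attempts in total and the third succeeds. With only one retry, the second failure would be thrown to the caller, which corresponds to the observer finally receiving onError.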
Conditional operators
Conditional operators include defaultIfEmpty, skipUntil, amb, skipWhile, takeUntil, and takeWhile.
defaultIfEmpty: emits a default item if the original Observable completes without emitting anything.
skipUntil: subscribes to the original Observable but ignores its items until a second Observable emits an item; from then on it emits the original Observable's items.
Boolean operators
Boolean operators include all, isEmpty, contains, exists (named any in RxJava 2), and sequenceEqual.
For more information on conditional and Boolean operators, please refer to RxJava Operators (08- Conditional and Boolean operators).
Blog.csdn.net/xmxkf/artic…
Conversion operators
Conversion operators convert an Observable into another object or data structure. They include toMap, toMultimap, toList, toSortedList, nest, and getIterator.
toMap: collects all the items emitted by the original Observable into a Map and emits the Map.
String s1 = "Hello";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1, s2, s3).toMap(new Function<String, String>() {
    @Override
    public String apply(@NonNull String s) throws Exception {
        return s; // each item is used as its own key
    }
}).subscribe(new SingleObserver<Map<String, String>>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onSuccess(@NonNull Map<String, String> stringStringMap) {
        Log.i("test", "" + stringStringMap);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
});
toMultimap: similar to toMap, except that each value in the Map is a collection, so items that share a key are all kept.
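As a plain-Java model of the result shape, the sketch below groups a List by a key function. ToMultimapSketch and its toMultimap method are hypothetical names, and grouping by string length is just an example key. In RxJava 2 the real operator delivers the finished Map through a Single, as toMap does above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class ToMultimapSketch {

    /** Groups items by key; items with the same key end up in the same collection. */
    static <K, V> Map<K, Collection<V>> toMultimap(List<V> items,
                                                   Function<? super V, ? extends K> keySelector) {
        Map<K, Collection<V>> map = new LinkedHashMap<>();   // keeps first-seen key order
        for (V item : items) {
            K key = keySelector.apply(item);
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Integer, Collection<String>> byLength =
                toMultimap(Arrays.asList("Hello", "hello world", "lalala", "world"), String::length);
        System.out.println(byLength);
    }
}
```

With toMap, "world" would overwrite "Hello" because both have key 5; here both are kept under that key.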
toList: collects the emitted items into a List.
String s1 = "Hello";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1, s2, s3).toList().subscribe(new SingleObserver<List<String>>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onSuccess(@NonNull List<String> strings) {
        Log.i("test", "" + strings);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
});
For other operators, you can refer to the RxJava Operators Guide.
RxJava thread control
Earlier we saw that subscribeOn specifies which thread the Observable itself runs on, and observeOn specifies which thread the items it emits are delivered on. By default, RxJava does everything, callbacks included, on the thread that calls subscribe; to switch threads, you need a Scheduler.
The following schedulers are built into RxJava:
1. Schedulers.immediate(): runs work immediately on the current thread; it is the default scheduler for the timeout, timestamp, and timeInterval operators. (It exists in RxJava 1.x only; RxJava 2 no longer has it.)
2. Schedulers.io(): the scheduler for I/O operations.
3. Schedulers.newThread(): starts a new thread for each unit of work.
The difference between 2 and 3 is that io() is backed by an unbounded thread pool that reuses idle threads, so io() is usually more efficient.
4. Schedulers.trampoline(): queues work on the current thread and processes the queued tasks in order; it is the default scheduler for the repeat and retry operators.
5. Schedulers.computation(): the scheduler for CPU-bound computation, backed by a fixed-size thread pool with as many threads as there are CPU cores. Do not run I/O operations on computation(), or the time spent waiting on I/O wastes CPU. It is the default scheduler for buffer, delay, sample, debounce, interval, and skip.
6. AndroidSchedulers.mainThread(): runs work on the Android main thread; this scheduler is provided by RxAndroid.
Observable.just("Hello", "hello world")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
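The thread hand-off in this chain can be imitated with plain java.util.concurrent, which may help if schedulers feel opaque. This is only an analogy and not how RxJava implements schedulers. ThreadSwitchSketch and run are hypothetical names, and the two single-thread executors named "io" and "main" merely play the roles of Schedulers.io() and AndroidSchedulers.mainThread().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadSwitchSketch {

    /** Returns "producerThread -> consumerThread" to show where each stage ran. */
    static String run() {
        // Hypothetical stand-ins for the two schedulers used in the example above.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main"));
        try {
            return CompletableFuture
                    // like subscribeOn: the value is produced on the "io" thread
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // like observeOn: the value is consumed on the "main" thread
                    .thenApplyAsync(producer -> producer + " -> "
                            + Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The returned string names the producing thread first and the consuming thread second, the same split that subscribeOn and observeOn create between emitting items and handling them.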
RxJava, Retrofit, OkHttp