RxJava is the Java branch of ReactiveX, which is both a programming interface and a programming idea: work is expressed as a chain of calls on observable data streams.
Background
Android development involves a lot of callback handling, multi-condition interaction, asynchronous calls, and related processing. The project uses RxJava to keep that code clean and readable.
The Rx pattern
Rx is built on the observer pattern:
- Create: Rx makes it easy to create event streams and data streams
- Composition: Rx combines and transforms data streams using query-like operators
- Listen: Rx can subscribe to any observable data stream and perform operations
Simplify the code
- Functional style: applying side-effect-free input/output functions to observable data streams avoids intricate stateful code
- Simplified code: Rx operators often reduce a complex problem to a few lines of code
- Asynchronous error handling: a traditional try/catch cannot catch errors raised in asynchronous computations, so Rx provides its own error-handling mechanism
- Easy concurrency: Rx's Observables and Schedulers free developers from low-level threading, synchronization, and concurrency issues
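The point about asynchronous error handling can be illustrated without RxJava at all. The sketch below uses only the JDK's CompletableFuture as a stand-in for an asynchronous source; the class and method names are hypothetical. It shows that a try/catch around the code that schedules the work never sees the failure, which instead arrives through an error callback, the same idea that onError serves in Rx.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AsyncErrorDemo {
    /** Runs a failing task asynchronously and reports which code path observed the failure. */
    static String whoSawTheError() {
        CompletableFuture<String> outcome;
        try {
            // supplyAsync only schedules the work; the exception is thrown inside the task
            outcome = CompletableFuture.<String>supplyAsync(() -> {
                throw new IllegalStateException("boom");
            }).handle((value, error) -> {
                if (error == null) {
                    return "value: " + value;
                }
                // Failures from supplyAsync arrive wrapped in a CompletionException
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause()
                        : error;
                return "error callback: " + cause.getMessage();
            });
        } catch (IllegalStateException e) {
            // Not reached: the future captures the exception instead of throwing it here
            return "try/catch: " + e.getMessage();
        }
        // Wait for the handler's result
        return outcome.join();
    }

    public static void main(String[] args) {
        System.out.println(whoSawTheError());
    }
}
```

In RxJava the error callback is part of the observer itself, so every stage of a chain forwards failures to one place instead of requiring a handler per asynchronous call.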
Core components
Observable: the observed party
In ReactiveX, an Observer subscribes to an Observable. The observer reacts to each item, or sequence of items, that the Observable emits. This pattern greatly simplifies concurrency: instead of blocking while it waits for the Observable to emit data, the code registers an observer that stands by, like a sentry, ready to react whenever the Observable sends a notification in the future.
create: the operator that builds the source Observable, the origin of the events
Sample code (RxJava 1.x API):
import rx.Observable;
import rx.Subscriber;

// Create an Observable that can be observed
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            // Emit only while the subscriber is still subscribed
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
// subscribe() connects the Subscriber to the Observable
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }
    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }
    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});
Subscriber: the observer
A Subscriber listens for the data an Observable emits, which may originate from multiple sources.
Callback methods (onNext, onCompleted, onError)
The subscribe method connects an observer to an Observable. Your observer needs to implement some subset of the following methods:
- onNext(T item): The Observable calls this method to emit an item, which is passed as the parameter. It can be called multiple times, depending on the Observable's implementation.
- onError(Throwable e): The Observable calls this method when it encounters an error or cannot produce the expected data. The call terminates the Observable, so onNext and onCompleted are not called afterwards. The parameter is the Throwable that caused the error.
- onCompleted() (named onComplete() in RxJava 2 and later): The Observable calls this method after the last onNext call if no error was encountered.
According to the Observable contract, onNext may be called zero or more times, and those calls may then be followed by a single call to either onCompleted or onError, never both; that call is the last one. Passing data to onNext is usually called an emission. Calls to onCompleted and onError are called notifications.
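The contract above can be made concrete with a small plain-Java sketch. It does not use RxJava; SimpleObserver and TerminalGuard are hypothetical names invented for this illustration. The guard forwards emissions until the first terminal notification and silently drops everything after it, which is the behavior the contract requires of a well-behaved Observable.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverContractDemo {
    /** Hypothetical observer interface with the three callbacks described above. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical wrapper that drops every call arriving after a terminal notification. */
    static final class TerminalGuard<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated;

        TerminalGuard(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T item) {
            if (!terminated) {
                downstream.onNext(item);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (!terminated) {
                terminated = true;
                downstream.onError(error);
            }
        }

        @Override
        public void onCompleted() {
            if (!terminated) {
                terminated = true;
                downstream.onCompleted();
            }
        }
    }

    /** Sends valid calls followed by contract-breaking ones and returns what got through. */
    static List<String> recordEvents() {
        final List<String> events = new ArrayList<>();
        SimpleObserver<Integer> guarded = new TerminalGuard<Integer>(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        guarded.onNext(1);
        guarded.onNext(2);
        guarded.onCompleted();                               // terminal notification
        guarded.onNext(3);                                   // dropped: arrives after completion
        guarded.onError(new IllegalStateException("late"));  // dropped: only one terminal call allowed
        return events;
    }

    public static void main(String[] args) {
        System.out.println(recordEvents()); // prints [onNext(1), onNext(2), onCompleted]
    }
}
```

This sketch is single-threaded and keeps its flag in a plain boolean, so it is not safe to call from several threads at once.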
Operator: wraps data-processing logic
create, map, and flatMap are all operators, and RxJava provides many more.
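To give a feel for what map and flatMap do, here is a rough analogy using java.util.stream from the JDK rather than RxJava itself; the class and method names are hypothetical. The shape is the same: map transforms each item into one item, while flatMap transforms each item into a sequence and flattens the results. The analogy is limited: streams are synchronous and keep their order, whereas RxJava's flatMap merges asynchronous inner Observables and may interleave their items.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OperatorShapeDemo {
    /** map: one input item becomes exactly one output item. */
    static List<Integer> mapped() {
        return Stream.of(1, 2, 3)
                .map(x -> x * 10)
                .collect(Collectors.toList());
    }

    /** flatMap: one input item becomes a whole sequence, and the sequences are flattened. */
    static List<Integer> flatMapped() {
        return Stream.of(1, 2, 3)
                .flatMap(x -> Stream.of(x, x * 10))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapped());     // prints [10, 20, 30]
        System.out.println(flatMapped()); // prints [1, 10, 2, 20, 3, 30]
    }
}
```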
Usage
RxJava is used together with Retrofit here to run an asynchronous, multi-step operation:
// Imports assume RxJava 2 with RxAndroid (package io.reactivex); RxJava 3 moved these to io.reactivex.rxjava3
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

// The methods below belong to RequestActivity, which is assumed to declare these two fields
private ProgressDialog progressDialog;
private Disposable disposable;

public void request2(View view) {
    /**
     * One chain of calls implements the whole requirement.
     * Requirement (with a loading dialog shown while it runs):
     * 1. Send the registration request to the server
     * 2. After registration completes, update the registration UI
     * 3. Immediately send the login request to the server
     * 4. After login completes, update the login UI
     */
    MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
            .registerAction(new RegisterRequest()) // step 1: registration request (runs 2nd)
            .subscribeOn(Schedulers.io()) // run the upstream request on an I/O thread
            .observeOn(AndroidSchedulers.mainThread()) // deliver to the operators below on the main thread
            .doOnNext(new Consumer<RegisterResponse>() { // runs 3rd
                @Override
                public void accept(RegisterResponse registerResponse) throws Exception {
                    // step 2: update the registration UI after registration completes
                }
            })
            // step 3: log in to the server immediately
            .observeOn(Schedulers.io()) // switch the operators below back to an I/O thread
            .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { // runs 4th
                @Override
                public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                    Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
                            .loginAction(new LoginReqeust());
                    return loginResponseObservable;
                }
            })
            .observeOn(AndroidSchedulers.mainThread()) // deliver the login result on the main thread
            .subscribe(new Observer<LoginResponse>() {
                // subscribe() calls onSubscribe immediately on the calling thread,
                // which is the main thread here, so UI work is safe
                @Override
                public void onSubscribe(Disposable d) { // runs 1st
                    progressDialog = new ProgressDialog(RequestActivity.this);
                    progressDialog.show(); // UI operation
                    disposable = d;
                }
                @Override
                public void onNext(LoginResponse loginResponse) { // runs 5th
                    // step 4: update the login UI after login completes
                }
                @Override
                public void onError(Throwable e) { // runs 6th, only on failure
                    // onComplete is not called after an error, so dismiss the dialog here too
                    if (progressDialog != null) {
                        progressDialog.dismiss();
                    }
                }
                @Override
                public void onComplete() {
                    // Finished: close the loading dialog
                    if (progressDialog != null) {
                        progressDialog.dismiss();
                    }
                }
            });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    // Minimum requirement: dispose the subscription so no callbacks arrive after the Activity is destroyed
    if (disposable != null && !disposable.isDisposed()) {
        disposable.dispose();
    }
}