By fhyPayaso, No Dishes Studio
Please indicate the source for reprinting
preface
The most popular android network request framework is RxJava2+Retrofit2+OkHttp3, and Retrofit has been used in previous projects for network requests, which can be easily repackaged. But to address some of the problems with nested network requests, this new project intends to use RxJava in network requests to get a feel for responsive programming.
First of all, I strongly recommend throwing line’s classic blog for Android developers to explain RxJava
Observer model
Now that I’m using RxJava, I have to say a little bit about the Observer pattern, because RxJava, as a tool library, uses an extended form of the Observer pattern.
-
Observer mode: Simply put, the observer mode is that an object A (observer) and an object B (observed) reach A subscription relationship. When object B triggers an event, such as some data changes, object A will be immediately notified and object A will respond after receiving the notification. The advantage of this is that object A does not monitor the state of object B in real time, but only responds when A specific event occurs on object B.
-
Observer Mode in Android: Take an example of the most common observer mode in development, OnClickListener
Button. SetOnClickListener (new View. An OnClickListener () {@ Override public void onClick (View v) {/ / click operation after the events}});Copy the code
In this case, button is the observed, view.onClickListener is the observer, and setOnClickListener() creates a subscription relationship between the two. When the button triggers the clicked event, it notifies the observer, and OnClickListener receives the notification and calls onClick() in its interface in response to the clicked event.
introduce
First, let’s take a look at the official introduction of RxJava on Github:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable Sequences.(A library for composing asynchronous, event-based programs by using observable sequences.)
From the introduction we can extract a key word: RxJava is used to make the code simpler, and its simplicity is unique, because the way RxJava is used is based on the chain call of the event stream. This ensures that RxJava keeps the code simple and elegant as the complexity of the program increases. Specific examples will be shown later in conjunction with Retrofit.
Since RxJava is based on an extended observer model, there are also basic concepts of observer, observed, subscription, and event. Here are some examples from Carson_Ho’s blog to explain the concepts and fundamentals.
role | role | analogy |
---|---|---|
Observable | An event producer | The customer |
An Observer | Event consumers who receive events and respond to them | The cook |
The Subscribe (subscription) | Link Observable and Observer together | The waiter |
The Event (Event) | Observable notifies the Observer carrier | dishes |
Basic implementation
-
Dependency adding: remember to add dependencies in gradle (the latest version of RxJava2.0 at the time of publication) before you get started.
The compile "IO. Reactivex. Rxjava2: rxjava: 2.1.1" compile 'IO. Reactivex. Rxjava2: rxandroid: 2.0.1'Copy the code
-
Observable creation:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void Subscribe (ObservableEmitter<String> e) throws Exception {e.onnext (" event 1"); E.o nNext (" event 2 "); E.o nNext (" event 3 "); e.onComplete(); }});Copy the code
Observable.create()
The basic method for creating an Observablejust()
,from()
And other methods to simplify the operation.ObservableOnSubscribe<>()
Subscribe () : an interface that defines the event to send in subscribe ().ObservableEmitter
This is a new class in RxJava2 that can be interpreted as a transmitter for transmitting dataonNext()
And notify theonComplete()/onError()
.
-
The creation of an Observer:
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "OnSubscribe: Subscribe "); } @override public void onNext(String s) {log. d(TAG, "onNext: response "+s); } @override public void onError(Throwable e) {log. d(TAG, "onError: execution error "); } @override public void onComplete() {log.d (TAG, "onComplete: done "); }};Copy the code
onNext()
: a common event that can be overridden in response.onError()
: error event, which is called when an exception occurs in event processing in the queue, after which no more events are issued.onComplete()
: Completion event, emitted when all events in the queue have been processed.- In a normal sequence,
onError()
andonComplete()
There should be only one and it is at the end of the event queue, and one should not be called after another.
In addition to Observer, RxJava also has built-in an abstract class: Subscriber that implements Observer. The usage of the two classes is basically the same. The main difference is that two methods are added to Subscriber.
1. OnStart (): this is the method of adding Subscriber. It is called at the beginning of the subscribe but before the event is sent, and can be used to do some preparatory work, such as clearing or resetting data. This is an optional method whose implementation is null by default. Note that onStart() does not apply if there is a requirement for the preparing thread (such as a pop-up dialog showing progress, which must be executed on the main thread), because it is always called on the thread that subscribe occurs, and cannot specify the thread. To do the preparatory work on the specified thread, use the doOnSubscribe() method.
2. Unsubscribe (): This is another interface Subscription method implemented by Subscriber, which is used to unsubscribe. After this method is called, Subscriber will no longer receive events. Typically, before invoking this method, you can use isUnsubscribed() to determine the status. The unsubscribe() method is important because after subscribe(), an Observable holds a Subscriber reference, which runs the risk of leaking memory if it is not released in time. The best rule to follow is to call unsubscribe() in an appropriate place (such as onPause(), onStop(), etc.) as soon as it is no longer in use to remove the reference to avoid memory leaks.
-
We subscribe:
observable.subscribe(observer); Copy the code
-
Chain calls
The above three-step process can be invoked directly through a chain structure, making the code much cleaner.
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) Throws Exception {e.onnext (" event 1"); E.o nNext (" event 2 "); E.o nNext (" event 3 "); e.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: Make a subscription "); } @override public void onNext(String s) {log. d(TAG, "onNext: response "+s); } @override public void onError(Throwable e) {log. d(TAG, "onError: execution error "); } @override public void onComplete() {log.d (TAG, "onComplete: done "); }});Copy the code
You can view the printed result in logcat
Thread scheduling
In RxJava, we can specify the threads on which events are generated and consumed by the Scheduler in RxJava.
Scheduler
-
RxJava has five built-in schedulers
-
Schedulers.immediate(): Runs directly on the current thread. This is the default Scheduler, but has been removed in RxJava2 to prevent it from being used incorrectly.
-
Schedulers.newthread (): starts a newThread and performs operations on it.
-
Schedulers.io(): The Scheduler used for I/O operations (read and write to files, read and write to databases, network information interaction, etc.). The behavior of IO () is similar to that of newThread(), except that the internal implementation of IO () uses an unlimited pool of threads and can reuse idle threads, so in most cases IO () is more efficient than newThread(). Don’t put your calculations in IO () to avoid creating unnecessary threads.
-
Scheduler putation(): The Scheduler used for calculations, such as graphics calculations. This Scheduler uses a fixed thread pool of CPU cores. Don’t focus on COMPUTATION (), or WAIT time for I/O operations will waste CPU.
-
Schedulers.trampoline(): Mainly used to delay the execution of work tasks. When we want to execute a task on the current thread, but not immediately, we can queue it with.trampoline(), and the trampoline will process its queue and run each task in sequence.
-
-
Android-specific Scheduler
AndroidSchedulers.mainThread()
: The specified operations will be performed in the main thread of Android, such as UI update operations.
Thread control
subscribeOn()
: Specifies the thread in which the event is generated, for examplesubscribeOn(Schedulers.io())
You can specify that the observed network requests, file reads and writes, and so on are placed on the IO thread.observeOn()
: Specifies the thread to consume the event, for exampleobserveOn(AndroidSchedulers.mainThread())
The specifiedSubscriber
The methods in the.
Write two sentences in the subscribe () before subscribeOn (Scheduler. IO ()) and observeOn (AndroidSchedulers. MainThread (), the use of the method is very common, it is suitable for most of the < take data background thread, The main thread displays the program policy of >.
Combining with the Retrofit
RxJava is often used in conjunction with Retrofit, and as the complexity of your application increases, the benefits of RxJava’s simplicity will become apparent. Here are a few examples to get a feel for the magic of the RxJava operator.
1. Use Retrofit alone
-
Start by creating the Service interface, using a login interface as an example
public interface LoginService { @POST("login") Call<ApiResponse> login(@Query("phone") String username, @Query("password") String password); } Copy the code
Where ApiResponse is the return body of a self-defined uniform format
-
Construct the Retrofit and send the request
public void login() { Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .baseUrl(Config.APP_SERVER_BASE_URL) .build(); retrofit.create(LoginService.class) .login(phone,password) .enqueue(new Callback<ApiResponse>() { @Override public void onResponse(Call<ApiResponse> call, Response<ApiResponse> Response) {Override public void onFailure(Call<ApiResponse> Call, Throwable t) {// Operation after login failure}}); }Copy the code
2. Combine RxJava
-
Introduce dependencies: Remember to add adaptations to Retrofit for RxJava
The compile 'com. Squareup. Retrofit2: adapter - rxjava2:2.3.0'Copy the code
-
Creating a Service Interface
public interface LoginService { @POST("login") Observable<ApiResponse> rxLogin(@Query("phone") String phone, @Query("password") String password); } Copy the code
-
Construct the Retrofit and send the request
public void rxLogin() { Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) / / add support for RxJava. AddCallAdapterFactory (RxJava2CallAdapterFactory. The create ()). The baseUrl (Config. APP_SERVER_BASE_URL). The build (); Retrofit.create (loginservice.class).rxlogin (phone,password).subscribeon (schedulers.io ()) // IO threads get network requests ObserveOn (AndroidSchedulers. MainThread ()) / / main thread data update. The subscribe (new Observer < ApiResponse > () {@ Override public void OnSubscribe (Disposable d) {} @override public void onNext(ApiResponse ApiResponse) {Override public void onSubscribe(Disposable d) {} @override public void onNext(ApiResponse ApiResponse) Override public void onComplete() {}}); }Copy the code
But don’t worry, this is just the simplest case. As it gets more complicated, you’ll see the power of RxJava.
FlatMap () resolves nested requests
If you don’t use RxJava, the normal idea would be to nest another layer of network requests in RetroFit’s onResponse callback, like this:
Private void register() {retrofit.create(retrofitservice.class).register(phone,password).enqueue(new) Callback<ApiResponse>() { @Override public void onResponse(Call<ApiResponse> call, Response<ApiResponse> Response) {log. d(TAG, "onResponse: registered successfully "); // Login (); } @override public void onFailure(Call<ApiResponse> Call, Throwable t) {log. d(TAG, "onFailure: registration failed "); }}); } private void login() {retrofit.create(retrofitservice.class).login(phone,password).enqueue(new) Callback<ApiResponse>() { @Override public void onResponse(Call<ApiResponse> call, Response<ApiResponse> Response) {log. d(TAG, "onResponse: login succeeded "); } @override public void onFailure(Call<ApiResponse> Call, Throwable t) {log. d(TAG, "onFailure: login failed "); }}); }Copy the code
In fact, this kind of nesting is exactly what we need to avoid. In RxJava, this nesting can be avoided by using the flatMap() operator, which returns an Observable wrapper for the incoming object in the previous data stream. This linearizes the two-dimensional nesting process. Here’s an original example:
Private void loginAndRegister() {// Obtain the observed entity RetrofitService service = retrofit.create(retrofitService.class); Observable<ApiResponse> register = service.register(phone, password); final Observable<ApiResponse> login = service.login(phone, password); Register. SubscribeOn (Schedulers. IO ()) / / (registered by switch to the observer) requests for registration network IO thread. ObserveOn (AndroidSchedulers. MainThread ()) DoOnEach (new Observer<ApiResponse>() {@override public void onSubscribe(Disposable) {} @override public void onNext(ApiResponse ApiResponse) {log. d(TAG, "registered successfully "); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }) // Switch to the IO thread to initiate the login network request. ObserveOn (schedulers.io ()). FlatMap (new Function<ApiResponse, ObservableSource<ApiResponse>>() { @Override public ObservableSource<ApiResponse> apply(ApiResponse apiResponse) throws Exception { return login; }}) / / (the Observer) cut back to the main thread to handle the callback. ObserveOn (AndroidSchedulers. MainThread ()). The subscribe (new Observer < ApiResponse > () {@ Override public void onSubscribe(Disposable d) { } @Override public void onNext(ApiResponse apiResponse) { Log.d(TAG, "onNext: Login successful "); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code
Maybe some of the older guys think, although this is a chain call, no nesting, but this code is too long, and rewrite a lot of useless methods, don’t worry, here is an elegant version
private void loginAndRegister() { final RetrofitService service = retrofit.create(RetrofitService.class); service.register(phone, password) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<ApiResponse>() {@override public void Accept (ApiResponse ApiResponse) throws Exception {log. d(TAG, "registered successfully "); } }) .observeOn(Schedulers.io()) .flatMap(new Function<ApiResponse, ObservableSource<ApiResponse>>() { @Override public ObservableSource<ApiResponse> apply(ApiResponse apiResponse) throws Exception { return service.login(phone, password); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ApiResponse>() { @Override public void Accept (ApiResponse ApiResponse) throws Exception {log. d(TAG, "Login succeeded "); }}); }Copy the code
Consumer is a simple Observer with multiple overloads that allow you to customize the information you need to process. In this case, the method is called to accept only onNext messages. It only provides a callback interface, accept, because there is no onError or onCompete. It is not possible to implement the function callback after receiving onError or onCompete, but it does not mean that it will not receive onCompete or onError. It is recommended that you encapsulate the Observer to make it easier to use.
In logcat, you can see the printed log of the successful request
Reference article:
- RxJava for Android developers
- RxJava from the entry to abandon to never abandon
- RxJava + Retrofit completes network requests
- Take you hand in hand into the mystery of Rxjava
- Gracefully implement nested callback for network requests