This is the fifth day of my participation in the November Gwen Challenge. Check out the details: The last Gwen Challenge 2021
Because it is my spare time to write out some opinions, the length is long, so it is divided into several parts published, you can see my home page to get the PDF version
preface
What exactly is RxJava?
Given the current hot and mysterious state of RxJava, and the fact that I’ve been using it for a year,
I decided to write this article to provide a relatively detailed introduction to RxJava for Android developers.
My purpose is twofold:
For those interested in RxJava, a guide to getting started
For those of you who are using RxJava but still wondering, a little more in-depth parsing
At the end of the text before we start, put the GitHub link and gradle code to introduce dependencies:
Github:github.com/ReactiveX/R… Github.com/ReactiveX/R…
Introducing dependencies:
The compile 'IO. Reactivex: rxjava: 1.0.14' ` compile 'IO. Reactivex: rxandroid:' 1.0.1 `Copy the code
What exactly is RxJava
In a word: asynchronous.
RxJava’s description on the GitHub page reads “A library for composing Asynchronous and Event-based
Programs using Observable sequences for the Java VM” programs using observable sequences on the Java VM
A library for asynchronous, event-based programs. This is RxJava, and it’s summed up very precisely.
However, for beginners, this is too hard to understand. Because it is a “summary”, and beginners need an “introduction”.
In fact, the essence of RxJava can be reduced to the word asynchronous. At the root, it is a library that implements asynchronous operations, and nothing else
It’s all based on that.
Where is good RxJava
In other words, “Why do people use AsyncTask/Handler/XXX /… when it’s done asynchronously?”
In one word: simplicity.
A key aspect of asynchronous operations is program brevity, because asynchronous code is often difficult to write in complex scheduling situations
It’s also hard to read. Android created AsyncTask and Handler to make asynchronous code simpler. RxJava
The advantage is simplicity, but its simplicity is distinctive in that it can remain simple as the program logic becomes more and more complex.
Suppose you have a requirement that you have a custom view, imageCollectorView, that displays multiple images
Image, and can use addImage(Bitmap) method to arbitrarily increase the display of images. Now we need the program to return a given number of directories
Group File[] folders PNG images in each directory are loaded and displayed in imageCollectorView. Need to note
Since the process of reading the image is time-consuming, it needs to be performed in the background, whereas the display of the image must be performed in the UI thread.
There are several common implementations, one of which I have posted here:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run(){ imageCollectorView.addImage(bitmap); }}); } } } } }.start();Copy the code
With RxJava, the implementation looks like this:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap){ imageCollectorView.addImage(bitmap); }});Copy the code
That talk: “you this code obviously changed much! Concise hair!” Calm down, brother. I’m talking about logical simplicity,
It’s not just a small amount of code (logical simplicity is the key to faster code reading and writing, right?). .
If you look at this implementation of RxJava, it’s a chain call from top to bottom, without any nesting, which is logical
Is an advantage in simplicity.
This advantage becomes even more apparent when the requirements become more complex (imagine the general approach if only the top 10 images are required
To do? What if there were more requirements? And imagine that after a couple of months of implementing this whole bunch of requirements and having to change features, when you go back
By seeing the indenting of the puzzle that you wrote down, you can be sure that you will be able to read it quickly rather than trying to think it over again
The road? .
In addition, if your IDE is Android Studio, you will actually see it automatically every time you open a Java file
Lambdazed preview, which will give you a clearer view of the program logic:
Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
Copy the code
If you’re comfortable with Retrolambda, you can just write the code in the terse form above. And if you look at this
I don’t know what Retrolambda is yet, and I don’t recommend learning about it right now.
There are two reasons:
Lambda is a double-edged sword that makes your code less readable while keeping it clean, so learning RxJava and Retrolambda at the same time may cause you to overlook some of the technical details of RxJava;
Retrolambda is an unofficial Java 6/7 compatibility scheme for Lambda expressions, and its backward compatibility and stability are not guaranteed, so using Retrolambda is risky for enterprise projects. So, unlike many RxJava promoters, I don’t recommend learning Retrolambda along with RxJava.
In fact, as much as I personally admire Retrolambda, I’ve never used it.
In Flipboard’s Android code, there is a piece of logic that is very complicated, including multiple memory operations, local file operations, and network operations
For, objects are divided and closed, threads cooperate with each other and wait for each other, while row adult words, while line up a word. If you use the conventional square
To do this, it would be a hell of a lot to write, but with RxJava, it’s still just a chain call. it
It’s long, but clear.
So what’s so good about RxJava? It’s the simplicity, the simplicity that threads any complex logic into a single line.
What would you consider if you were designing Rxjava
For example,
- How do I set the observer mode for each task, and how do I inform the final viewer of anything that might happen in the middle of the lecture
The observer, the ultimate observer, just needs to get results, even if they are failures or successes
-
Should the chain of responsibility model be implemented with the builder model
-
How can the responsibility chain model also infinitely increase the introduction and principle of the observed and the observer API
I can’t say that in one word… Because the main content of this section is to explain step by step exactly how RxJava does things differently
Step, how to achieve concise.
API introduction and principle analysis
1. Concept: Extended observer mode
The asynchronous implementation of RxJava is implemented through an extended observer pattern.
Observer model
Let’s start with a brief description of the observer pattern. You can skip this section if you’re already familiar with it.
The observer mode is oriented to the requirement that object A (observer) is highly sensitive to A certain change of object B (observed) and needs to change in B
The moment to react. For example, in the news, the police catch a thief, and the police need to do it when the thief has his hand
Catch. In this case, the policeman is the observer, the thief is the observed, and the police need to keep an eye on the thief’s every move to ensure that
No moment will be lost. The program’s observer mode is slightly different from this real “observation” in that the observer does not have to stare at the person being observed at all times
(for example, A does not need to check B’s status every 2ms), but registers or subscribes
Subscribe, which tells the observed: I need your xyz state, and you need to notify me when it changes. The Android open
A typical example of this is the OnClickListener. For setting up OnClickListener, View
The setOnClickListener() method is used to subscribe to the observed and OnClickListener is the observer.
The moment a user clicks a button after subscribes, the Android Framework sends the click event to the registered user
An OnClickListener. Adopting such passive observation mode, not only saves the resource consumption of repeated retrieval state, but also can get the highest
Feedback speed. Of course, this also benefits from the fact that we can customize the observer and observed in our own program, while the police obviously do not
The law requires a thief to “let me know when you commit a crime.”
The OnClickListener mode looks like this:
As shown, Button holds a reference to OnClickListener through the setOnClickListener() method
Not drawn on the graph); Button automatically calls the onClick() method of OnClickListener when the user clicks. On the other
Button -> Observed, OnClickListener -> Observer,
SetOnClickListener () -> Subscribe, onClick() -> Event), is used by dedicated observer mode (for example, only for listener control
Switch to the general observer mode. The diagram below:
RxJava, as a tool library, uses a generic form of the Observer pattern.
RxJava observer mode
RxJava has four basic concepts: Observable, Observer, and subscribe
(Subscription), events. Observables and observers subscribe through the subscribe() method, thus observables
Events can be issued to notify the Observer as needed.
Unlike traditional observer mode, RxJava’s event callback method has the exception of the normal event onNext() (equivalent to onClick() /
In addition to onEvent()), two special events are defined: onCompleted() and onError().
OnCompleted () : The event queue is completed. RxJava not only treats each event individually, but also treats them as a queue.
RxJava specifies that the onCompleted() method needs to be fired as a signal when no new onNext() will be issued.
OnError () : The event queue is abnormal. When an exception occurs during event processing, onError() is raised and the queue is automatic
Terminate, no more events are allowed to be emitted.
In a properly run sequence of events, there is one and only onCompleted() and onError(), and they are in the sequence of events
The last one. Note that onCompleted() and onError() are also mutually exclusive, that is, called from a queue
Once you have one, you should not call the other.
The observer mode for RxJava looks like this:
2. Basic implementation
Based on the above concepts, the basic implementation of RxJava has three main points:
1) create the Observer
An Observer is an Observer that determines how events will behave when triggered. An implementation of the Observer interface in RxJava
Type:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!"); }};Copy the code
In addition to the Observer interface, RxJava also comes with an abstract class that implements the Observer: Subscriber.
Subscriber makes some extensions to the Observer interface, but their basic usage is exactly the same:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!"); }};Copy the code
Not only is the basic usage the same, but in essence, the Observer is always converted to one first during the SUBSCRIBE process in RxJava
Subscriber re-use. So if you only want to use basic functions, it is exactly the same to select Observer and Subscriber
. There are two main differences between them for users:
- OnStart () : This is the method for adding Subscriber. It will be tuned just before the subscribe starts and before the event is sent
Can be used to do some preparatory work, such as data clearing or reset. This is an optional method that is implemented by default
Is empty. It is important to note that if there is a request for the preparation thread (such as a pop-up dialog showing progress), this must be done in
The main thread executes), onStart() does not apply, because it is always called when subscribe occurs in the thread, and cannot
Specify the thread. To do the preparatory work on the specified thread, use the doOnSubscribe() method, which is detailed in the following section
As seen in the article.
- Unsubscribe () : This is another interface Subscription method implemented by Subscriber, which is used to unsubscribe
Reading. After this method is called, Subscriber will no longer receive events. It can be used before this method is called
To judge the status, isUnsubscribed(). Unsubscribe () this method is important because in subscribe()
The Observable then holds the Subscriber reference, which will leak memory if it is not released in time
The risk. So it’s best to stick to the principle that it should be in place as soon as possible when it’s no longer in use(for example onPause())
Unsubscribe () is called in onStop() to unsubscribe() to avoid memory leaks.
2) create observables
An Observable, or Observable, decides when and what events are triggered. RxJava uses the create() method
Create an Observable and define event-triggering rules for it:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha"); subscriber.onCompleted(); }});Copy the code
As you can see, an OnSubscribe object is passed in as an argument. OnSubscribe will be stored in the returned
An Observable acts as a schedule, OnSubscribe, when an Observable is subscribed
The call() method is automatically called, and the sequence of events is fired in the right order (in the case of the above code, the observer)
Subscriber will be called onNext() three times and onCompleted() once. Thus, the observer is called by the observed
The observed to observer event transfer is realized, that is, the observer mode
The example is simple: the content of the event is a string, not some complex object; The content of the event is already set, and
Unlike some observer patterns that are to be determined (e.g., the result of a network request is unknown until the request is returned); All events
All at once, rather than interspersed with certain or uncertain intervals or triggered by some kind of trigger
. In short, this example seems to be of no practical value. But this is just for the sake of illustration, essentially you can do all sorts of things if you want
You can write your own rules. We’ll talk about how to do that later, but not right now. Only put the fundamentals first
Having said that, it is easier to say that the upper level of the application of talent.
The create() method is RxJava’s most basic method for creating a sequence of events. Based on this approach, RxJava also provides several methods to use
To quickly create an event queue, for example:
just(T…) : Sends the parameters passed in sequence.
Observable observable = Observable.just("Hello"."Hi"."Aloha");
// Will call:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
Copy the code
- from(T[]) / from(Iterable
) : Breaks an array or Iterable passed into concrete objects and sends them one by one.
String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // will call: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();Copy the code
Just above the (T)… The create(OnSubscribe) example and the from(T[]) example are both equivalent to the previous create(OnSubscribe) example.
3) Subscribe
After creating observables and observers, we link them together with the subscribe() method
The work. The code form is simple:
observable.subscribe(observer);
/ / or:
observable.subscribe(subscriber);
Copy the code
One might notice that the subscribe() method is a bit odd: it looks like “Observalbe subscribed
Observer/subscriber “instead of” Observer/subscriber subscribed to observalbe “, this looks like
It’s like “a magazine subscribes to a reader.” It’s a little bit awkward to read, but if you design the API
Subscribe (Observable)/subscriber.subscribe(observable), although more effective
Logical, but it has an impact on the design of streaming apis, which is obviously more than worth the loss.
The internal implementation of Observable.subscribe(Subscriber) looks like this (core code only) :
Observable observable = Observable.just(“Hello”, “Hi”, “Aloha”);
// Will call:
// onNext(“Hello”);
// onNext(“Hi”);
// onNext(“Aloha”);
// onCompleted();
String[] words = {“Hello”, “Hi”, “Aloha”};
Observable observable = Observable.from(words);
// Will call:
// onNext(“Hello”);
// onNext(“Hi”);
// onNext(“Aloha”);
// onCompleted();
observable.subscribe(observer);
/ / or:
observable.subscribe(subscriber); Subscribe (); subscribe(); subscribe();
// If you need to see the source code, you can go to RxJava GitHub repository to download.
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
Copy the code
As you can see, subscriber() does three things:
-
The calling Subscriber. OnStart (). This method, described earlier, is an optional preparation method.
-
Call OnSubscribe. Call (Subscriber) in Observable. At this point, the event-sending logic starts running
Line. As you can see, in RxJava, an Observable doesn’t start sending events immediately upon creation
It is when it is subscribed, when the subscribe() method is executed.
- Incoming Subscriber is returned as Subscription. This is to facilitate unsubscribe().
The relationship between objects in the whole process is shown as follows:
In addition to SUBSCRIBE (Observer) and subscribe(Subscriber), subscribe() supports incomplete defined callbacks, and RxJava automatically creates Subscriber based on the definition. The form is as follows:
Action1<String> onNextAction = new Action1<String> () {// onNext()
@Override
public void call(String s){ Log.d(tag, s); }}; Action1<Throwable> onErrorAction =new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling}}; Action0 onCompletedAction =new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed"); }};// Automatically create Subscriber and use onNextAction to define onNext()
observable.subscribe(onNextAction);
// Automatically create Subscriber and define onNext() and onErrorAction using onNextAction and onErrorAction
onError()
observable.subscribe(onNextAction, onErrorAction);
// Automatically create Subscriber and use onNextAction, onErrorAction and onCompletedActionDefine onNext(), onError(), and onCompleted() Observable. subscribe(onNextAction, onErrorAction, onCompletedAction);Copy the code
Briefly explain the Action1 and Action0 that appear in this code. Action0 is an RxJava interface that has only one
Call (), which takes no arguments and returns no value; Since the onCompleted() method also takes no parameters and returns no value
Action0 can be used as a wrapper object, wrapping up the contents of onCompleted() and passing itself in as a parameter
Subscribe () to implement an incompletely defined callback. This can also be seen as passing the onCompleted() method as an argument
Subscribe (), equivalent to “closure” in some other languages. Action1 is also an interface, which again has only one method
Call (T param), this method also returns no value, but takes one argument; As with Action0, since onNext(T obj) and
OnError (Throwable error) is also a single argument with no return value, so Action1 can replace onNext(obj) with
OnError (error) is packaged and passed subscribe() to implement an incompletely defined callback. In fact, although Action0 and
Action1 is the most widely used API, but RxJava provides multiple ActionX-like interfaces (e.g. Action2,
Action3), which can be used to wrap different methods with no return value
Note: As mentioned earlier, Observer and Subscriber have the same role and Observer is in
The subscribe() process is eventually converted to a Subscriber object, so from here on, the following description I will
Use Subscriber instead of Observer for more rigor.
4) Scenario examples
Here are two examples:
In order to express the principle in a clearer way, the examples selected in this article are as simple as possible, so that some examples
The code will look like “gild the lily” and “solve the problem much more easily without RxJava”. When you see this
In this case, it is not because RxJava is too verbose, but because it is not helpful to give examples of real world scenarios too early in the process
So I deliberately chose simple situations.
A. Print an array of strings
Print all the strings in the string array NAMES in turn:
String[] names = ... ; Observable.from(names) .subscribe(new Action1<String>() {
@Override
public void call(String name){ Log.d(tag, name); }});Copy the code
B. Obtain the picture by id and display it
Retrieves the image from the specified drawable file id drawableRes and displays the image in the ImageView
Error: toasttoast
int drawableRes = ... ; ImageView imageView = ... ; Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); }});Copy the code
As in the two examples above, create an Observable and Subscriber and connect them with subscribe().
One basic use of RxJava is complete. Very simple.
However,
By default in RxJava, events are emitted and consumed in the same thread. In other words, if you just use the method above, real
What emerges is a synchronous observer pattern. The observer mode itself is intended to be a “background processing, foreground callback” asynchronous machine
So asynchrony is critical to RxJava. To implement asynchrony, we need to use another RxJava concept:
The Scheduler.