Android developers are probably already familiar with RxJava. If you do not know what RxJava is, I recommend these two articles:
- RxJava Chinese document
- RxJava for Android developers
After reading these two articles, you should have a preliminary understanding of RxJava. This article does not cover the specific usage of RxJava, because the documentation already explains that clearly. Usage belongs to the level of “art” (technique), whereas understanding the principles and core ideas of RxJava at a higher level is the “Tao” (the way). Only by combining the two can you truly master RxJava. This article introduces the observer pattern in RxJava.
The observer pattern in RxJava
First, let’s give a simple definition and summary of RxJava. To explain what RxJava is, here is a line quoted from the official documentation:
Rx = Observables + LINQ + Schedulers
Rx is essentially a programming model with the following three characteristics:
- Observables: implemented using the observer pattern
- LINQ: LINQ-style query operators, most notably combined with lambda expressions
- Schedulers: thread schedulers
RxJava is essentially an implementation of the Rx programming model in the Java language. In Android development it helps with callback hell and thread switching and, most importantly, it makes business logic more extensible and business code more uniform. Today we will talk about the first feature of the Rx model: the observer pattern.
To understand the Observer pattern in RxJava, let’s look at a few simple concepts in RxJava:
- Observer/Subscriber
- Observable
Observer: as the name implies, the party that observes or subscribes. If A subscribes to an event, A is observing that event, so A is the observer.
Where there is an observer, there must be something being observed. An Observable is the observed party or, in design-pattern terms, the subject.
An Observer subscribes to an Observable, and the Observer reacts to the item or sequence of items emitted by the Observable. This also explains why Rx is called reactive programming.
Here is a very simple example of how RxJava chains these objects together to achieve reactive programming.
Suppose we need to subscribe to the weather conditions in place A and notify the corresponding subscribers when the weather changes or the temperature drops there.
In RxJava 2 this can be written as follows:
// RxJava 2 / RxAndroid imports; Weather and WeatherHelper are the app's own classes
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

Observable.create(new ObservableOnSubscribe<Weather>() {
    @Override
    public void subscribe(ObservableEmitter<Weather> observableEmitter) throws Exception {
        if (!observableEmitter.isDisposed()) {
            Weather weather = WeatherHelper.getCityWeather("A");
            if (weather != null) {
                observableEmitter.onNext(weather);
                // onComplete and onError are mutually exclusive terminal events
                observableEmitter.onComplete();
            } else {
                observableEmitter.onError(new Exception("Failed to obtain weather of A"));
            }
        }
    }
}).map(new Function<Weather, Weather>() {
    @Override
    public Weather apply(Weather weather) throws Exception {
        // All kinds of transformations
        return weather;
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Weather>() {
    @Override
    public void accept(Weather weather) throws Exception {
        // Handle the callback after successfully fetching the weather
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // Handle the failure to fetch the weather
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        // Called when the stream completes
    }
});
Use input and output streams
The code above omits the data transformation and combination operators so that we can analyze the most basic flow. Looking at it, we find that
new ObservableOnSubscribe()
corresponds to the observed party, the event source behind the Observable;
new Consumer()
corresponds to the observer that receives the data. The second new Consumer() and the new Action() handle errors and the callback after the task completes. Generics are used to pass typed data between the different observable objects.
At the code level, how does RxJava assemble these objects into the observer pattern? Roughly, the flow is as follows.
Why does it look like this?
- Observable.create builds an Observable from an ObservableOnSubscribe
- The intermediate operators transform the data (omitted in this code), but each of them still returns an Observable
- subscribeOn and observeOn handle the thread scheduling
- The Observable subscribes to the Observer
- The observer actively pulls the data from the observable
That the observer actively pulls the data can be seen in the source code:
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        this.subscribeActual(observer);
    } catch (NullPointerException var4) {
        throw var4;
    } catch (Throwable var5) {
        Exceptions.throwIfFatal(var5);
        RxJavaPlugins.onError(var5);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(var5);
        throw npe;
    }
}
In a chain such as a().b().c().subscribe(xxx), a().b().c() still returns an Observable, and the chain ends with a call to subscribe. As the source shows, subscribe calls
subscribeActual(observer);
which finally pulls the data from the observable, that is, it invokes the subscribe method of the ObservableOnSubscribe implementation.
So far, we have found two unexplained problems:
- An Observable subscribes to an Observer. In the traditional observer pattern, shouldn’t the Observer subscribe to the Observable?
- Why does the observer actively pull the event data instead of the observable pushing it? Does this violate the definition of the observer pattern?
Definitions and variations of the observer pattern
At this point it’s important to clarify what the observer pattern is. Is it really as simple as we think? Let’s start with Wikipedia’s brief description of the observer pattern:
The observer pattern is one of the software design patterns. In this pattern, a subject object manages all the observer objects that depend on it and actively notifies them when its own state changes. This is usually done by calling a method provided by each observer. The pattern is commonly used in real-time event processing systems.
Note the phrase “actively notifies them when its own state changes”: this is the behavior shown in the standard UML diagram of the pattern, the push model. The observer pattern actually has a second variant, the pull model:
- Push model: the event source actively pushes the message, and subscribers receive the data automatically
- Pull model: subscribers actively pull the event data from the source
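As a point of reference for the push model, here is a minimal plain-Java sketch. It is not taken from RxJava; the names TemperatureObserver, TemperatureSubject, and PushModelDemo are hypothetical and exist only for illustration. The subject keeps the list of observers and hands the data to each of them when its state changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface: the subject calls update() with the new value.
interface TemperatureObserver {
    void update(int temperature);
}

// Hypothetical subject: keeps the list of observers and pushes every change to them.
class TemperatureSubject {
    private final List<TemperatureObserver> observers = new ArrayList<>();

    void register(TemperatureObserver observer) {
        observers.add(observer);
    }

    void setTemperature(int temperature) {
        // Push model: the subject initiates the notification and hands over the data.
        for (TemperatureObserver observer : observers) {
            observer.update(temperature);
        }
    }
}

class PushModelDemo {
    public static void main(String[] args) {
        TemperatureSubject subject = new TemperatureSubject();
        subject.register(t -> System.out.println("Observer received: " + t));
        subject.setTemperature(18);
        subject.setTemperature(12);
    }
}
```

The observer never asks for anything here: it only implements update and waits to be called.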
The only difference between the pull model and the push model lies in who supplies the data. In the push model the event source pushes the data along with the notification; in the pull model the subscriber takes the initiative, and when its notify method runs it requests the data from the event source:
@Override
public void notify(Subject subject) {
    // Get the data from the event source
    State state = ((ConcreteSubjectA) subject).getState();
    System.out.println(getName() + "Observer status updated to:" + state);
}
This is the implementation of the pull model. On Android, a typical example is message push: while the app is running and receiving messages normally, notifications are pushed to it; if the app was not running, it actively pulls the messages it missed from the message server once it starts.
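The notify snippet above can be completed into a self-contained sketch. The types below (PullObserver, WeatherSubject, NamedObserver, PullModelDemo) are hypothetical stand-ins for the Subject, ConcreteSubjectA, and State types the snippet refers to, and the callback is named onChanged rather than notify to avoid confusion with Object.notify(). The notification carries only a reference to the event source, and the observer fetches the state itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface: the notification carries the subject, not the data.
interface PullObserver {
    void onChanged(WeatherSubject subject);
}

// Hypothetical event source that exposes its state through a getter.
class WeatherSubject {
    private final List<PullObserver> observers = new ArrayList<>();
    private String state = "sunny";

    void register(PullObserver observer) {
        observers.add(observer);
    }

    String getState() {
        return state;
    }

    void setState(String newState) {
        state = newState;
        // Only signal that something changed; no data is pushed.
        for (PullObserver observer : observers) {
            observer.onChanged(this);
        }
    }
}

class NamedObserver implements PullObserver {
    private final String name;
    private String lastSeen;

    NamedObserver(String name) {
        this.name = name;
    }

    @Override
    public void onChanged(WeatherSubject subject) {
        // Pull model: the observer asks the event source for the data it wants.
        lastSeen = subject.getState();
        System.out.println(name + " observer status updated to: " + lastSeen);
    }

    String lastSeen() {
        return lastSeen;
    }
}

class PullModelDemo {
    public static void main(String[] args) {
        WeatherSubject subject = new WeatherSubject();
        subject.register(new NamedObserver("A"));
        subject.setState("rainy");
    }
}
```

One reason to prefer this shape is that each observer can decide which part of the source's state it needs, at the cost of coupling the observer to the source's getters.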
Understand the Observer pattern in RxJava
Now that we understand the two basic models of the observer pattern, let’s return to the two earlier questions:
- An Observable subscribes to an Observer. In the traditional observer pattern, shouldn’t the Observer subscribe to the Observable?
- Why does the observer actively pull the event data instead of the observable pushing it? Does this violate the definition of the observer pattern?
The subscribe method is essentially a register method, which answers the first question: the Observable does not subscribe to the Observer; rather, the Observer is registered with the Observable. As for the second question, pulling is simply the pull variant of the observer pattern, so nothing is violated. The subscribe method registers an observer and, in the same call, pulls the data from the event source:
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        this.subscribeActual(observer);
    } catch (NullPointerException var4) {
        throw var4;
    } catch (Throwable var5) {
        Exceptions.throwIfFatal(var5);
        RxJavaPlugins.onError(var5);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(var5);
        throw npe;
    }
}
subscribeActual pulls the messages from the event source, much like the getState() method mentioned earlier. Instead of returning a value directly, it delivers the result through the Consumer callback.
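To make the two roles of subscribe concrete, here is a heavily simplified sketch of the idea in plain Java. It is not RxJava's implementation: MiniObservable, its nested Source interface, and MiniObservableDemo are hypothetical names, and error handling, disposal, and schedulers are left out. It only illustrates that building the chain runs nothing, and that calling subscribe both registers the observer and triggers the event source.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, heavily simplified stand-in for an observable type.
abstract class MiniObservable<T> {

    // Plays the role of ObservableOnSubscribe: the code that produces the data.
    interface Source<E> {
        void subscribe(Consumer<E> emitter);
    }

    // Each concrete observable decides what subscribing actually does.
    protected abstract void subscribeActual(Consumer<T> observer);

    // Registers the observer and, in the same call, starts the upstream work.
    public final void subscribe(Consumer<T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    static <T> MiniObservable<T> create(Source<T> source) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(Consumer<T> observer) {
                // The source is invoked only now, when someone subscribes.
                source.subscribe(observer);
            }
        };
    }

    <R> MiniObservable<R> map(Function<T, R> mapper) {
        MiniObservable<T> upstream = this;
        return new MiniObservable<R>() {
            @Override
            protected void subscribeActual(Consumer<R> observer) {
                // Subscribe upstream with a wrapper that transforms each item.
                upstream.subscribe(item -> observer.accept(mapper.apply(item)));
            }
        };
    }
}

class MiniObservableDemo {
    public static void main(String[] args) {
        MiniObservable<String> weather = MiniObservable
                .<Integer>create(emitter -> {
                    System.out.println("source invoked");
                    emitter.accept(12);
                })
                .map(t -> "Temperature in A: " + t);

        System.out.println("chain built");
        weather.subscribe(System.out::println);
    }
}
```

Running the demo prints "chain built" before "source invoked", which is the behavior the text describes: the data is pulled through the chain only when subscribe is called, and the result arrives through the callback rather than as a return value.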
At this point, the process is clear:
Create the event source (create) -> transform the data (map, ...) -> subscribe registers an observer and performs the pull -> thread switching -> the result is returned through a callback
Conclusion
Understanding the observer pattern is the key to understanding RxJava. Once you understand it, you grasp the essence of RxJava’s overall design: you know not only what it does but also why. The two meanings of the subscribe method, registering an observer and pulling from the event source, are best understood by reading the source code carefully and then deepened through practice until you reach a comprehensive understanding.