Reprint please indicate the source: juejin.cn/user/435372…
Note: RxJava2 has been used by a lot of people. The extended observer mode, concise chain call, through a simple API call can meet our various needs, let me just say that this thing is really cool. Of course, when we use it very cool, can not help but also have some curiosity about it, this thing in the end is what it looks like, well, want to see, let’s see. Spent some time looking at some of its source code to make this note.
primers
Since I’m a little white, what else do I have to pick? I haven’t looked at Flowable (Observable with back pressure mode) yet. Here I just record the reading process of common Observable source code. The following code is known as a primer, and this article will focus on it. Here’s the version I used:
compile 'the IO. Reactivex. Rxjava2: rxjava: 2.1.1'
compile 'the IO. Reactivex. Rxjava2: rxandroid: 2.1.0'
Copy the code
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable emit 1 ");
emitter.onNext(1);
Log.d(TAG, "Observable emit 2");
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: isDisposable " + d.isDisposed());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: "); }});Copy the code
Key concepts
Observable (Observable) : This is an abstract class with a lot of methods, so it doesn’t list them. Observer: This is an interface that contains four methods that must be known.
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
Copy the code
ObservableOnSubscribe (carrier of event emitters) :
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
Copy the code
ObservableEmitter: This is an interface that inherits from the Emitter interface for sending events.
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Copy the code
These are the key concepts that you have to remember, at least in general terms, to know what they are and what methods are in them.
Two, go straight to the core
Since this is a chain call, we might as well go through it from beginning to end. Observable.create(new ObservableOnSubscribe
(){… }) creates an Observable. Look in the static create() method
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> sourceCode) {/ / to empty, is not important, don't let it be ObjectHelper. RequireNonNull (source."source is null"); // Key code to create an Observablereturn RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
There are only two lines of code in the create() method, so let’s focus on rxJavaplugins.onAssembly (new ObservableCreate
(source)). OnAssembly () and new ObservableCreate
(source)
@SuppressWarnings({ "rawtypes"."unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if(f ! = null) {return apply(f, source); } // The above two lines relate to using the Map operator. We don't use the map operator yet, so we don't have any use for itsourceObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>source) generatedreturn source;
}
Copy the code
ObservableOnSubscribe
(){ObservableOnSubscribe
(){ObservableOnSubscribe
(){ObservableOnSubscribe
(){… New ObservableCreate
(source), Observable
(source), Observable
(source), Observable
(source), Observable
(source), Observable
(source), Observable
(source), Observable
(source), Observable
(source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source; } // The method name gives away the truth"Real subscription, real subscription."@Override protected void subscribeActual(Observer<? Supert > observer) {// create CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2. The Observer also subscribed to CreateEmitter object Observer.onsubscribe (parent); Try {// 3. The CreateEmitter object source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}... }Copy the code
First, ObservableCreate inherits an Observable, so ObservableCreate is an Observable, right, that’s it, ObservableCreate is the Observable we’re looking for. The three lines of code that you add to the subscribeActual() method are CreateEmitter and observer. One is the source, ObservableOnSubscribe, and you see the three lines of code, We dare to assume that the entire observer pattern’s message subscription and publishing is controlled by these three lines of code, and we need to read the source code further to verify this hypothesis. Since both ObservableOnSubscribe and observer are related to CreateEmitter, let’s look at CreateEmitter,
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return; } // Each time you must first judge whether the connection is cut (that is, there is no dispose), do not cut to receive the event // This judgment to ensure that once cut must receive the eventif(! isDisposed()) { observer.onNext(t); Override public void onError(Throwable t) {Override public void onError(Throwable t) {Override public void onError(Throwable t)if(! tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) {if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } // If the connection is not severed, the Observer onError() method is calledif(! isDisposed()) { try { observer.onError(t); } finally {// If the connection is not severed, dispose() must be called to sever the connection after calling Observer onError(); }return true;
}
return false;
}
@Override
public void onComplete() {// // If the connection is not broken, call the Observer onComplete() methodif(! isDisposed()) { try { observer.onComplete(); } finally {// If the connection is not broken, dispose() must be called to break the connection after calling Observer onComplete(); }}}... }Copy the code
You can see that CreateEmitter inherits ObservableEmitter
and Disposable, so it’s both an ObservableEmitter and a Disposable, so when is it an ObservableEmitter, And when is Disposable, of course, in the observer. OnSubscribe (parent) it’s Disposable, and in the source.subscribe(parent) it’s ObservableEmitter. Why do you say that? We’ll see. Here I’ll talk about the source. The subscribe (parent), actually the code is ObservableOnSubscribe. Subscribe (ObservableEmitter), is to see clearly
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
...
}
}).subscribe(new Observer<Integer>() {
...
});
Copy the code
The above lines of code explain the sentence “in source.subscribe(parent) it is ObservableEmitter”. Now look at the subscribe() method that connects Observable and Observer,
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? Super T > observer) {/ / sentenced to empty, without looking at ObjectHelper. RequireNonNull (the observer,"observer is null"); Try {/ / 1. This actually no dice, we use is the most simple usage, so what the parameters, the return value will return to the observer = RxJavaPlugins. It's the same onSubscribe (this, the observer); / / to empty, without looking at ObjectHelper. RequireNonNull (the observer,"Plugin returned null Observer"); ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>(ObservableCreate<T>))sourceNew ObservableCreate<T>(source) 的代码
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}
Copy the code
The subscribeActual(observer) method in the subscribe() method is overwritten in the new ObservableCreate
(source). You’ll notice that the observer in the subscribeActual method is the observer in the Subscribe () method.
// The method name reveals the truth"Real subscription, real subscription."@Override protected void subscribeActual(Observer<? Supert > observer) {// create CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2. The Observer also subscribed to CreateEmitter object Observer.onsubscribe (parent); Try {// 3. The CreateEmitter object source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
Since the observer of the subscribe() method executes observer.onsubscribe (parent) in the subscribeActual method, let’s look at the observer of the subscribe() method.
. subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG,"onSubscribe: isDisposable " + d.isDisposed());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: "); }}...Copy the code
Public void onSubscribe(@nonnull Disposable D) public void onSubscribe(@nonnull Disposable D) Now that “in source.subscribe(parent) it is ObservableEmitter” and “in observer.onSubscribe(parent) it is Disposable” are explained, All the links between Observables and observers can be found in the above two sentences. Event publishing and receiving of observables and observers are the functions of these two lines of code. This also explains the question, why do we find it when we print logs
// The onSubscribe(@nonNULL Disposable d) method in the Observer executes 12-15 19:05:39.665 18795-18795/com.persist D/MainActivity: onSubscribe: isDisposablefalseObservable; // Subscribe (@nonnull ObservableEmitter<Integer> Emitter) Observable 18795-18795/com.persist.rxjava D/MainActivity: Observable Emit 1 12-15 19:05:39.665 18795-18795/com.persist. Rxjava D/MainActivity: onNext: 1 12-15 19:05:39.665 18795-18795/com.persist. Rxjava D/MainActivity: Observable Emit 2 12-15 19:05:39.666 18795-18795/com.persist. Rxjava D/MainActivity: onNext: 2 12-15 19:05:39.666 18795-18795/com.persist. Rxjava D/MainActivity: onComplete:Copy the code
Because they are ordered in the subscribeActual() method, comments 2 and 3 are the order in which they are ordered, as you can see in the code below. And that means ObservableEmitter doesn’t start sending events until an Observable connects to an Observer.
// The method name reveals the truth"Real subscription, real subscription."@Override protected void subscribeActual(Observer<? Supert > observer) {// create CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter, CreateEmitter CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2. The Observer also subscribed to CreateEmitter object Observer.onsubscribe (parent); Try {// 3. The CreateEmitter object source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
conclusion
- 1.
Observable
The object is actuallyObservableCreate
Object; - 2.
ObservableCreate
In the objectsubscribeActual
Method executesObservable
andObserver
Establishment of a connection; - 3.
subscribeActual
In the methodCreateEmitter
bothObservableEmitter
(Launcher) AgainDisposable
(severer), only if the connection is not severedObserver
theonNext()
Method is executed, and every timeObserver
To perform theonError()
andonComplete()
Methods afterCreateEmitter
In the objectonError()
andonComplete()
Method must be executed automaticallydispose()
Cut off the connection; - In 4.
Observable
andObserver
Once the connection is established,Observable
To be byObservableEmitter
As a matter of factCreateEmitter
) launch event.