Introduction
‘Read The Fucking Source Code’ (RTFSC) is a well-known meme in programmer circles. Everyone knows that reading source code is boring, and having to do it anyway is painful. My goal with this series is to make reading source code enjoyable. For the first volume, I decided to start with the RxJava2 source code. The framework is used daily and comes up often in interviews, so it has become an essential skill for Android developers. But knowing how to use it is not enough; you don’t want to be lost when an interviewer asks how it works. So here is the first volume of RTFSC: reading the RxJava2 source code. I will try to make this boring task a bit more interesting and approachable for everyone. The articles in this volume are:
RxJava2 source code analysis (1): basic process analysis
RxJava2 source code analysis (2): thread switching analysis
RxJava2 source code analysis (3): thread pool principle analysis
Start with the basics
First, let’s write a basic RxJava2 example. We will use it to see what the entire RxJava2 flow looks like.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onNext("4");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "s = " + s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});
The code above looks long, but structurally it boils down to this:
Observable.create(ObservableOnSubscribe).subscribe(Observer);
Without further ado, let’s get to the point:
- 1. You can see three classes with very similar names: Observable, ObservableOnSubscribe, and Observer. These are what we usually call the observed and the observer.
- 2. Let’s give them more vivid names. We call Observable the decorator, ObservableOnSubscribe the emission source, and Observer the processor. Why these names? We will see as we read the source.
- 3. With those names, the code above can be pictured as: the decorator Observable uses a create method and a subscribe method to connect the emission source to the processor.
Let’s look at how this connection is implemented in the source code.
The decorator: Observable
Start with Observable.create.
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // null check
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Key points:
- 1. The create method takes an emission source, an ObservableOnSubscribe<T> object, and returns an Observable<T> object.
- 2. Ignore the null check. We will leave onAssembly aside for now; all we need to know is that, by default, it returns the parameter passed in. So create simply returns an ObservableCreate object.
So let’s look at the ObservableCreate class.
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...
}
Key points:
- 1. ObservableCreate inherits from Observable.
- 2. The ObservableCreate constructor stores the emission source passed in, an ObservableOnSubscribe, directly in its source field.
OK, that’s the create method done. A simple one-sentence summary: it creates a decorator object that stores the emission source for later use. (Does it feel like watching Wang Gang cook?)
Why do we call Observable a decorator? Because RxJava uses the decorator pattern here, and Observable is the base class of that pattern. The pattern is not obvious yet, but it will become clearer later on.
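To make the "store now, run later" behaviour of create concrete, here is a minimal plain-Java sketch. It does not use RxJava: LazyCreateDemo, LazySource, MiniCreate, and run are hypothetical names invented for this illustration, and the real ObservableCreate does considerably more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LazyCreateDemo {
    /** Hypothetical stand-in for ObservableOnSubscribe: pushes values into a sink. */
    interface LazySource<T> {
        void subscribe(Consumer<T> sink);
    }

    /** Hypothetical stand-in for ObservableCreate: the constructor only stores the source. */
    static final class MiniCreate<T> {
        final LazySource<T> source;

        MiniCreate(LazySource<T> source) {
            this.source = source; // stored for later, nothing is emitted yet
        }

        void subscribe(Consumer<T> observer) {
            source.subscribe(observer); // the source body runs only now
        }
    }

    /** Returns a log of what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniCreate<String> observable = new MiniCreate<>(sink -> {
            log.add("source running");
            sink.accept("1");
        });
        log.add("created");
        observable.subscribe(value -> log.add("got " + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "created" is logged before "source running", which mirrors the point above: creating the object only stores the emission source, and nothing is emitted until subscribe is called.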
The emission source: ObservableOnSubscribe
ObservableOnSubscribe is the parameter passed to the create method. Here is its source.
public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
Key points:
- 1. The emission source ObservableOnSubscribe is an interface; when we use it, we override its subscribe method.
- 2. Inside subscribe we define the sequence of events to emit, which is why we call ObservableOnSubscribe the event emission source.
- 3. The subscribe method takes one argument, the emitter ObservableEmitter (more on that later).
Subscribe (Connect)
Now for the next step: subscribe. As we saw, Observable.create returns an ObservableCreate object. ObservableCreate does not override the subscribe method, so let’s look at the subscribe method in Observable.
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // null check and hook mechanism, can be ignored
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        // focus on this call
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
Let’s strip away the unimportant code and get to the point. Reduced to its key lines, the method becomes:
public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
}
Leave RxJavaPlugins aside for the moment, just as with onAssembly above; all we need to know is that, by default, onSubscribe returns the observer passed in. That leaves only the key call, subscribeActual(observer). In Observable, subscribeActual is an abstract method that is implemented by subclasses.
In fact, we can already see the decorator pattern here. Observable is the base class of the pattern, and its subclasses do virtually all the work, which is why we call them decorators. This applies not only to create but also to other operators such as map and flatMap. You’ll get a better sense of this later when we talk about operators and thread switching.
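The decorator idea can be sketched in plain Java as follows. This is a simplified model, not RxJava code: DecoratorDemo, MiniObservable, MiniSource, and MiniMap are hypothetical names, and a java.util.function.Consumer stands in for the Observer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class DecoratorDemo {
    /** Hypothetical base class playing the role of Observable. */
    abstract static class MiniObservable<T> {
        // Public entry point: delegates to the subclass, like Observable.subscribe.
        final void subscribe(Consumer<T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(Consumer<T> observer);

        // An operator wraps "this" in a new decorator object.
        final <R> MiniObservable<R> map(Function<T, R> mapper) {
            return new MiniMap<>(this, mapper);
        }
    }

    /** Innermost object: emits a fixed list of items. */
    static final class MiniSource<T> extends MiniObservable<T> {
        final List<T> items;

        MiniSource(List<T> items) {
            this.items = items;
        }

        @Override
        protected void subscribeActual(Consumer<T> observer) {
            for (T item : items) {
                observer.accept(item);
            }
        }
    }

    /** Decorator: wraps an upstream observable and transforms each item. */
    static final class MiniMap<T, R> extends MiniObservable<R> {
        final MiniObservable<T> upstream;
        final Function<T, R> mapper;

        MiniMap(MiniObservable<T> upstream, Function<T, R> mapper) {
            this.upstream = upstream;
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(Consumer<R> observer) {
            // Subscribe to the wrapped object and forward transformed items.
            upstream.subscribe(item -> observer.accept(mapper.apply(item)));
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniObservable<String> source = new MiniSource<>(Arrays.asList("1", "22", "333"));
        source.map(String::length).subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The base class only defines the entry point; each subclass supplies its own subscribeActual, and an operator such as map simply returns another subclass wrapped around the previous one.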
So to analyze the subscribe method of this Observable, we only need to look at subscribeActual(observer) in the subclass, ObservableCreate.
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
Key points:
- 1. It creates a CreateEmitter object, parent, and then calls the onSubscribe method of the processor observer so that the processor holds it.
- 2. It then calls source.subscribe(parent), passing the emitter into source. This source is the emission source ObservableOnSubscribe stored earlier, whose subscribe method needs exactly such an emitter.
The entire subscription chain is now clear:
- 1. We call Observable.create with an emission source, ObservableOnSubscribe (whose subscribe method we override), which produces an ObservableCreate object.
- 2. We call subscribe on the ObservableCreate object with a processor, Observer.
- 3. Inside subscribe, an emitter, CreateEmitter, is created with the Observer as its parameter, and the subscribe method of the emission source ObservableOnSubscribe is called with that emitter as the argument.
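The three steps above can be sketched without RxJava as a small plain-Java model. All the Mini* types and the SubscribeFlowDemo class are hypothetical stand-ins written for this illustration; error handling and disposal are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscribeFlowDemo {
    /** Hypothetical stand-in for Observer (the processor). */
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
    }

    /** Hypothetical stand-in for ObservableEmitter. */
    interface MiniEmitter<T> {
        void onNext(T value);
    }

    /** Hypothetical stand-in for ObservableOnSubscribe (the emission source). */
    interface MiniOnSubscribe<T> {
        void subscribe(MiniEmitter<T> emitter);
    }

    /** Hypothetical stand-in for ObservableCreate. */
    static final class MiniObservableCreate<T> {
        final MiniOnSubscribe<T> source;

        MiniObservableCreate(MiniOnSubscribe<T> source) {
            this.source = source; // step 1: store the emission source
        }

        void subscribe(MiniObserver<T> observer) { // step 2: a processor arrives
            // step 3: wrap the processor in an emitter and hand it to the source
            MiniEmitter<T> parent = observer::onNext;
            observer.onSubscribe();
            source.subscribe(parent);
        }
    }

    static <T> MiniObservableCreate<T> create(MiniOnSubscribe<T> source) {
        return new MiniObservableCreate<>(source);
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SubscribeFlowDemo.<String>create(emitter -> {
            emitter.onNext("1");
            emitter.onNext("2");
        }).subscribe(new MiniObserver<String>() {
            @Override
            public void onSubscribe() {
                log.add("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                log.add("s = " + s);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running the sketch logs onSubscribe first and then the two values, in the same order as the real subscribeActual: onSubscribe is called on the processor before the emission source is started.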
What is this CreateEmitter? Let’s take a look at its source code.
The emitter: CreateEmitter
static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
    ...
}
Key points:
- 1. CreateEmitter is a static nested class of ObservableCreate. It extends AtomicReference<Disposable> and implements ObservableEmitter<T> and Disposable. We call it the emitter.
- 2. As the onNext method shows, the emitter is connected directly to the external processor.
- 3. The emitter implements the Disposable interface, which has only two methods, dispose() and isDisposed(), used to cut off the emission process.
- 4. In subscribeActual above, we saw that the Observer’s onSubscribe method is called so that it holds this CreateEmitter object. So the processor can interrupt the emission at any time through dispose().
- 5. We can also see in the code that onError and onComplete are mutually exclusive. Only one will be delivered, because whichever runs first immediately cuts off the emission process.
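Points 3 to 5 can be illustrated with a small plain-Java sketch. It is a simplification under stated assumptions: EmitterDemo and MiniEmitter are hypothetical names, an AtomicBoolean replaces the AtomicReference<Disposable> used by the real CreateEmitter, and a late error is silently dropped here, whereas the real onError forwards it to RxJavaPlugins.onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class EmitterDemo {
    /** Hypothetical simplified emitter that records events in a log. */
    static final class MiniEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final List<String> log;

        MiniEmitter(List<String> log) {
            this.log = log;
        }

        void onNext(String value) {
            if (!isDisposed()) {
                log.add("next " + value);
            }
        }

        void onError(Throwable t) {
            if (!isDisposed()) {
                try {
                    log.add("error " + t.getMessage());
                } finally {
                    dispose(); // a terminal event cuts off the stream
                }
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    log.add("complete");
                } finally {
                    dispose(); // a terminal event cuts off the stream
                }
            }
        }

        void dispose() {
            disposed.set(true);
        }

        boolean isDisposed() {
            return disposed.get();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniEmitter emitter = new MiniEmitter(log);
        emitter.onNext("1");
        emitter.onComplete();
        emitter.onError(new RuntimeException("late")); // ignored: already disposed
        emitter.onNext("2");                           // ignored: already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only "next 1" and "complete" reach the log. Because onComplete disposes the emitter in its finally block, the later onError and onNext calls find isDisposed() true and deliver nothing.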
Conclusion
To summarize the classes we have met:
- Observable -> Base class of the decorator pattern, which we call the decorator. It has a create method that takes an ObservableOnSubscribe emission source and returns an ObservableCreate object.
- ObservableCreate -> Decorator implementation class. It has a subscribe method whose argument is the Observer processor. Inside subscribe, a CreateEmitter emitter is created with the Observer as its parameter, and the emission source’s subscribe method is called with this emitter as the argument.
- ObservableOnSubscribe -> The emission source. It is just an interface; we override its subscribe method to define the events to emit, hence the name.
- CreateEmitter -> The emitter. Its constructor takes a processor. The processor holds this emitter object and can interrupt the emission at any time. The emitter’s onError and onComplete are mutually exclusive; only one will be executed.
- Observer -> The processor. It handles the data sent by the emitter.
The whole flow can be summarized as follows:
- 1. We call Observable.create with an emission source, ObservableOnSubscribe (whose subscribe method we override), which produces an ObservableCreate object.
- 2. We call subscribe on the ObservableCreate object with a processor, Observer.
- 3. Inside subscribe, a CreateEmitter emitter is created with the Observer as its parameter, and the subscribe method of the emission source ObservableOnSubscribe is called with this emitter as the argument.
- 4. The subscribe method of the emission source ObservableOnSubscribe defines the events we want to emit and passes the results to the emitter CreateEmitter. CreateEmitter first checks whether the event stream has been disposed, and if not, passes the result on to the processor Observer.
- 5. The processor Observer handles the results.
Extension
Now let’s look back at what we set aside earlier: RxJavaPlugins.onAssembly and RxJavaPlugins.onSubscribe. Let’s go straight to the source code.
/**
 * Calls the associated hook function.
 * @param <T> the value type
 * @param source the hook's input value
 * @return the value returned by the hook
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
The Javadoc says: calls the associated hook function. In effect this is an interceptor wrapped around source. No Java reflection is involved; the hook is an ordinary Function stored in a static field, and if none is set, source is returned unchanged. RxJava gives us a way to inject such a hook, so that the interceptor function we set is applied to source before it is returned. For now, all we need to know is that this mechanism exists; we will use it later.
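As a rough plain-Java model of this hook mechanism (not the RxJavaPlugins implementation): HookDemo and onAssemblyHook are hypothetical names, and a String stands in for the Observable being assembled.

```java
import java.util.function.Function;

public class HookDemo {
    /** Hypothetical global hook, modelled on the onObservableAssembly field. */
    static volatile Function<String, String> onAssemblyHook;

    /** Returns the source unchanged unless a hook is installed. */
    static String onAssembly(String source) {
        Function<String, String> f = onAssemblyHook;
        if (f != null) {
            return f.apply(source);
        }
        return source;
    }

    /** Returns the result without a hook, then the result with a hook. */
    static String[] run() {
        String withoutHook = onAssembly("source");
        onAssemblyHook = s -> "wrapped(" + s + ")";
        String withHook = onAssembly("source");
        onAssemblyHook = null; // reset so later calls see no hook
        return new String[] { withoutHook, withHook };
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println(results[0]);
        System.out.println(results[1]);
    }
}
```

With no hook set, the sketch returns "source" unchanged, which is why the walkthrough could treat onAssembly as a pass-through. Once a hook is installed, every call goes through it.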
Finally
This article mainly covered the basic flow of the RxJava source code. In the next article I will cover thread switching.
Updated weekly, stay tuned ~