RxJava2 usage and source code reading
What is RxJava? RxJava's description on GitHub reads: RxJava (Reactive Extensions for the JVM) is a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
In short, RxJava is a Java library that runs on the JVM and implements asynchronous operations based on the observer pattern.
What RxJava does: its asynchronous, chained style keeps logically complex code readable.
RxAndroid is used together with RxJava on Android. RxAndroid provides AndroidSchedulers.mainThread(), which lets Android developers easily post tasks to the Android main thread, for example to update the page.
Usage
1. Observable
- Observable: the observed, i.e. the source that sends data
- Observer: the observer, which receives the data sent by an Observable
A. RxJava thread switching:
//
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 1. Perform the time-consuming operation on the asynchronous thread
// 2. When it has finished, call onNext to notify the observer
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// The subscribing moment is executed in the subscribing thread
}
@Override
public void onNext(String value) {
// Main thread execution method
}
@Override
public void onError(Throwable e) {
// Main thread execution method
}
@Override
public void onComplete() {
// Main thread execution method
}
});
B. Using operators in RxJava
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// IO thread
// Request network data
e.onNext("123456");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// IO thread
// Network data parsing
//
// throw new RequestFailException(" failed to get network request ");
return 123;
}
}).doOnNext(new Consumer<Integer>() { // Save the login result UserInfo
@Override
public void accept(@NonNull Integer bean) throws Exception {
// IO thread
// Save network data
}
}).subscribeOn(Schedulers.io()) // IO thread
.observeOn(AndroidSchedulers.mainThread()) // main thread
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer bean) throws Exception {
// Update the UI
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// Error display error page
}
});
2. Flowable
Flowable was introduced to deal with backpressure. Flowable is the observed side and is used together with Subscriber.
//
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 1. Perform the time-consuming operation on the asynchronous thread
// 2. When it has finished, call onNext to notify the observer
emitter.onNext(0);
emitter.onComplete();
}
// If the consumer cannot keep up, a MissingBackpressureException is thrown
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// Executed at subscription time, on the subscribing thread
// request() tells the producer how many events the consumer can consume
// Here the consumer declares a consumption capacity of Long.MAX_VALUE
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
// Main thread execution method
}
@Override
public void onError(Throwable t) {
// Main thread execution method
}
@Override
public void onComplete() {
// Main thread execution method
}
});
A. Backpressure
Backpressure is the problem that arises when a producer produces events faster than the consumer can consume them.
In RxJava, this happens when the Observable sends events so quickly that the Observer cannot respond to them in time.
For example:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// The producer in the asynchronous thread has unlimited production capacity
while (true) {
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// On the main thread the consumer is too slow, so events pile up without bound and eventually cause an OOM
Thread.sleep(2000);
System.out.println(integer);
}
});
The producer on the asynchronous thread has unlimited production capacity, while the consumer on the main thread cannot keep up; events accumulate without bound and eventually cause an OOM.
This phenomenon has a name: backpressure.
B. Subscription.request(long n)
The Subscription.request(long n) method tells the producer how many events the consumer is able to consume.
- When request(long n) is called, the producer sends the corresponding number of events for the consumer to consume;
- If request is not called explicitly, the consumption capacity is 0.
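The request-driven contract described above can be sketched without RxJava, using the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Subscriber and Subscription types. This is only an illustration under that assumption: RangePublisher and receive are hypothetical names, the publisher is synchronous, and it is not how Flowable is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestDemo {
    /** Hypothetical synchronous publisher: emits 0..count-1, but never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items; stop early once the range is exhausted.
                    for (long i = 0; i < n && next < count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes to a 10-item range, requests `demand` items, and returns what arrived. */
    static List<Integer> receive(long demand) {
        List<Integer> received = new ArrayList<>();
        new RangePublisher(10).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                // Not calling request() at all means a consumption capacity of 0.
                if (demand > 0) {
                    s.request(demand);
                }
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receive(0));              // prints []
        System.out.println(receive(3));              // prints [0, 1, 2]
        System.out.println(receive(Long.MAX_VALUE)); // prints all ten items
    }
}
```

With no request the subscriber receives nothing, and with request(3) it receives exactly three items, which is the behaviour the two bullets describe.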
In asynchronous calls, RxJava uses a cache pool to hold events that the consumer cannot process yet. The default size of the cache pool is 128, meaning that at most 128 events can be cached. Whether the number passed to request() is larger or smaller than 128, up to 128 events are initially stored in the cache pool; fewer are stored if there are fewer events to send.
- BackpressureStrategy.ERROR: if the producer produces more than 128 events before the consumer catches up, the cache pool overflows and a MissingBackpressureException is thrown.
- BackpressureStrategy.BUFFER: replaces RxJava's default 128-event cache pool with a larger one, so the producer can keep producing events when the consumer passes a large number through request(). This consumes more memory, so BUFFER should be used with caution unless we understand the consumer's capacity well enough to be sure that no OOM will occur.
- BackpressureStrategy.DROP: discards events that the consumer cannot handle. The consumer passes its demand n through request(), and the producer passes n events to the consumer; anything beyond what can be consumed is thrown away.
- BackpressureStrategy.LATEST: behaves like DROP. The consumer passes its demand n through request(), and the producer passes n events to the consumer; anything beyond what can be consumed is thrown away. The only difference is that LATEST always lets the consumer receive the last event the producer produced.
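The differences between the four strategies can be illustrated with a small bounded queue between a fast producer and a consumer that has not drained anything yet. This is a deliberately simplified model and not RxJava's implementation: Buffer, Strategy, and run are hypothetical names, IllegalStateException stands in for MissingBackpressureException, and LATEST is modelled by overwriting the newest slot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BackpressureSketch {
    enum Strategy { ERROR, BUFFER, DROP, LATEST }

    /** Hypothetical bounded queue that sits between a fast producer and a slow consumer. */
    static final class Buffer {
        private final Deque<Integer> queue = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        Buffer(int capacity, Strategy strategy) {
            this.capacity = capacity;
            this.strategy = strategy;
        }

        void offer(int item) {
            // BUFFER ignores the capacity limit and keeps growing.
            if (queue.size() < capacity || strategy == Strategy.BUFFER) {
                queue.addLast(item);
                return;
            }
            switch (strategy) {
                case ERROR:
                    // Stand-in for MissingBackpressureException.
                    throw new IllegalStateException("buffer full: consumer cannot keep up");
                case DROP:
                    // Discard the new item.
                    break;
                case LATEST:
                    // Keep the most recent item so the consumer always sees the last event.
                    queue.removeLast();
                    queue.addLast(item);
                    break;
                default:
                    break;
            }
        }

        List<Integer> drain() {
            List<Integer> out = new ArrayList<>(queue);
            queue.clear();
            return out;
        }
    }

    /** Produces 0..produced-1 without any consumption in between, then drains the buffer. */
    static List<Integer> run(Strategy strategy, int capacity, int produced) {
        Buffer buffer = new Buffer(capacity, strategy);
        for (int i = 0; i < produced; i++) {
            buffer.offer(i);
        }
        return buffer.drain();
    }

    public static void main(String[] args) {
        System.out.println(run(Strategy.DROP, 3, 6));   // prints [0, 1, 2]
        System.out.println(run(Strategy.LATEST, 3, 6)); // prints [0, 1, 5]
        System.out.println(run(Strategy.BUFFER, 3, 6)); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

A capacity of 3 is used instead of 128 only to keep the output short; run(Strategy.ERROR, 3, 6) throws on the fourth event.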
Source code reading – Simple example (1)
Note: the source code version used here is RxJava 2.1.9
Start with a simple example that doesn't involve operators or thread switching:
// Create an observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String o) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
// Create the observed
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// Subscribe
observable.subscribe(observer);
A. ObservableOnSubscribe.java
First, look at the ObservableOnSubscribe.java class:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableOnSubscribe is an interface with a single subscribe method, which takes an ObservableEmitter as its parameter. Next, look at ObservableEmitter.
ObservableEmitter.java
ObservableEmitter is also an interface:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter extends Emitter; the additional methods were introduced in RxJava 2.0 and provide new capabilities such as cancelling midway. Emitter itself looks like this:
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
These are the three methods that everyone is familiar with. They correspond to the following code:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
Next, look at the Observable.create(ObservableOnSubscribe source) method.
B. Observable.create(ObservableOnSubscribe source)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- Ignore RxJavaPlugins for now;
- We see that the ObservableOnSubscribe passed in is used to create an ObservableCreate; in fact, ObservableCreate is an implementation class of Observable.
So Observable.create(ObservableOnSubscribe source) is actually:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// Runs on the subscribing thread (this example has no thread switching)
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
- From this we know: when the ObservableOnSubscribe.subscribe method is executed, the user sends data out (to the observer) by calling ObservableEmitter.onNext.
Now let's look at the ObservableCreate class:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// omit some code...
}
- The ObservableOnSubscribe.subscribe method is executed on the fourth line of ObservableCreate.subscribeActual; inside subscribe, the user calls ObservableEmitter.onNext to send the data out;
- The second line of subscribeActual calls observer.onSubscribe(parent); when the subscription occurs, the observer's onSubscribe method is executed on the subscribing thread;
- CreateEmitter wraps the Observer passed into ObservableCreate.subscribeActual(Observer observer);
- Once the task is cancelled, CreateEmitter no longer calls back the observer it wraps; the observer's onNext method is called by CreateEmitter.onNext.
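The points above can be condensed into a minimal stand-alone model. The sketch below does not depend on RxJava: all types are hypothetical, simplified re-declarations (no onError, no plugins, and an AtomicBoolean instead of RxJava's own disposal bookkeeping), written only to show how the emitter wraps the observer and stops forwarding once it is disposed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class MiniCreate {
    interface Disposable { void dispose(); boolean isDisposed(); }
    interface Observer<T> { void onSubscribe(Disposable d); void onNext(T value); void onComplete(); }
    interface Emitter<T> { void onNext(T value); void onComplete(); }
    interface OnSubscribe<T> { void subscribe(Emitter<T> emitter); }

    /** Simplified stand-in for CreateEmitter: wraps the observer and checks disposal first. */
    static final class CreateEmitter<T> implements Emitter<T>, Disposable {
        private final Observer<T> observer;
        private final AtomicBoolean disposed = new AtomicBoolean();

        CreateEmitter(Observer<T> observer) { this.observer = observer; }

        @Override public void onNext(T value) {
            if (!isDisposed()) {
                observer.onNext(value);
            }
        }

        @Override public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override public void dispose() { disposed.set(true); }
        @Override public boolean isDisposed() { return disposed.get(); }
    }

    /** Simplified stand-in for ObservableCreate.subscribeActual. */
    static final class ObservableCreate<T> {
        private final OnSubscribe<T> source;

        ObservableCreate(OnSubscribe<T> source) { this.source = source; }

        void subscribe(Observer<T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<>(observer);
            observer.onSubscribe(parent); // the observer receives the Disposable first
            source.subscribe(parent);     // then the user's subscribe code runs
        }
    }

    /** Runs the hello/world example and records the callbacks the observer received. */
    static List<String> run(boolean disposeAfterFirst) {
        List<String> log = new ArrayList<>();
        ObservableCreate<String> observable = new ObservableCreate<String>(e -> {
            e.onNext("hello");
            e.onNext("world");
            e.onComplete();
        });
        observable.subscribe(new Observer<String>() {
            private Disposable disposable;

            @Override public void onSubscribe(Disposable d) {
                disposable = d;
                log.add("onSubscribe");
            }

            @Override public void onNext(String value) {
                log.add("onNext:" + value);
                if (disposeAfterFirst) {
                    disposable.dispose();
                }
            }

            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [onSubscribe, onNext:hello, onNext:world, onComplete]
        System.out.println(run(true));  // prints [onSubscribe, onNext:hello]
    }
}
```

In the second run the user code still calls onNext("world") and onComplete(), but the emitter drops both because the observer disposed it after the first value.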
Observable.create(ObservableOnSubscribe source) ultimately returns an ObservableCreate object. Next, look at the observable.subscribe(observer) method.
C. observable.subscribe(observer)
- observable.subscribe(observer) is the moment the subscription takes place;
- Here, observable.subscribe(observer) is actually ObservableCreate.subscribe(observer).
Look at the subscribe(Observer) method of Observable:
Observable.subscribe(Observer observer)
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Subscribe to Observable, which executes the subscribeActual method
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
//
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- Calling observable.subscribe(observer) actually calls observable.subscribeActual(observer);
- observable is a reference to an ObservableCreate, so the call here is ObservableCreate.subscribeActual(observer).
Let’s go back to the subscribeActual method of the ObservableCreate class
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribeActual is called the moment the subscription occurs, i.e. when observable.subscribe(observer) is called
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Wrap the observer in a CreateEmitter, which can stop callbacks to the observer's onNext, onError, etc.
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// When a subscription occurs, the observer's onSubscribe(Disposable d) method is executed
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// omit some code...
}
- subscribeActual is called the moment the subscription occurs, i.e. when observable.subscribe(observer) is called;
- observer.onSubscribe(parent): when the subscription occurs, the observer's onSubscribe method is called back on the subscribing thread;
- In subscribeActual, the Observer passed in is wrapped in a CreateEmitter; if the task is cancelled, the CreateEmitter can stop the callbacks to the observer's onNext, onError, and other methods.
The second line of code: observer.onSubscribe(parent);
When a subscription occurs, observer.onSubscribe(parent) executes the observer's onSubscribe(Disposable d) method, which brings us back to the following code:
// Create an observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
// ... omit onNext, onError, onComplete
};
- The argument passed here is new CreateEmitter(observer), which implements the Disposable interface; if the task is cancelled, the onNext, onError, and onComplete methods of the wrapped observer are no longer called back.
The fourth line of code: source.subscribe(parent);
source.subscribe(parent) is ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer)).
The code finally returns to the subscribe method of ObservableOnSubscribe:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
- In subscribe, the user calls the onNext, onComplete, and onError methods of CreateEmitter to send data to the observer wrapped by the CreateEmitter.
At this point, the code trail for “this simple example that doesn’t involve operators and thread switching” ends.
Thread Switching (2)
Note: the source code version used here is RxJava 2.1.9
Start with this simple example of a thread switch:
// Create an observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// The subscribing moment is executed in the subscribing thread
}
@Override
public void onNext(String o) {
// Execute in the main thread of Android
}
@Override
public void onError(@NonNull Throwable e) {
// Execute in the main thread of Android
}
@Override
public void onComplete() {
// Execute in the main thread of Android
}
};
// Create the observed
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// execute in the IO thread
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// The observable's subscribe logic runs on the IO thread
observable = observable.subscribeOn(Schedulers.io());
// The observer is called back on the Android main thread
observable = observable.observeOn(AndroidSchedulers.mainThread());
// Subscribe
observable.subscribe(observer);
Here’s my summary of the entire code execution flow for RxJava2:
A. Observable.create(ObservableOnSubscribe source)
As shown in the previous section, Observable.create(ObservableOnSubscribe source) is actually:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// execute in the IO thread
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
- ObservableCreate contains a subscribeActual(observer) method, which executes the onSubscribe method of the observer passed in and indirectly calls the observer's onNext, onComplete, and other methods.
ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// omit some code...
}
- The second line of subscribeActual calls observer.onSubscribe(parent); when the subscription occurs, the observer's onSubscribe method is executed on the subscribing thread;
- The fourth line of subscribeActual calls source.subscribe(parent), i.e. ObservableOnSubscribe.subscribe; inside subscribe, the user calls CreateEmitter.onNext to send the data out;
- CreateEmitter wraps the Observer passed into ObservableCreate.subscribeActual(Observer observer);
- Once the task is cancelled, CreateEmitter no longer calls back the observer it wraps; the observer's onNext method is called by CreateEmitter.onNext.
The ObservableEmitter passed to subscribe is a reference to a CreateEmitter, which is a further wrapper around the Observer. If the task has been cancelled when onNext is executed, CreateEmitter does not call back the Observer's onNext method.
Next, look at the code behind observable.subscribeOn(Schedulers.io()).
B. observable.subscribeOn(Schedulers.io())
Look at the subscribeOn(Scheduler) method of the Observable class
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// Generate an ObservableSubscribeOn object
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
- Continue to ignore RxJavaPlugins;
- In the end, an ObservableSubscribeOn object is returned.
Here, Observable observable = observableCreate.subscribeOn(Schedulers.io()) is actually:
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io());
- So observable.subscribeOn(Schedulers.io()) returns a reference to an ObservableSubscribeOn.
Now look at ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ... omit some code
}
Take a look at the subscribeActual method in ObservableSubscribeOn:
- The second line of subscribeActual executes the onSubscribe method of the Observer passed in;
- The third line of subscribeActual: on the IO thread that corresponds to scheduler, the subscribe method of observableCreate is executed with a SubscribeOnObserver as its argument; that is, the IO thread executes observableCreate.subscribe(new SubscribeOnObserver(observer));
As a result, no matter which thread ObservableSubscribeOn.subscribeActual(observer) is called on, observableCreate.subscribe(new SubscribeOnObserver(observer)) is executed on the IO thread, so the calls e.onNext("hello"); and e.onComplete(); made in the user's subscribe method also execute on the IO thread.
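The mechanism described above, where the upstream subscribe call itself is moved onto another thread, can be sketched with a plain ExecutorService standing in for the IO scheduler. This is an assumption-laden miniature: Source, subscribeOn, and the "io-thread" name are hypothetical, and RxJava's Scheduler, Worker, and Disposable handling are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SubscribeOnSketch {
    /** Hypothetical minimal source: subscribing runs the producer and pushes values to the consumer. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Wraps an upstream source so that upstream.subscribe(...) runs on the given executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Returns the name of the thread on which the value was emitted and received. */
    static String emittingThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Plays the role of observableCreate: emits one value on whatever thread subscribes.
        Source<String> create = observer -> observer.accept("hello");

        // subscribe() is called on the current thread, but the emission happens on "io-thread".
        subscribeOn(create, io).subscribe(value -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(emittingThreadName()); // prints io-thread
    }
}
```

Because no thread hop happens after the emission, the consumer also runs on the executor thread, which is why observeOn is needed to move the callbacks elsewhere.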
C. observable.observeOn(AndroidSchedulers.mainThread())
Look at the observeOn(Scheduler scheduler) method of the Observable class:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
//
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Here you can see that Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread()) is actually:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
Therefore, observable.observeOn(AndroidSchedulers.mainThread()) returns a reference to an ObservableObserveOn.
Now look at ObservableObserveOn:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... omit some code
}
Take a look at the subscribeActual method in ObservableObserveOn:
- The fifth line of subscribeActual is actually observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
- The role of ObserveOnObserver: when the onNext method of ObserveOnObserver is executed, it posts the call to the observer's onNext method to the Android main thread.
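The role of ObserveOnObserver can be sketched in plain Java as a wrapper that re-posts every onNext call to a worker. This is a simplified illustration rather than RxJava's code: the Observer interface and ObserveOnObserver class below are hypothetical re-declarations, a single-thread executor named "main-like" stands in for the Android main thread, and the real class queues items and drains them instead of posting one task per item.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ObserveOnSketch {
    interface Observer<T> {
        void onNext(T value);
    }

    /** Simplified stand-in for ObserveOnObserver: forwards each onNext on the worker's thread. */
    static final class ObserveOnObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private final Executor worker;

        ObserveOnObserver(Observer<T> downstream, Executor worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        @Override
        public void onNext(T value) {
            // Called on the emitting thread; the downstream callback runs on the worker.
            worker.execute(() -> downstream.onNext(value));
        }
    }

    /** Emits two values from the calling thread and records where the observer saw them. */
    static List<String> run() {
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);

        Observer<String> observer = value -> {
            log.add(value + "@" + Thread.currentThread().getName());
            done.countDown();
        };
        Observer<String> wrapped = new ObserveOnObserver<>(observer, mainLike);

        wrapped.onNext("hello");
        wrapped.onNext("world");

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            mainLike.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello@main-like, world@main-like]
    }
}
```

A single-thread executor keeps the items in order, which mirrors the guarantee that a single main thread gives.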
D. observable.subscribe(observer)
- We know that the subscribe(Observer observer) method of Observable actually calls the subscribeActual(Observer observer) method of Observable;
- And here, observable is actually a reference to an ObservableObserveOn.
Therefore, observable.subscribe(observer) actually executes observableObserveOn.subscribeActual(observer).
Here, our small example of thread switching (2) transforms into the following code:
// Create an observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// The subscribing moment is executed in the subscribing thread
}
@Override
public void onNext(String o) {
// Execute in the main thread of Android
}
@Override
public void onError(@NonNull Throwable e) {
// Execute in the main thread of Android
}
@Override
public void onComplete() {
// Execute in the main thread of Android
}
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// execute in the IO thread
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io());
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);
Now look at observableObserveOn.subscribeActual(observer):
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source is observableSubscribeOn
super(source);
// scheduler is AndroidSchedulers.mainThread()
this.scheduler = scheduler;
// false
this.delayError = delayError;
// 128
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// AndroidSchedulers.mainThread() is a HandlerScheduler, so execution goes into the else branch
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
}
// Execution goes into the else branch
else {
Scheduler.Worker w = scheduler.createWorker();
// source is observableSubscribeOn
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... omit some code
}
- In subscribeActual, AndroidSchedulers.mainThread() is a HandlerScheduler, so the if branch is skipped and execution goes directly to the else branch;
- In subscribeActual, the observer is wrapped in an ObserveOnObserver, and observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)) is called;
- observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)) is actually:
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize);
// 1. On the subscribing thread: execute onSubscribe, which in turn executes the observer's onSubscribe method
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2. On the IO thread: execute subscribe; inside subscribe, the user calls onNext, onError, and onComplete to send data, which reaches the ObserveOnObserver through the SubscribeOnObserver
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver));
- The user calls onNext on the SubscribeOnObserver to send the data out;
- SubscribeOnObserver.onNext calls observeOnObserver.onNext;
- observeOnObserver.onNext uses the HandlerScheduler to execute observer.onNext, observer.onError, observer.onComplete, and other methods on the Android main thread.
E. The overall flow chart is as follows
Finally, summarize the entire execution process of RxJava2: