This article is sponsored by Yugangshuo writing platform
Original author: April Grapes
Copyright: this article belongs to the wechat public account Yu Gang said all, without permission, shall not be reproduced in any form
1. Introduction
This article is mainly on the RxJava message subscription and thread switching source analysis, related to the use of the way is not introduced in detail.
This source code is based on RxJava :2.1.14.
2. RxJava profile
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
The above is from RxJava’s github introduction. Translated into Chinese, it roughly means:
RxJava is a responsive extension to the Java virtual machine, a library that combines asynchronous and event-based programs using observable sequences.
It extends the observer pattern to support data/event sequences and adds operators that allow you to declaratively combine sequences while abstracting concerns such as low-level threading, synchronization, thread safety and concurrent data structures.
Simply put, RxJava is an asynchronous library that uses the observer pattern.
3. Observer mode
As mentioned above, RxJava extends the observer mode, so what is the observation mode? Let’s look at it first.
For example, take wechat public account as an example, a wechat public account will constantly produce new content, if our readers are interested in the contents of the wechat public account, they will subscribe to the public account, when the public account has new content, it will be pushed to us. When we receive new content, if it’s something we’re interested in, we click on it. If it’s an advertisement, you might just ignore it. This is the typical observer pattern that we encounter in life.
In the above example, the wechat official account is an Observable that constantly generates content (event), and we readers are an Observer. By subscribing, we can receive the content (event) pushed by the wechat official account (Observer). Make different actions according to different content (events).
3.1 Rxjava Role Description
There are four roles in RxJava’s extended Observer pattern:
role | Role functions |
---|---|
Observed (Observable ) |
Generate events |
Observer (Observer ) |
Respond to events and process them |
Events (Event ) |
The observed and the message carrier of the observer |
Subscribe (Subscribe ) |
Connect the observed to the observer |
3.2 RxJava event types
Events in RxJava fall into three types: Next events, Complete events, and Error events. The details are as follows:
The event type | meaning | instructions |
---|---|---|
Next |
Regular event | The observer can send an infinite number of Next events, and the observer can receive an infinite number of Next events |
Complete |
The end of the event | After the observer sends the Complete event, the observer can continue to send events. After receiving the Complete event, the observer will not accept any other events |
Error |
Abnormal events | After the Error event is sent by the observer, the sending of other events is stopped. After the Error event is received by the observer, no other events are accepted |
4.RxJava message subscription
Before looking at the mechanics of RxJava message subscription, let’s take a look at the simple steps to use it. Here for the convenience of explanation, do not use the chain code to illustrate, but a step-by-step way to explain one by one (usually write code or recommend to use chain code to call, because it is more concise). Its use steps are as follows:
- Create observed (
Observable
), which defines the event to send.- Create observer (
Observer
), accepts the event and responds.- By subscribing to (
subscribe
) the observer connects them together.
4.1 RxJava message subscription example
Here we follow the steps above to implement this example, as follows:
// Step 1. Create an Observable and define an event to send.
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext(Article 1 "");
emitter.onNext(Article 2 "");
emitter.onNext("Article 3"); emitter.onComplete(); }});// Step 2. Create an Observer that accepts events and responds to them.
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete(a) {
Log.d(TAG, "onComplete"); }};// Step 3. The observer connects them by subscribing to the observed.
observable.subscribe(observer);
Copy the code
The output is as follows:
OnSubscribe onNext: Article1OnNext:2OnNext:3
onComplete
Copy the code
4.2 Source code Analysis
Let’s take a look at the source code for the message subscription process in two parts: the creation observed process and the subscription process.
4.2.1 Creating an Observed process
Let’s start by looking at the process of creating an Observable. In the example above, we created an Observable directly using Observable.create(). Let’s click on this method to see how it works.
4.2.1.1 Create () of Observable
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
As you can see, the create() method does nothing but create an ObservableCreate object and pass our own ObservableOnSubscribe parameter to the ObservableCreate. Finally, call the rxJavaplugins.onAssembly () method.
Let’s start with the ObservableCreate class:
4.2.1.2 ObservableCreate class
public final class ObservableCreate<T> extends Observable<T> {// Inherit from Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;// Assign the ObservableOnSubscribe object we created to source.}}Copy the code
As you can see, ObservableCreate inherits from Observable and stores ObservableOnSubscribe objects.
Take a look at the rxJavaplugins.onAssembly () method
4.2.1.3 onAssembly() of RxJavaPlugins
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// Omit irrelevant code
return source;
}
Copy the code
Simply return the ObservableCreate created above.
4.2.1.4 Simple summary
So observable.create () repackages our custom ObservableOnSubscribe object into an ObservableCreate object and returns the ObservableCreate object. Note that this use of rewrapping new objects is frequently used in RxJava and will be encountered many times in the analysis that follows. Put a picture to understand, wrap it up
4.2.1.5 sequence diagram
The sequence diagram for Observable.create() looks like this:
4.2.2 Subscription Process
Now let’s take a look at the code for the subscribe process. Again, go to Observable.subscribe() :
4.2.2.1 Subscribe () of Observable
public final void subscribe(Observer<? super T> observer) {
// Omit irrelevant code
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
// Omit irrelevant code
}
Copy the code
As you can see, the core code is actually just two sentences, let’s take a look at them separately:
4.2.2.2 onSubscribe() of RxJavaPlugins
public static <T> Observer<? super T> onSubscribe(
@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// Omit irrelevant code
return observer;
}
Copy the code
As in the previous code, the original observer is returned. Take a look at the subscribeActual() method.
4.2.2.3 subscribeActual() of the Observable class
protected abstract void subscribeActual(Observer<? super T> observer);
Copy the code
The method in the subscribeActual() of the Observable class is an abstract method, so where is the actual implementation? Remember when we created the observed, it eventually returns an ObservableCreate object, which is a subclass of Observable. Let’s click on it and see:
4.2.2.4 ObservableCreate Class subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// Triggers our custom Observer's onSubscribe(Disposable) method
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
As you can see, the subscribeActual() method first creates a CreateEmitter object and passes in our custom observer observer as an argument. Again, wrap it up and put a picture:
CreateEmitter
ObservableEmitter
Disposable
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
// Omit the code
}
Copy the code
This is followed by a call to observer.onsubscribe (parent), which is actually a call to the observer’s onSubscribe() method, which tells the observer that it has successfully subscribed to the observed.
Continuing on, the subscribeActual() method continues to call source.subscribe(parent), where source is the ObservableOnSubscribe object, That is, the subscribe() method of ObservableOnSubscribe is called here. We specifically define subscribe() as follows:
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext(Article 1 "");
emitter.onNext(Article 2 "");
emitter.onNext("Article 3"); emitter.onComplete(); }});Copy the code
An ObservableEmitter is an ObservableEmitter. So, the three onNext() methods and one onComplete() in subscribe() are called one by one. The ObservableEmitter interface is implemented as CreateEmitter. Let’s look at the onNext() and onComplete() implementations of the CreateEmitte class:
4.2.2.5 CreateEmitter class onNext(), onComplete(), etc
// Omit the other code
@Override
public void onNext(T t) {
// Omit irrelevant code
if(! isDisposed()) {// Call the observer onNext()observer.onNext(t); }}@Override
public void onComplete(a) {
if(! isDisposed()) {try {
// Call the observer's onComplete()
observer.onComplete();
} finally{ dispose(); }}}Copy the code
As you can see, the end result is the onNext() and onComplete() methods that are called to the observer. At this point, a complete message subscription process is complete. In addition, you can see that there is an isDisposed() method that can control the direction of the message, i.e. cut off the message delivery, which I’ll talk about later.
4.2.2.6 Summary
After the connection is established between the Observable and the Observer, a CreateEmitter is created. The emitter sends events generated by the Observed to the Observer, and the Observer responds to the events emitted by the Emitter. As you can see, it is only after the subscription that the Observable starts sending events.
Here is a diagram of the flow of events:
4.2.2.7 Timing Flow chart
Let’s take a look at the timing flow of the subscription process:
4.3 Cutting a Message
Cutting messages has been mentioned before, so let’s see how to use it:
4.3.1 Cutting the message
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext(Article 1 "");
emitter.onNext(Article 2 "");
emitter.onNext("Article 3"); emitter.onComplete(); }}); Observer<String> observer =new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "Disconnect the observer from the observed.");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete(a) {
Log.d(TAG, "onComplete"); }}; observable.subscribe(observer);Copy the code
The output is:
onSubscribe : nullOnNext:1Disconnect the observer from the observedCopy the code
As you can see, to shut off the message passing is as simple as calling the Dispose () method on Disposable. After dispose() is called, the observer can continue to send the message, but the observer can not receive the message. The Disposable value of the onSubscribe output is “null”, not null.
4.3.2 Cut off message source analysis
Let’s look at the implementation of Dispose () here. Disposable is an interface that understands that Disposable is a connector that will be broken after dispose() is called. This is implemented in the CreateEmitter class, which was mentioned earlier. Let’s look at the Dispose () method of CreateEmitter:
4.3.2.1 CreateEmitter dispose ()
@Override
public void dispose(a) {
DisposableHelper.dispose(this);
}
Copy the code
Just call DisposableHelper.Dispose (this).
4.3.2.2 DisposableHelper class
public enum DisposableHelper implements Disposable {
DISPOSED
;
// Omit other code
public static boolean isDisposed(Disposable d) {
// Determine if the reference to a variable of type Disposable is equal to DISPOSED
// That is, determine whether the connector is interrupted
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if(current ! = d) {// Here we will set field to DISPOSED
current = field.getAndSet(d);
if(current ! = d) {if(current ! =null) {
current.dispose();
}
return true; }}return false; }}Copy the code
You can see DisposableHelper is an enumerated class and has only one value, DISPOSED. An atomic reference field is DISPOSED in the Dispose () method, that is, marked as interrupted. Therefore, the isDisposed() method enables you to determine if your connector is interrupted.
4.3.2.3 Methods in CreateEmitter class
Back to the method in the CreateEmitter class:
@Override
public void onNext(T t) {
// Omit irrelevant code
if(! isDisposed()) {// onNext() is called only if it is not disposed ()observer.onNext(t); }}@Override
public void onError(Throwable t) {
if(! tryOnError(t)) {// If Dispose () is called here, it will eventually crashRxJavaPlugins.onError(t); }}@Override
public boolean tryOnError(Throwable t) {
// Omit irrelevant code
if(! isDisposed()) {try {
// onError() is called only if it is not disposed ()
observer.onError(t);
} finally {
// Dispose () after onError()
dispose();
}
// Return true if dispose() is not used
return true;
}
// If dispose() is disposed, return false
return false;
}
@Override
public void onComplete(a) {
if(! isDisposed()) {try {
// onComplete() is called only if dispose() is not used
observer.onComplete();
} finally {
// Dispose () after onComplete()dispose(); }}}Copy the code
As you can see from the above code:
- If there is no
dispose
.observer.onNext()
Will be called to.onError()
andonComplete()
Mutually exclusive. Only one of them can be called, because either one of them will be calleddispose()
.- First,
onError()
afteronComplete()
.onComplete()
Will not be called to. Conversely, it will crash becauseonError()
Throws an exception in:RxJavaPlugins.onError(t)
. Is, in fact,dispose
And then continue to callonError()
Will be Fried.
5.RxJava thread switching
The above example and analysis are performed in the same thread, and there is no thread switching involved. However, in practice, we usually need to do some data fetching in a child thread, and then update the UI in the main thread, which involves thread switching. With RxJava, we can also write thread switching neatly.
5.1 Thread Switching Example
I won’t go into detail here about how RxJava uses thread switching. Let’s look directly at an example and print out the threads in which RxJava is running for each role.
new Thread() {
@Override
public void run(a) {
Log.d(TAG, "Thread run() is running on :" + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable Subscribe () on thread :" + Thread.currentThread().getName());
emitter.onNext(Article 1 "");
emitter.onNext(Article 2 "");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer onSubscribe() thread :" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer onNext() on thread :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer onError() thread :" + Thread.currentThread().getName());
}
@Override
public void onComplete(a) {
Log.d(TAG, "Observer onComplete() on thread :"+ Thread.currentThread().getName()); }}); } }.start();Copy the code
The output is:
Thread run(a)The Thread is thread-2 ObserveronSubscribe(a)The Thread is thread-2 Observablesubscribe(a)The thread is RxCachedThreadScheduler-1 ObserveronNext(a)The thread is main ObserveronNext(a)The thread is main ObserveronComplete(a)The thread is mainCopy the code
As you can see from the above example:
Observer
Of the observeronSubscribe()
The method runs in the current thread.Observable
Of the observedsubscribe()
Running on thesubscribeOn()
In the specified thread.Observer
Of the observeronNext()
andonComplete()
Wait for the method to run inobserveOn()
In the specified thread.
5.2 Source code Analysis
Let’s analyze the source code for thread switching, which is divided into two parts: subscribeOn() and observeOn().
5.2.1 subscribeOn() source code analysis
First look at the subscribeOn() used in our example:
.subscribeOn(Schedulers.io())
Copy the code
The subscribeOn() method is passed as an argument an object of the Scheduler class, which is a scheduling class that can delay or periodically execute a task.
5.2.1.1 Scheduler type
The Schedulers class allows us to obtain the subclasses of various Schedulers. RxJava provides the following thread scheduling classes for us to use:
The Scheduler type | use | meaning | Usage scenarios |
---|---|---|---|
IoScheduler | Schedulers.io() |
IO operation thread | I/O intensive operations such as reading and writing SD card files, querying databases, and accessing networks |
NewThreadScheduler | Schedulers.newThread() |
Creating a new thread | Time-consuming operation, etc. |
SingleScheduler | Schedulers.single() |
Singleton thread | When only one singleton thread is needed |
ComputationScheduler | Schedulers.computation() |
The CPU computes the thread of operation | Image compression sampling, XML, JSON parsing and other CPU intensive computing |
TrampolineScheduler | Schedulers.trampoline() |
The current thread | When a task needs to be executed immediately on the current thread |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android is the main thread | Update the UI etc. |
5.2.1.2 Schedulers类的io()
Schedulers.io() ¶ Let’s take a look at the schedulers.io () code. The other Scheduler subclasses are similar
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io(a) {
//1. Return a Scheduler object named IO
return RxJavaPlugins.onIoScheduler(IO);
}
static {
// Omit irrelevant code
//2.IO objects are instantiated in static code blocks, where an IOTask() is created.
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call(a) throws Exception {
//3.IOTask returns an IoHolder object
returnIoHolder.DEFAULT; }}static final class IoHolder {
//4.IoHolder will be a new IoScheduler object
static final Scheduler DEFAULT = new IoScheduler();
}
Copy the code
As you can see, schedulers.io () uses a static inner class to create a singleton IoScheduler object that inherits from the Scheduler. This IoScheduler will be used later.
5.2.1.3 subscribeOn() of Observable
Then, let’s look at the subscribeOn() code:
public final Observable<T> subscribeOn(Scheduler scheduler) {
// Omit irrelevant code
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code
As you can see, the current Observable (which is implemented as ObservableCreate) is first wrapped up as a new ObservableSubscribeOn object. Put a photo:
As before, rxJavaplugins.onAssembly () simply returns the ObservableSubscribeOn object as is, so I won’t look at it here. Take a look at the construction of ObservableSubscribeOn:
The constructor of the ObservableSubscribeOn class
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Copy the code
I’m just going to save the source and the scheduler, which I’ll use later.
Then the subscribeOn() method is finished. It doesn’t seem to be doing anything, just rewrapping the object and returning the new object. To wrap an old observed into a new observed.
5.2.1.5 ObservableSubscribeOn class subscribeActual()
Now let’s go back to the subscription process. Why do we go back to the subscription process? Because the event is sent from the subscription process ah. Although we are using thread switching here, the subscription process is the same as the previous section, so we will not repeat it here and start at a different place. Remember that the subscribeActual() method of the Observable in the subscription process is abstract? So look at the implementation of the subclasses. In the subscription process in the previous section, the concrete implementation was in the ObservableCreate class. But since we called subscribeOn(), the ObservableCreate object is wrapped as a new ObservableSubscribeOn object. So let’s look at the subscribeActual() method in the ObservableSubscribeOn class:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Copy the code
The subscribeActual() also wraps our custom Observer into a new SubscribeOnObserver object. Again, here’s a picture:
Observer
onSubscribe()
Observer
onSubscribe()
SubscribeTask
scheduler.scheduleDirect()
SubscribeTask
5.2.1.6 SubscribeTask class
//SubscribeTask is the inner class of ObservableSubscribeOn
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run(a) {
// In this case, the source is the ObservableCreatesource.subscribe(parent); }}Copy the code
A simple class that implements the Runnable interface, then calls observer.subscribe () in run().
5.2.1.7 scheduleDirect() of Scheduler
Consider the scheduler.scheduleDirect() method
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
Copy the code
Look down:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker() is an abstract method in the Scheduler class, so its implementation is in its subclass
// So the createWorker() should be implemented in the IoScheduler.
// Runnable can be executed in Worker
final Worker w = createWorker();
// The actual decoratedRun is still the run object, which is the SubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// Wrap the Runnable and Worker into a DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//Worker executes the task
w.schedule(task, delay, unit);
return task;
}
Copy the code
Let’s look at the process of creating the Worker and the Worker performing the task.
5.2.1.8 IoScheduler createWorker() and Schedule ()
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker(a) {
// Create a new EventLoopWorker and pass a Worker cache pool to it
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
// The constructor
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// Retrieve a Worker from the cache Worker pool
this.threadWorker = pool.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// Omit irrelevant code
// The Runnable is handed over to the threadWorker
returnthreadWorker.scheduleActual(action, delayTime, unit, tasks); }}Copy the code
Note that different Scheduler classes have different Worker implementations because the Scheduler class is ultimately delivered to the Worker to perform scheduling.
Let’s look at the Worker cache pool in action:
5.2.1.9 CachedWorkerPool the get ()
static final class CachedWorkerPool implements Runnable {
ThreadWorker get(a) {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while(! expiringWorkerQueue.isEmpty()) {// If the buffer pool is not empty, the threadWorker is fetched from the buffer pool
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if(threadWorker ! =null) {
returnthreadWorker; }}// If the buffer pool is empty, create one and return it.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
returnw; }}Copy the code
5.2.1.10 NewThreadWorker scheduleActual ()
We’ll look at threadWorker. ScheduleActual (). The ThreadWorker class does not implement scheduleActual(), but its parent class NewThreadWorker does.
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// The constructor creates a ScheduledExecutorService object that uses thread pools
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// The decoratedRun here is actually a run object
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// Wrap the decoratedRun as a new object ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
// Omit irrelevant code
if (delayTime <= 0) {
// The thread pool immediately executes ScheduledRunnable
f = executor.submit((Callable<Object>)sr);
} else {
// ScheduledRunnable is delayed in the thread pool
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
// Omit irrelevant code
returnsr; }}Copy the code
An executor uses a pool of threads to execute tasks, and the run() method of the SubscribeTask is executed in the pool, while the subscribe() method of the Observable is called in the IO thread. This is consistent with the output in the example above:
Observable subscribe(a)The thread is RxCachedThreadScheduler-1Copy the code
5.2.1.11 Summary
Observer
Of the observeronSubscribe()
The method runs in the current thread, because thread switching has not been involved before.- If I set it
SubscribeOn (specifies the thread)
, thenObservable
In (the observed)subscribe()
The method will run on the specified thread.
5.2.1.12 sequence diagram
Take the overall subscribeOn() switch thread timing diagram
5.2.1.13 subscribeOn() Is Set Multiple times
If we set the subscribeOn() multiple times, which thread of execution is it? Let’s do an example
// Omit the code before and after, look at the key parts
.subscribeOn(Schedulers.io())/ / for the first time
.subscribeOn(Schedulers.newThread())/ / the second time
.subscribeOn(AndroidSchedulers.mainThread())/ / the third time
Copy the code
The output is as follows:
Observable subscribe(a)The thread is RxCachedThreadScheduler-1Copy the code
That is, only the first subscribeOn() is active. Why is that? As we know, every time a call to subscribeOn() is made, the subscribeOn() wraps the old observer into a new observer, and after three calls, it looks like this:
ObservableSubscribeOn
subscribeOn(Schedulers.io())
subscribeOn()
5.2.2 observeOn ()
Let’s look at observeOn() again, again to review the Settings in our example:
// Specify execution in the Main Android thread
.observeOn(AndroidSchedulers.mainThread())
Copy the code
5.2.2.1 Observable observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
// Omit irrelevant code
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Copy the code
Note that the old ObservableSubscribeOn object is wrapped, and the subscribeOn() call is wrapped one layer further, so now it looks like this:
Rxjavaplugins.onassembly () is also returned as is.
Let’s look at the constructor for ObservableObserveOn.
Constructor for the ObservableObserveOn class
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
Copy the code
It’s just a bunch of variable assignments.
5.2.2.3 ObservableObserveOn subscribeActual ()
Much like subscribeOn(), let’s look directly at the subscribeActual() method of ObservableObserveOn.
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Check whether the thread is current
if (scheduler instanceof TrampolineScheduler) {
// If the current thread calls the subscribe() method directly
// Call the subscribe() method of ObservableSubscribeOn
source.subscribe(observer);
} else {
/ / create the Worker
/ / in this case the scheduler to AndroidSchedulers mainThread ()
Scheduler.Worker w = scheduler.createWorker();
// This wraps the Worker into an ObserveOnObserver object
}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
// In this example, source-subscribe is executed in the IO thread.
source.subscribe(newObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code
Again, the observer is wrapped in a layer, as shown in the figure below:
Source.subscribe () will send events one by one. Let’s just look at the onNext() method in the ObserveOnObserver, not onComplete(), etc.
5.2.2.4 ObserveOnObserver onNext ()
@Override
public void onNext(T t) {
// Omit irrelevant code
if(sourceMode ! = QueueDisposable.ASYNC) {// Store the information in the queue
queue.offer(t);
}
schedule();
}
Copy the code
It’s just calling schedule().
5.2.2.5 ObserveOnObserver schedule ()
void schedule(a) {
if (getAndIncrement() == 0) {
//ObserveOnObserver also implements the Runnable interface, so it's up to the worker to schedule it
worker.schedule(this); }}Copy the code
The main thread scheduler in Android will not be analyzed. It is actually implemented using a handler to send messages. Since ObserveOnObserver implements the Runnable interface, its run() method is called in the main thread. Let’s look at the run() method of ObserveOnObserver:
5.2.2.6 ObserveOnObserver run ()
@Override
public void run(a) {
//outputFused is false by default
if (outputFused) {
drainFused();
} else{ drainNormal(); }}Copy the code
Here’s the drainNormal() method.
5.2.2.7 ObserveOnObserver drainNormal ()
void drainNormal(a) {
int missed = 1;
// Queue to store messages
final SimpleQueue<T> q = queue;
// The actual is actually a SubscribeOnObserver
final Observer<? super T> a = actual;
// Omit irrelevant code
// Fetch the message from the queue
v = q.poll();
/ /...
// The onNext() method is called inside the layer
/ / in this case, is called SubscribeOnObserver. OnNext ()
a.onNext(v);
/ /...
}
Copy the code
As for SubscribeOnObserver. OnNext (), also didn’t switch threads inside logic, is invoked inside a layer of onNext (), so you will call to our custom in the Observer onNext () method. Thus, the onNext() method of the Observer is called in the thread specified in observeOn(), in this case, in the main Android thread.
5.2.2.8 Brief summary
- If I set it
ObserveOn (specify thread)
, thenObserver
(observer)onNext()
,onComplete()
And so on will run in the specified thread.subscribeOn()
The set thread will not be affectedobserveOn()
.
5.2.2.9 sequence diagram
Finally, a sequence of observeOn() :
6. Other
Due to my limited level, if there is any mistake, welcome to point out and communicate ~ April grape’s blog