The source code parsing
create Observable
Observable
.fromArray(1.2.3.4)
.subscribe (object:Observer<Int> {override fun onSubscribe(d: Disposable){}override fun onNext(t: Int){}override fun onError(e: Throwable){}override fun onComplete(a){}})Copy the code
First look at Observable.fromArray(1, 2, 3, 4) and enter the Observable class
class Observable{
public static <T> Observable<T> fromArray(T... items) {
/ / found empty
ObjectHelper.requireNonNull(items, "items is null");
// Obviously, with some optimizations, zero elements and one element is much simpler.
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
//1.RxJavaPlugins.onAssembly()
//Call the associated hook function.
// Well, you can associate some external logs and monitor things. That doesn't affect the main flow.
// RxJavaPlugins are omitted for the rest of the analysis.
//2. ObservableFromArray is created, so this is the next core.
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
@Override
public final void subscribe(Observer<? superT> observer) { subscribeActual(observer); }}Copy the code
Look at ObservableFromArray code, plain and simple! ObservableFromArray inherits Observables. The subclass’s subscribeActual() is called by Observable.subscribe(). A lot of the logic actually happens here. The analytical perspective is often here as well.
Fusionmode-related codes have been deleted. We cannot follow the logic here, which will be analyzed in the following chapters.
class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) { this.array = array; }
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
d.run();
}
static final class FromArrayDisposable<T> implements Disposable {
final Observer<? super T> actual;
final T[] array;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
void run(a) {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && ! isDisposed(); i++) { T value = a[i];if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if(! isDisposed()) actual.onComplete(); }@Override
public void dispose(a) {
disposed = true;
}
@Override
public boolean isDisposed(a) {
returndisposed; }}}Copy the code
With regard to Disposable, RxJava provides us with a switch that can be used to unsubscribe. This is passed around the code like a tracker, and it becomes unrecognizable as you read the source code. So let’s look at the forward function.
map
Observable.fromArray(1.2.3.4)
.map { it * 5 }
.subscribe { println(it) }
Copy the code
From here, the code seems to get complicated. Because RxJava has multiple operators, reuse logic. There’s a lot of abstraction and encapsulation. When ObservableMap. SubscribeActual (), the source. The subscribe (MapObserver (yourObserver)), MapObserver is upstream of the Observer. We also passed mapper function, IT * 5, to MapObserver. Call order: –>FromArrayDisposable. Run () –> mapObserver.onNext (value) –> YourobServer.onNext (mapper.apply(value))
abstract class AbstractObservableWithUpstream<T.U> extends Observable<U> {
protected final ObservableSource<T> source;
}
public final class ObservableMap<T.U> extends AbstractObservableWithUpstream<T.U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T.U> extends BasicFuseableObserver<T.U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) { U v = mapper.apply(t); actual.onNext(v); }}}Copy the code
BasicFuseableObserver removes the FusionMode part, which is still simple. MapObserver(i.e., BasicFuseableObserver), contains the upstream Disposable and the downstream Observer.
/**
* Base class for a fuseable intermediate observer.
* @param <T> the upstream value type
* @param <R> the downstream value type
*/
public abstract class BasicFuseableObserver<T.R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> actual;
/** The upstream subscription. */
protected Disposable s;
public BasicFuseableObserver(Observer<? super R> actual) {
this.actual = actual;
}
@Override
public final void onSubscribe(Disposable s) {
this.s = s;
actual.onSubscribe(this); }}@Override
public void onError(Throwable t) { actual.onError(t); }
@Override
public void onComplete(a) { actual.onComplete(); }@Override
public void dispose(a) {s.dispose(); }@Override
public boolean isDisposed(a) {returns.isDisposed(); }}Copy the code
You can see that map’s own MapObserver subscribes to the upstream Observable.
subscribeOn
Recall the code mentioned in the previous section
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1.5,main]
in next :Thread[RxNewThreadScheduler-1.5,main] 5
Copy the code
Schedulers.newthread () creates the NewThreadScheduler. Scheduler content is not the focus of this section. NewThreadScheduler. ScheduleDirect (Runnable) ultimately calls the ExecutorService. Submit (Runnable). The runnable is thrown into the thread pool for execution. The runnable here is SubscribeTask. Execute source.subscribe(SubscribeOnObserver) in the new thread. If there is no related thread switch operation upstream. The entire execution is switched from the main thread to the new thread. The entire chain process is equivalent to executorService.submit {source. Subscribe (SubscribeOnObserver)}. Look at SubscribeOnObserver, which contains yourObserver.
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }
@Override
public void onNext(T t) {actual.onNext(t); }@Override
public void onError(Throwable t) {actual.onError(t); }@Override
public void onComplete(a) {actual.onComplete();}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
@Override
public void run(a) {source.subscribe(parent); }}}Copy the code
�subscibeOn can understand this
observeOn
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it")}// Run the result
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1.5,main] 5
Copy the code
Let’s look at the ObservableObserveOn code, which I’ve trimmed a lot from the source code. But regardless of external cancellations or internal exceptions, this is exactly how it works. After deleting the Fusion code, ObservableCreate is still not needed.
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T.T> {
// The actual running thread is dependent
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
//subscribe() is still executed on the source thread, and subscribeOn throws the entire source.subscribe into the new thread.
Scheduler and yourObserver are passed to the new ObserveOnObserver.
source.subscribe(new ObserveOnObserver<T>(observer, w));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
SpscLinkedArrayQueue SpscLinkedArrayQueue SpscLinkedArrayQueue SpscLinkedArrayQueue SpscLinkedArrayQueue
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
// Execute run() in the Scheduler thread, that is, drainNormal(). We find that upstream and downstream are not on the same thread.
// We assume that the rate of production is higher than the rate of consumption.
@Override
public void onNext(T t) {
// Put thread shared queue, production behavior
queue.offer(t);
schedule();
}
void schedule(a) {
// When first called, the get value is 0 and +1, which is understood as the number of times to process
// If the get value is 0, the consumption behavior is triggered, otherwise it is not triggered.
if (getAndIncrement() == 0) worker.schedule(this);
}
@Override
public void run(a) { drainNormal(); }
We need to drain the water from the pipes.
void drainNormal(a) {
// Can enter drain with at least one data.
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
// The first time I saw this for-for, I was also confused. This is actually a code model that deals with producer-consumer issues.
for (;;) {
for (;;) {
//1. Look at the inner loop first, q is SpscLinkedArrayQueue, is not blocking.
// Empty the data.
T v = q.poll();
if (v == null) break;
a.onNext(v);//yourObserver.onNext
}
//2. Remember the above assumption that production rate > consumption rate.
GetAndIncrement ()! =0, cannot enter the consuming thread.
// Missed records the number of requests to be processed for the current inner loop.
// Update missed values, current number of requests - number of requests processed in the last round.
missed = addAndGet(-missed);
if (missed == 0) break;
//3. Note that the number of requests is equal to the number of data. But in this model, it's not mandatory.}}... }}Copy the code
For observeOn, the bold thing is that only yourObserver ends up running in a new thread.
This is the first time you see the drain code. Obviously, you can fix this by locking. But here’s oneThe wip skills(Working-in-progress) using CAS(Compare And Set) to solve the problem. In RxJava2, multithreading is usually done in this way.
Scheduler
From above, we know that Scheduler basically throws code scenarios into another thread to run. Scheduler manages threads through a Java thread pool.
The core code
Here is a piece of pseudocode
public abstract class Scheduler {
public abstract Worker createWorker(a);
public Disposable scheduleDirect(@NonNull Runnable run) {
createWorker().schedule(run);
}
public abstract static class Worker implements Disposable {
ExecutorService executor;
public Disposable schedule(@NonNull Runnable run) {
Task task = new Task(run);
executor.submit(task)
}
}
// In RxJava, a Task might be called ScheduledRunnable or ScheduledDirectTask
public static class Task implements Callable<Void>,Disposable{
private Runnable run;
@Override
public Void call(a) {run.run();return null; }@Override
public void dispose(a) {... }@Override
public boolean isDisposed(a) {... }}}Copy the code
The actual call looks like this.
The scheduler. The worker. The schedule (Runnable code} {need to throw into the new thread) executor. Submit (Task (Runnable))Copy the code
About the Task
In RxJava, a Task might be called ScheduledRunnable or ScheduledDirectTask, etc. Task wraps our runnable and provides control over runnable. In ObserveOnObserver, each next execution produces a task. In the ObservableSubcribeOn source.subcribe, there is only one task because it is a one-off task. Tasks in each Worker are executed sequentially.
About the Scheduler
Schedulers provide Schedulers for different scenarios:
- Schedulers.immediate()
By default, no thread is specified
- Schedulers.newThread():
Executing on a new thread, there really is only one thread. Each worker has a separate thread pool.
- Schedulers.computation():
The maximum number of threads that apply to CPU computation operations is the number of JVM processors. Pregenerate a fixed number of poolworkers. Different scenarios share poolworkers.
- Schedulers. IO () :
There is no upper limit to the number of threads that can operate on IO. There is a CachedWorkerPool where the actual worker can be reused.
- Schedulers.trampoline():
The current thread executes, not immediately, until the previous task completes. The current task is executed in the team.
- Schedulers.single():
All worker thread pools created by Single are common compared to schedulers.newthread ().
- AndroidSchedulers.mainThread()
Executed on the Android main thread, internally via Handler, and only by Handler. About RxAndroid.