Author: JasonChen (chblog.me), 2018/12/19
In the previous article, we covered RxJava2's internal design patterns and mechanisms, including the Observer pattern and the decorator pattern, which together form the core of RxJava2's event-driven design. This article covers another important feature of RxJava2: asynchrony.
RxJava2 in-depth analysis
`Scheduler.Worker w = scheduler.createWorker()` — the subscribeOn method involves two important components:
- The Scheduler
- Custom thread pools
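Conceptually, subscribeOn just moves the act of subscribing (and hence emission) onto a worker thread. Below is a minimal sketch of that idea using only the JDK; the names and the `subscribe` source are illustrative stand-ins, not RxJava's actual classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SubscribeOnSketch {
    // A toy "source" that pushes values to a consumer when subscribed.
    static void subscribe(Consumer<Integer> downstream) {
        for (int i = 1; i <= 3; i++) {
            downstream.accept(i);
        }
    }

    // The essence of subscribeOn: run the subscription itself on the worker,
    // so the source emits on the worker's thread, not the caller's.
    static String runOnWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor(); // plays the Scheduler.Worker role
        StringBuilder out = new StringBuilder();
        worker.execute(() -> subscribe(v -> out.append(v)));
        worker.shutdown();
        try {
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker()); // 123, emitted on the worker thread
    }
}
```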
Schedulers source code analysis
```java
public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;
    // ...
}
```
There are five types of schedulers, listed below, each aimed at a different scenario. Of course, applications can also supply their own custom schedulers for their particular needs.
- SINGLE: a single scheduled thread pool, for serial, single-task work
- COMPUTATION: an array of scheduled thread pools (a fixed resource pool, one per CPU core) set up for computation tasks
- IO: reusable scheduled thread pools set up for IO-bound tasks
- TRAMPOLINE: literally "trampoline" (credit the author's imagination). The scheduler's source comment reads: tasks work on the current thread (not a thread pool) but are not executed immediately; they are queued and executed after the current task completes. In short: enqueue first, then drain linearly. The technique here is essentially the same as the backpressure mechanism mentioned earlier, and is worth learning from.
- NEW_THREAD: a single scheduled thread pool, basically the same as SINGLE, except that SINGLE wraps its threads with NonBlocking. Judging from the source, that wrapper does essentially nothing; it is just a marker interface.
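The trampoline's "queue, then drain on the current thread" trick can be sketched in a few lines of plain Java. This is an illustrative simplification, not the RxJava source: only the outermost `schedule` call drains the queue, so nested tasks are deferred rather than run reentrantly:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;
    final List<String> log = new ArrayList<>();

    // Schedule on the *current* thread: always enqueue; only the outermost caller drains.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // a caller higher up the stack is already draining
        }
        draining = true;
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run(); // may call schedule() again; those tasks just enqueue
        }
        draining = false;
    }

    public static void main(String[] args) {
        TrampolineSketch t = new TrampolineSketch();
        t.schedule(() -> {
            t.log.add("A");
            t.schedule(() -> t.log.add("C")); // queued, runs only after B
            t.log.add("B");
        });
        System.out.println(t.log); // [A, B, C]
    }
}
```

Note how the nested task runs after the current task completes, exactly the behavior the source comment describes.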
Computation scheduler source code analysis
So how does the Computation scheduler, the one built for computation-heavy scenarios, actually work? It is also the scheduler most likely to be used in back-end concurrent scenarios. Let's go through the source with these questions in mind.
```java
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
    // the fixed resource pool
    final AtomicReference<FixedSchedulerPool> pool;

    @Override
    public Worker createWorker() {
        // create an EventLoopWorker wrapping a PoolWorker
        return new EventLoopWorker(pool.get().getEventLoop());
    }

    static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
        final int cores;
        final PoolWorker[] eventLoops;
        long n;

        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int) (n++ % c)];
        }
    }

    static final class PoolWorker extends NewThreadWorker {
        PoolWorker(ThreadFactory threadFactory) {
            super(threadFactory);
        }
    }
}

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // initializes the scheduled thread pool
        executor = SchedulerPoolFactory.create(threadFactory);
    }
}

// SchedulerPoolFactory
public static ScheduledExecutorService create(ThreadFactory factory) {
    // a scheduled thread pool with a core size of 1
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    tryPutIntoPool(PURGE_ENABLED, exec);
    return exec;
}
```
The code above clearly shows the Computation scheduler's implementation details. The point to note is that each scheduled thread pool has a core size of 1, and the number of pools is at most the number of CPU cores. This relies on how ScheduledThreadPoolExecutor works internally: its work queue is simply an automatically growing array-based queue, similar to an ArrayList, which means the queue is never "full", so the thread count in each pool never grows beyond its core size.
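The round-robin distribution over those single-threaded pools can be sketched with the JDK alone. This is an illustrative mock of FixedSchedulerPool, not the RxJava source; it uses the same `n++ % cores` indexing trick:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class RoundRobinPoolSketch {
    final ScheduledExecutorService[] eventLoops;
    long n; // monotonically increasing pick counter, as in FixedSchedulerPool

    RoundRobinPoolSketch(int cores) {
        eventLoops = new ScheduledExecutorService[cores];
        for (int i = 0; i < cores; i++) {
            // core size 1; ScheduledThreadPoolExecutor's queue is unbounded,
            // so each loop never grows beyond one thread
            eventLoops[i] = Executors.newScheduledThreadPool(1);
        }
    }

    // Simple round robin over the event loops
    ScheduledExecutorService next() {
        return eventLoops[(int) (n++ % eventLoops.length)];
    }

    void shutdown() {
        for (ScheduledExecutorService e : eventLoops) {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        RoundRobinPoolSketch pool = new RoundRobinPoolSketch(4);
        System.out.println(pool.next() == pool.eventLoops[0]); // true: first pick
        pool.next(); pool.next(); pool.next();
        System.out.println(pool.next() == pool.eventLoops[0]); // true: fifth pick wraps around
        pool.shutdown();
    }
}
```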
Next, we analyze the essence of how the subscribing thread and the publishing thread communicate. As mentioned in the previous article, the publishing thread is an internal worker; naturally, the subscribing thread must be one too. Let's look at the source:
```java
@Override
public void subscribeActual(Subscriber<? super T> s) {
    Worker worker = scheduler.createWorker();
    if (s instanceof ConditionalSubscriber) {
        source.subscribe(new ObserveOnConditionalSubscriber<T>(
                (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
    } else {
        source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
    }
}
```
It wraps the downstream subscriber in an ObserveOnSubscriber. What is this wrapper's main function, and why is it needed? It is in fact the key to the subscribing thread's internal mechanism, so let's look at the source to understand it.
```java
// the abstract base class
abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T>
        implements FlowableSubscriber<T>, Runnable {
    private static final long serialVersionUID = -8241002408341274697L;

    final Worker worker;
    final boolean delayError;
    final int prefetch;
    // ...

    @Override
    public final void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode == ASYNC) {
            trySchedule();
            return;
        }
        if (!queue.offer(t)) {
            upstream.cancel();
            error = new MissingBackpressureException("Queue is full?!");
            done = true;
        }
        // kick off scheduling on the subscriber-side thread pool
        trySchedule();
    }

    @Override
    public final void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        done = true;
        trySchedule();
    }

    @Override
    public final void onComplete() {
        if (!done) {
            done = true;
            trySchedule();
        }
    }

    // the request is not passed upstream directly; the count is recorded,
    // because this subscriber acts as the data emitter itself
    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(requested, n);
            // start draining
            trySchedule();
        }
    }

    final void trySchedule() {
        if (getAndIncrement() != 0) {
            return;
        }
        // this class implements Runnable, so the worker runs it
        worker.schedule(this);
    }

    @Override
    public final void run() {
        if (outputFused) {
            runBackfused();
        } else if (sourceMode == SYNC) {
            runSync();
        } else {
            // the runAsync method is called
            runAsync();
        }
    }

    abstract void runBackfused();
    abstract void runSync();
    abstract void runAsync();
    // ...
}
```
When an upstream decorator (the decorator pattern mentioned in the previous article) calls the onNext method here, there is no direct call to the downstream onNext. This is the core principle of the subscriber-side threading model: data is first offered into a queue, and the downstream is driven from that queue on the worker thread.
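The "offer to a queue, then trySchedule" handoff can be reduced to a small, self-contained sketch. This is an illustrative simplification of the pattern, not the RxJava source: an AtomicInteger work-in-progress counter ensures that only the caller who increments it from 0 submits the drain task, so at most one drain runs at a time and no notification is lost:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ObserveOnSketch {
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
    final List<Integer> received = new CopyOnWriteArrayList<>();
    final ExecutorService worker = Executors.newSingleThreadExecutor();

    // called on the producer thread: onNext never calls downstream directly
    void onNext(int value) {
        queue.offer(value);
        trySchedule();
    }

    void trySchedule() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain is already scheduled or running
        }
        worker.execute(this::drain);
    }

    void drain() {
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                received.add(v); // stands in for downstream.onNext(v) on the worker thread
            }
            missed = wip.addAndGet(-missed); // did new onNext calls arrive meanwhile?
            if (missed == 0) {
                break;
            }
        }
    }

    static List<Integer> runDemo(int n) {
        ObserveOnSketch s = new ObserveOnSketch();
        for (int i = 1; i <= n; i++) {
            s.onNext(i);
        }
        s.worker.shutdown();
        try {
            s.worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5)); // [1, 2, 3, 4, 5], delivered on the worker thread
    }
}
```

The producer and consumer run on different threads, yet ordering is preserved because there is a single FIFO queue and a single drainer.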
The concrete implementation class, ObserveOnSubscriber, is partially shown below.
```java
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
        implements FlowableSubscriber<T> {
    private static final long serialVersionUID = -4547113800637756442L;

    final Subscriber<? super T> downstream;

    ObserveOnSubscriber(
            Subscriber<? super T> actual,
            Worker worker,
            boolean delayError,
            int prefetch) {
        super(worker, delayError, prefetch);
        this.downstream = actual;
    }

    // called when the upstream subscribes this subscriber; see the previous article
    @Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.validate(this.upstream, s)) {
            this.upstream = s;

            if (s instanceof QueueSubscription) {
                @SuppressWarnings("unchecked")
                QueueSubscription<T> f = (QueueSubscription<T>) s;

                int m = f.requestFusion(ANY | BOUNDARY);

                if (m == SYNC) {
                    sourceMode = SYNC;
                    queue = f;
                    done = true;
                    downstream.onSubscribe(this);
                    return;
                } else if (m == ASYNC) {
                    sourceMode = ASYNC;
                    queue = f;
                    downstream.onSubscribe(this);
                    s.request(prefetch);
                    return;
                }
            }

            queue = new SpscArrayQueue<T>(prefetch);

            // trigger the downstream subscriber; if it issues a request,
            // that in turn drives the request for data from upstream
            downstream.onSubscribe(this);

            // request the initial prefetch amount from upstream
            s.request(prefetch);
        }
    }
    // ...
}
```
Let’s look at the implementation of the abstract method runAsync().
```java
@Override
void runAsync() {
    int missed = 1;

    final Subscriber<? super T> a = downstream;
    final SimpleQueue<T> q = queue;

    long e = produced;

    for (;;) {
        long r = requested.get();

        while (e != r) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                cancelled = true;
                upstream.cancel();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }

            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);

            e++;
            // replenish upstream after emitting `limit` items,
            // where limit = prefetch - (prefetch >> 2)
            if (e == limit) {
                if (r != Long.MAX_VALUE) {
                    r = requested.addAndGet(-e);
                }
                upstream.request(e);
                e = 0L;
            }
        }

        if (e == r && checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        int w = get();
        if (missed == w) {
            produced = e;
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        } else {
            missed = w;
        }
    }
}
// ...
```
As stated earlier, the subscriber treats itself as an emitter, so where does its data come from, and how does it keep flowing? The code above shows the data source. Each time the number of emitted items reaches the limit, a new prefetch from upstream begins, and the amount requested each time is exactly that limit.
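The replenishment arithmetic is easy to verify on its own. With RxJava2's default prefetch of 128, the limit is `prefetch - (prefetch >> 2) = 96`, i.e. the subscriber re-requests 96 items from upstream after every 96 it emits, keeping the queue roughly three-quarters topped up. A small illustrative simulation of the counting (not the RxJava source):

```java
public class PrefetchSketch {
    // mirrors: limit = prefetch - (prefetch >> 2), i.e. 75% of prefetch
    static int limit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    // simulate emitting `emitted` items and count how much is re-requested upstream
    static int replenished(int prefetch, int emitted) {
        int lim = limit(prefetch);
        int consumed = 0;
        int requestedUpstream = 0;
        for (int i = 1; i <= emitted; i++) {
            if (++consumed == lim) { // reached the limit: ask upstream for `lim` more
                requestedUpstream += lim;
                consumed = 0;
            }
        }
        return requestedUpstream;
    }

    public static void main(String[] args) {
        System.out.println(limit(128));            // 96
        System.out.println(replenished(128, 300)); // 288 = 3 replenish rounds of 96
    }
}
```

Requesting in batches of 75% rather than one-by-one amortizes the cost of the volatile `request` traffic between the two threads.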
Why design the subscriber this way? The reason is actually simple: the subscriber and the publisher need to execute asynchronously on different threading mechanisms (for example the computation scheduler, where the subscriber may perform heavy, time-consuming calculations), while still keeping the decorator pattern consistent. So the callback chain is deliberately broken on the subscriber side, and a data queue is used to hand data across the two thread pools.
Summary
The author likes to summarize: it is how we reflect on and consolidate the knowledge points covered, where they apply, and our own gaps.
- RxJava2's thread scheduling mechanism; the threading model needs to be customized for different scenarios
- The asynchronous principle and implementation of production and consumption in RxJava2