What is RXJava?
ReactiveX
ReactiveX is an acronym for Reactive Extensions, commonly known as Rx. Originally developed as an extension to LINQ and opened in November 2012 by a team led by Microsoft architect Erik Meijer, ReactiveX is a programming model that aims to provide a consistent programming interface. To help developers more easily handle asynchronous data flow, Rx library support. NET, JavaScript, and C++, Rx has become increasingly popular in recent years and now supports almost all popular programming languages. Most of Rx’s libraries are maintained by the ReactiveX organization, with the most popular being rxjava/RXJ/rx.net.
rxjava
Rxjava is an implementation of ReactiveX on the Java platform. Is a programming model that provides chained interface calls in observer mode, dynamically controlling thread switching, making it easy to handle asynchronous data flows.
Introduction to the
Github:rxjava
Chinese version: ReactiveX/RxJava
Liverpoolfc.tv: reactivex
The characteristics of
- Chain call, easy to use
- To simplify the logic
- Flexible thread scheduling
- Provide perfect data operator, powerful function
Observer model
The observer pattern defines a one-to-many dependency between objects so that whenever an object changes state, all dependent objects are notified and automatically updated. At the heart of rXJava’s design is the observer pattern. An Observable is the observed, and an Observer is the Observer. It subscribes via the subscribe method.
- advantages
The observer and the observed are abstractly decoupled in response to business changes
Enhance system flexibility and scalability
Refer to the Design pattern-Observer pattern for code examples
- disadvantages
Need to consider the development efficiency when using the observer pattern and efficiency problem, the program includes a observed, more than one observer, development and debugging, etc will be more complex, and the notice of default is in Java message order, an observer caton, will affect the overall execution efficiency, and in this case, generally consider using an asynchronous manner
How to use RXJava?
Gradle introduced version
implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.2.0'
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.1.0'
Copy the code
To take a common example of rXJava usage, we often need to request a server-side interface in a project, then fetch the data, cache the data, and then handle the display on the UI. The code for this example is as follows:
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(ObservableEmitter<Response> e) Throws Exception {// Obtain the interface data of the server. Request.builder Builder = new request.builder ().url("http://xxx.com") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, Model>() {@override public Model apply(Response Response) throws Exception {// Convert JSON data to the corresponding Modelif (response.isSuccessful()) {
ResponseBody body = response.body();
if(body ! = null) { Log.e(TAG,"Map: before conversion :" + response.body());
returnnew Gson().fromJson(body.string(), Model.class); }}returnnull; }}). DoOnNext (new Consumer<Model>() {@override public void Accept (Model s) throws Exception Log.e(TAG,"DoOnNext: Save network loaded data:" + s.toString() + "\n"); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Model>() { @override public void accept(Model Model) throws Exception {// Refresh UI log. e(TAG,"Successfully refresh interface :" + data.toString() + "\n"); }}, New Consumer<Throwable>() {@override public void accept(Throwable Throwable) throws Exception {// Error message log.e (TAG,"Failed to handle exception:" + throwable.getMessage() + "\n"); }});Copy the code
This article mainly analyzes the source code of RXJava. For the use of rXJava operators, it is recommended to refer to the Chinese documentation, as well as the following blog introduction.
This is probably the best RxJava 2.x tutorial (final version)
What is the rXJava core execution flow?
Rxjava is designed in Observer mode. New Observables and Observers are generated when operators are executed. Observables hold upstream objects, and observers hold downstream observers. When the subscribe method is executed, the subscribe method is executed upstream by holding the upstream observed object. When executing ObservableEmitter’s onNext method to the original ObservableEmitter callback method, since the Observer holds the downstream Observer object, the ObservableEmitter onNext method will be progressively called until the incoming Observer instance is subscribed. This is the core execution flow of RXJava chain calls.
Of course, RXJava also involves scheduling of threads and back-pressure processing of data. The implementation principles of these will be discussed later. But the core execution flow of rXJava’s chain calls is the same. We’ll take a look at the core execution flow of RXJava in two parts, including a description of some key classes, and through sample code-related execution flow charts.
Description of key class functions
class | instructions |
---|---|
ObservableSource | Interface class with a single SUBSCRIBE method that takes an Observer object |
Observer | Interface class, observer. There are onSubscribe, onNext, onError, onComplete methods |
Consumer | The interface has the observer. There is only one Accept method, which will eventually be converted to an Observer when subscribed. This class is designed to simplify calls |
Observable | The abstract class inherits from the ObservableSource interface, from which all implementations of operators inherit. Internally encapsulates a large number of operator call methods, mainly there is a core abstract method abstract void subscribeActual(Observer<? Super T> observer) for implementing the associated subscription distribution logic. |
AbstractObservableWithUpstream | Inherits from Observable, the constructor needs to pass in the ObservableSource source object, which is the parent observed. |
ObservableCreate | Inheritance in AbstractObservableWithUpstream, source for ObservableOnSubscribe. The subscribeActual method instantiates a CreateEmitter object that executes the Subscribe method of ObservableOnSubscribe |
ObservableMap | Inheritance in AbstractObservableWithUpstream, MapObserver subscription will make a new observer |
MapObserver | An internal class of ObservableMap, the onNext method triggers the mapper.apply(t) callback and then executes the downstream observer’s onNext method |
ObservableDoOnEach | Inheritance in AbstractObservableWithUpstream, DoOnEachObserver subscription will make a new observer |
DoOnEachObserver | ObservableDoOnEach’s inner class, onNext will execute the onNext.Accept (t) method and then the onNext method of the downstream observer |
ObservableSubscribeOn | The inheritance in AbstractObservableWithUpstream, thread scheduling control observed. ScheduleDirect (new SubscribeTask(parent)) is executed by subscribeActual, and source.subscribe(parent) is executed by SubscribeTask’s Run method. The ObservableSubscribeOn implements the upstream subscription method implementation according to the thread scheduler’s policy. |
ObservableObserveOn | Thread scheduling control inheritance in AbstractObservableWithUpstream, observer. The subscribeActual method determines whether the Scheduler is trampoline escheduler. If the downstream observer is executed, a new ObserveOnObserver is created and passed to schedule’s work. |
ObserveOnObserver | The ObservableObserveOn inner class, onNext, triggers the execution of the Schedule () method to control the callback thread of the downstream observer based on the worker |
Code execution flow
First of all, according to the above demo example, we comb out the simple implementation process of RXJava, as shown below:
According to the flowchart, RXJava generates new Observables and Observers when executing related operators. Observables hold upstream objects, and observers hold downstream observers. When the subscribe method is executed, the subscribe method is executed upstream by holding the upstream observed object. When executing ObservableEmitter’s onNext method to the original ObservableEmitter callback method, since the Observer holds the downstream Observer object, the ObservableEmitter onNext method will be progressively called until the incoming Observer instance is subscribed.
With an understanding of rXJava’s general execution process, let’s take a detailed look at the source code execution process. First or first on the overall flow chart, because the picture is larger, it is recommended to view the above demo and RXJava source code.
Let’s look at the implementation of rXJava source code in detail by assigning a few operators.
create
The create operator returns the observed of an ObservableCreate.
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source."source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
The key implementation code for the ObservableCreate object is as follows:
// The constructor passes in a reference to the ObservableOnSubscribe interface, specified as the observedsource. public ObservableCreate(ObservableOnSubscribe<T>source) {
this.source = source; // Override protected void subscribeActual(Observer<? CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); // Execute the subscribe callback of ObservableOnSubscribe, passing the CreateEmitter object source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
ObservableEmitter onNext: ObservableEmitter onNext: ObservableEmitter onNext: ObservableEmitter onNext
CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) {if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return; } // If the subscription is not unsubscribed, the downstream observer's onNext method is executed to achieve the effect of a chain callif (!isDisposed()) {
observer.onNext(t);
}
}
Copy the code
map
The map operator returns the observed of an ObservableMap.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Copy the code
Let’s look at the key implementation code for the ObservableMap object, as follows:
@Override public void subscribeActual(Observer<? Source. Subscribe (new MapObserver< t, U>(t,function));
}
Copy the code
ObservableEmitter’s onNext method is called when ObservableEmitter’s onNext method is fired.
Mapper MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) {if (done) {
return;
}
if (sourceMode ! = NONE) { actual.onNext(null);return; } U v; Try {/ / the map at the core of the executing code, mapper. Apply (t) will perform data transformation, and the results after conversion v continue execution to the downstream of the observer v = ObjectHelper... requireNonNull (mapper. Apply (t),"The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return; } // Execute actual. OnNext (v); }Copy the code
doOnNext
The doOnNext operator returns the observed of an ObservableDoOnEach.
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}
Copy the code
Let’s look at the key implementation code for the ObservableDoOnEach object, as follows:
@Override public void subscribeActual(Observer<? Super T> T) {// instantiate a DoOnEachObserver observer object source.subscribe(new DoOnEachObserver<T>(T, onNext, onError, onComplete, onAfterTerminate)); }Copy the code
Here, we still need to see the data processing of onNext of DoOnEachObserver, as follows:
@Override
public void onNext(T t) {
if (done) {
return; } try {// Call onNext. Accept (t); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); onError(e);return; } // Continue downstream to call the observer's onNext actual. OnNext (t); }Copy the code
subscribeOn
The subscribeOn operator returns the observed of an ObservableSubscribeOn and passes in the Scheduler thread scheduling parameter.
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler,"scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code
The key implementation code for the ObservableSubscribeOn object is as follows:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? Super T> s) {// Final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); // This is the core method, which calls the scheduleDirect method, And pass in the SubscribeTask task parent-setDisposable (scheduler.scheduleDirect(new SubscribeTask(parent))); }Copy the code
Let’s look at the SubscribeTask implementation as follows:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {// Execute the upstream observed subscription method, which is the core source. Subscribe (parent); }}Copy the code
Finally, look at the onNext method of SubscribeOnObserver, which is relatively simple and executes the onNext method of downstream observer directly as follows:
@Override
public void onNext(T t) {
actual.onNext(t);
}
Copy the code
The specific implementation of scheduler will be analyzed in the following thread principle. All we need to know is that the upstream observed subscription is executed in the specified Scheduler thread policy.
observerOn
The observerOn operator returns the observed of an ObservableObserveOn and passes in the Scheduler thread scheduling parameter.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Copy the code
Let’s look at the key implementation code for the ObservableObserveOn object, as follows:
@Override protected void subscribeActual(Observer<? Super T> observer) {//TrampolineScheduler will subscribe the downstream observer directly to the upstream observer if it is the current threadif (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} elseWorker. Worker w = scheduler.createworker (); Source. subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code
The next key is to look at the ObserveOnObserver implementation as follows:
@override public void onNext(T T) {// If the mode is not asynchronous, join the queueif (sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } // schedule(); } voidschedule() {// Determine the number of tasks currently being executedif(getAndIncrement() == 0) { worker.schedule(this); }}Copy the code
ObserveOnObserver itself inherits the Runnable interface. The run method is implemented as follows:
@Override
public void run() {// Whether the output results are fusedif (outputFused) {
drainFused();
} else{ drainNormal(); }}Copy the code
Let’s start with the drainNormal method:
void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; // Loop the first layerfor(;;) {// Check exception handlingif (checkTerminated(done, q.isEmpty(), a)) {
return; } // The second loopfor (;;) {
boolean d = done; T v; V = q.pll (); boolean empty = v == null; // Check the exceptionif (checkTerminated(d, empty, a)) {
return; } // If there is no more data, exitif (empty) {
break; } // Perform the next operation. a.onNext(v); }}}}}}}}}}}}}}}}}if (missed == 0) {
break; }}}Copy the code
The specific implementation of scheduler will be analyzed in the following thread principle. All we need to know is that the downstream observer’s onNext is executed in the specified Scheduler thread policy.
subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code
The last subscription method, after a non-null check, calls the subscribeActual method and starts executing the subscription layer by layer upstream.
How does an Observable send data?
Through the above process analysis, we can know. If an Observable is created using create, ObservableOnSubscribe sends data from ObservableEmitter’s onNext in its Subscribe method, and onNext triggers the ObservableEmitter to start sending data to downstream observers. Of course, there are many other rXJava creation operators, such as just, from, etc., essentially trigger the downstream observer onNext to send data.
How does the Observer receive data?
Through source code analysis, each chain layer Observer holds adjacent downstream observers, and when data is sent, the Observer onNext method is chain-executed, and finally the Observer created in the SUBSCRIBE method is executed.
How is the subscription implemented between the observed and the observer?
Every Observable at the chain layer will hold adjacent upstream Observable objects. After the subscribe method is called, it will finally be executed to the subscribeActual method, in which the observer and the upstream observed are subscribed.
How does RXJava schedule threads?
There are many implementations of Scheduler in RXJava. Then through the most commonly used. SubscribeOn (Schedulers. IO ()). ObserveOn (AndroidSchedulers. MainThread ()) to analyze the specific thread scheduling process.
Scheduler
When we call subscribeOn and observeOn, we will pass in the Scheduler object. First, we will take a look at the types and functions of Scheduler
The Scheduler types | instructions |
---|---|
Schedulers. IO ( ) | SD Calvin for IO intensive operation, such as, speaking, reading and writing, query the database, access to the network, etc., with a thread cache mechanism, after this scheduler receives the task, first check the thread cache pool, if there is a free thread, if you have, the reuse, without creating new threads, and add to the thread pool, if there is no idle thread used every time, New threads can be created without an upper limit |
Schedulers. NewThread ( ) | NewThread ( ) can be used wherever schedulers.io ( ) is used. However, the schedulers.newthread ( ) can be used. Schedulers.newthread ( ) is not as efficient as schedulers.io ( ) |
Schedulers.computation() | For CPU intensive computing tasks, that is, time-consuming operations that do not limit performance by I/O operations, such as parsing XML, JSON files, compression sampling of Bitmap images, etc., it has a fixed thread pool, the size of the NUMBER of CPU cores. Cannot be used for I/O operations because the wait time for I/O operations wastes CPU |
Schedulers.trampoline() | The current thread executes the task immediately. If the current thread has a task executing, it will suspend it. After the inserted task is finished, the unfinished task will continue to execute |
Schedulers.single() | Have a thread singleton where all tasks are executed in this thread. When a task is executed in this thread, other tasks are executed in first-in, first-out order |
Scheduler.from(Executor executor) | Specifies a thread scheduler that controls the execution strategy of a task |
AndroidSchedulers.mainThread() | Perform tasks in the Android UI thread for Android development customization |
subscribeOn(Schedulers.io())
According to the above analysis, the subscribeOn() method will be executed to the subscribeActual method at last, the SubscribeTask method is analyzed and inherits the Runnable interface, and the RUN method will execute the source.subscribe(parent) method at last.
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Copy the code
Here we focus on the scheduler.scheduleDirect() method.
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
returnscheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @nonnull TimeUnit unit) {// Create a Worker object final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //DisposeTasky is also a wrapper class that inherits Runnable interface DisposeTask Task = new DisposeTask(decoratedRun, w); // Here is the key implementation, which implements the worker's schedule method w.schedule(task, delay, unit);return task;
}
Copy the code
The schedule of Worker is an abstract method, and the Worker corresponding to schedulers.io () is implemented as EventLoopWorker. The schedule implementation of the EventLoopWorker is as follows:
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false.true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }}Copy the code
This will execute to the scheduleActual method of threadWorker, moving on
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if(parent ! = null) {if(! parent.add(sr)) {returnsr; } } Future<? > f; try {if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if(parent ! = null) { parent.remove(sr); } RxJavaPlugins.onError(ex); }return sr;
}
Copy the code
This is where executor is used to finally execute the run method. And of course the question is how does IoScheduler actually reuse threads here? Let’s look at the creation of threadWorker threads in IoScheduler as follows:
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
Copy the code
The thread reuse effect is achieved by maintaining a Worker thread pool. Specifically, let’s look at the get method of CachedWorkerPool as follows:
ThreadWorker get() {
if (allWorkers.isDisposed()) {
returnSHUTDOWN_THREAD_WORKER; } // Retrieve the cache from the released work thread queuewhile(! expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); // If found, the reused thread is returnedif(threadWorker ! = null) {returnthreadWorker; W = new ThreadWorker(threadFactory); w = new ThreadWorker(threadFactory); allWorkers.add(w);return w;
}
Copy the code
observeOn(AndroidSchedulers.mainThread())
@Override protected void subscribeActual(Observer<? Super T> observer) {// No scheduling if the current thread is specifiedif (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else{// createWorker scheduler. Worker w = scheduler.createworker (); Subscribe (new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); // Instantiate ObserveOnObserver and pass the Worker source. Subscribe (new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code
Here we mainly need to analyze the ObserveOnObserver object, onNext implementation is as follows:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } voidschedule() {
if(getAndIncrement() == 0) { worker.schedule(this); }}Copy the code
The key is to execute the schedule of worker. The main implementation of AndroidSchedulers is HandlerScheduler, and the implementation of worker in HandlerScheduler is HandlerWorker. Let’s see the implementation of schedule as follows:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } / / through the handler sends the message handler execution run interface. SendMessageDelayed (message, unit.and toMillis (delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }Copy the code
For an example of handler, let’s look at the creation in AndroidSchedulers as follows:
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
Copy the code
All known AndroidSchedulers. MainThread () is through the message will be the realization of the run method to the main thread which processing, to the observer the effectiveness of data processing in the main thread
How does the RXJava back pressure policy work?
backpressure
When the upstream and downstream in different threads, the transmission, by observables processing, the response data flow, if the upstream transmit data faster than downstream receive the speed of processing data, so for those who didn’t come and processing data can cause backlog, the data is not lost, will not be recycled garbage collection mechanism, but in an asynchronous cache pool, If the data in the cache pool remains unprocessed, it accumulates and eventually runs out of memory. This is the problem of backpressure in responsive programming.
Back pressure handling mechanism
Rxjava2.x uses Flowable to support the backpressure mechanism, and the BackpressureStrategy strategy is passed when the CREATE method is called.
Strategy | role |
---|---|
MISSING | Flowable created using the Create method does not specify a backpressure policy and does not cache or discard data sent via OnNext. Backpressure policies need to be specified downstream via the backpressure operators (onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest()) |
ERROR | Under this strategy, if put into the Flowable asynchronous data in a buffer pool overrun, would be thrown MissingBackpressureException anomalies |
BUFFER | Under this policy, Flowable asynchronous buffer pool, like the observables, no fixed size, can add data to unlimited, don’t throw MissingBackpressureException exceptions, but will lead to OOM |
DROP | Under this policy, if Flowable’s asynchronous cache pool is full, data sent upstream will be discarded |
LATEST | Like the Drop policy, if the cache pool is full, it will discard the data that is about to be added to the cache pool. However, LATEST will force the last data into the cache pool regardless of the state of the cache pool |
Realize the principle of
First look at the Create implementation of Flowable
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source."source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
Copy the code
A FlowableCreate object is created and passed in the specified BackpressureStrategy policy. Then look at the subscription method for FlowableCreate
@Override public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; // Initialize different data transmitters according to different policies switch (backpressure) {case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break; } } t.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); }}Copy the code
BaseEmitter
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription { private static final long serialVersionUID = 7326289992464377023L; final Subscriber<? super T> actual; final SequentialDisposable serial; BaseEmitter(Subscriber<? super T> actual) { this.actual = actual; this.serial = new SequentialDisposable(); Override public final void Request (long n) {if(SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); }} // omit several other methodsCopy the code
Flowable has a buffer pool. What is the size of the buffer pool and where is it copied to the emitter?
Static final int BUFFER_SIZE; static { BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
public static int bufferSize() {
returnBUFFER_SIZE; } public final Flowable<T> observeOn(Scheduler) {return observeOn(scheduler, false, bufferSize());
}
Copy the code
MissingEmitter
Data transmitted via OnNext will not be cached or discarded
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if(t ! = null) { actual.onNext(t); }else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return; }}}Copy the code
NoOverflowBaseAsyncEmitter
DropAsyncEmitter and ErrorAsyncEmitter inherited NoOverflowBaseAsyncEmitter
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return; } // Get () is Flowable BUFFER_SIZE 128if(get() ! = 0) { actual.onNext(t); BackpressureHelper.produced(this, 1); }else {
//超出阈值 执行onOverflow
onOverflow();
}
}
Copy the code
DropAsyncEmitter
If Flowable’s asynchronous cache pool is full, data sent upstream will be discarded
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
void onOverflow() {
// nothing to do}}Copy the code
ErrorAsyncEmitter
If Flowable’s asynchronous cache pool is full, an exception is thrown
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 338953216916120960L;
ErrorAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests")); }}Copy the code
BufferAsyncEmitter
Flowable’s asynchronous buffer pool, like an Observable’s, has no fixed size and can add data to it indefinitely
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return; } // Join queue SpscLinkedArrayQueue queue. Offer (t); Drain (); }Copy the code
LatestAsyncEmitter
Flowable’s asynchronous buffer pool, like an Observable’s, has no fixed size and can add data to it indefinitely
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return; } // Override queue to AtomicReference queue.set(t); Drain (); }Copy the code
conclusion
thinking
This paper mainly analyzes the chain execution process, thread scheduling and back pressure mechanism of RXJava. The rXJava library also has a lot of operators and functions, I hope there is time to continue to analyze. Rxjava source code and some concept naming is still relatively complex, before and after about 2 weeks of time to learn the source code, stick to it, or harvest full.
The resources
This is probably the best RxJava 2.x tutorial (final version)
ReactiveX Chinese document
Rxjava2 Introductory Tutorial 5: Flowable Back pressure support – the most comprehensive and detailed explanation of Flowable
RxJava2 source code parsing — Thread scheduling Scheduler
recommended
Android source code series – decrypt OkHttp
Android source code series – Decrypt Retrofit
Android source code series – Decrypt Glide
Android source code series – Decrypt EventBus
Android source code series – decrypt RxJava
Android source code series – Decrypt LeakCanary
Android source code series – decrypt BlockCanary
about
Welcome to pay attention to my personal public number
Wechat search: Yizhaofusheng, or search the official ID: Life2Code
- Author: Huang Junbin
- Blog: junbin. Tech
- GitHub: junbin1011
- Zhihu: @ JunBin