What is RxJava?

ReactiveX

ReactiveX, short for Reactive Extensions and commonly known as Rx, was originally developed as an extension to LINQ by a team led by Microsoft architect Erik Meijer and was open-sourced in November 2012. It is a programming model that aims to provide a consistent programming interface that helps developers handle asynchronous data streams more easily. The Rx libraries initially supported .NET, JavaScript, and C++. Rx has become increasingly popular in recent years and now supports almost all popular programming languages. Most Rx libraries are maintained by the ReactiveX organization, the most popular being RxJava, RxJS, and Rx.NET.

RxJava

RxJava is the implementation of ReactiveX on the Java platform. It is a programming model based on the observer pattern that offers a chained call interface and dynamic control over thread switching, which makes asynchronous data streams easy to handle.

Introduction

GitHub: RxJava

Chinese documentation: ReactiveX/RxJava

Official website: ReactiveX

Features

  • Chained calls that are easy to use
  • Simplified logic
  • Flexible thread scheduling
  • A comprehensive, powerful set of data operators

Observer pattern

The observer pattern defines a one-to-many dependency between objects so that whenever an object changes state, all of its dependents are notified and updated automatically. The observer pattern is at the heart of RxJava's design: an Observable is the observed, an Observer is the observer, and the two are connected through the subscribe method.

  • Advantages

The observer and the observed are decoupled through abstraction, which makes it easier to respond to business changes.

It enhances the flexibility and extensibility of the system.

Refer to Design Patterns: Observer Pattern for code examples.

  • Disadvantages

Both development effort and runtime efficiency need to be considered when using the observer pattern. A program with one observed object and many observers is more complex to develop and debug. In addition, notifications in Java are delivered sequentially by default, so a single stalled observer slows down the whole notification pass; in that case, asynchronous notification is usually considered.
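The one-to-many notification described above, and the sequential delivery mentioned under the disadvantages, can be sketched in plain Java. This is only a minimal illustration: SimpleSubject, SimpleObserver, and ObserverPatternDemo are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration; these are not RxJava classes.
interface SimpleObserver<T> {
    void update(T value);
}

class SimpleSubject<T> {
    private final List<SimpleObserver<T>> observers = new ArrayList<>();

    void attach(SimpleObserver<T> observer) {
        observers.add(observer);
    }

    // Notifies observers one after another on the calling thread,
    // so a slow observer delays every observer registered after it.
    void setState(T value) {
        for (SimpleObserver<T> observer : observers) {
            observer.update(value);
        }
    }
}

class ObserverPatternDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleSubject<String> subject = new SimpleSubject<>();
        subject.attach(value -> log.add("first got " + value));
        subject.attach(value -> log.add("second got " + value));
        subject.setState("A");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because setState loops over the observers on the caller's thread, the second observer is only notified after the first one returns, which is the reason asynchronous delivery is often considered.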

How to use RxJava?

Gradle dependencies

   implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
   implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

Here is a common example of RxJava usage. In a project we often need to call a server-side interface, fetch the data, cache it, and then display it in the UI. The code for this example is as follows:

    Observable.create(new ObservableOnSubscribe<Response>() {
        @Override
        public void subscribe(ObservableEmitter<Response> e) throws Exception {
            // Fetch the data from the server-side interface
            Request.Builder builder = new Request.Builder()
                    .url("http://xxx.com")
                    .get();
            Request request = builder.build();
            Call call = new OkHttpClient().newCall(request);
            Response response = call.execute();
            e.onNext(response);
        }
    }).map(new Function<Response, Model>() {
        @Override
        public Model apply(Response response) throws Exception {
            // Convert the JSON data into the corresponding Model
            if (response.isSuccessful()) {
                ResponseBody body = response.body();
                if (body != null) {
                    Log.e(TAG, "map: before conversion: " + response.body());
                    return new Gson().fromJson(body.string(), Model.class);
                }
            }
            // Note: RxJava 2 rejects null results from map, so this path
            // ends up in the error Consumer as a NullPointerException
            return null;
        }
    }).doOnNext(new Consumer<Model>() {
        @Override
        public void accept(Model s) throws Exception {
            Log.e(TAG, "doOnNext: save the data loaded from the network: " + s.toString() + "\n");
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<Model>() {
        @Override
        public void accept(Model model) throws Exception {
            // Refresh the UI
            Log.e(TAG, "Successfully refreshed the UI: " + model.toString() + "\n");
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            // Show the error message
            Log.e(TAG, "Failed, handling the exception: " + throwable.getMessage() + "\n");
        }
    });

This article mainly analyzes the RxJava source code. For how to use the RxJava operators, refer to the Chinese documentation and to the following blog post.

This is probably the best RxJava 2.x tutorial (final version)

What is the RxJava core execution flow?

RxJava is designed around the observer pattern. Each operator call creates a new Observable, and each subscription creates a new Observer. Every Observable holds its upstream Observable, and every Observer holds its downstream Observer. When subscribe is called, each Observable subscribes to the upstream Observable it holds, layer by layer, until the original source is reached. When the original ObservableEmitter callback calls onNext, each Observer calls the onNext of the downstream Observer it holds, in turn, until the Observer instance passed to subscribe is reached. This is the core execution flow of RxJava's chained calls.
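The flow described above (subscription travels upstream, data travels downstream) can be sketched with a few lines of plain Java. MiniObservable, MiniObserver, and MiniChainDemo are hypothetical miniature types that only mimic the shape of RxJava's classes; they omit onSubscribe, onError, onComplete, and disposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature types; they only mimic the shape of RxJava's classes.
interface MiniObserver<T> {
    void onNext(T value);
}

abstract class MiniObservable<T> {
    // Each operator overrides this to subscribe to its upstream.
    abstract void subscribeActual(MiniObserver<? super T> observer);

    final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    // Source: pushes the given items to whoever subscribes.
    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>() {
            @Override
            void subscribeActual(MiniObserver<? super T> observer) {
                for (T item : items) {
                    observer.onNext(item);
                }
            }
        };
    }

    // Operator: the new observable holds the upstream (this),
    // and the observer it creates holds the downstream observer.
    final <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        final MiniObservable<T> upstream = this;
        return new MiniObservable<R>() {
            @Override
            void subscribeActual(MiniObserver<? super R> downstream) {
                upstream.subscribe(value -> downstream.onNext(mapper.apply(value)));
            }
        };
    }
}

class MiniChainDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniObservable.just(1, 2, 3)
                .map(n -> n * 10)
                .map(n -> "item " + n)
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Nothing happens when the chain is assembled; only the final subscribe call walks back up to the source, after which each onNext call cascades down through the wrapping observers.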

RxJava also involves thread scheduling and backpressure handling, whose implementation principles are discussed later, but the core execution flow of the chained calls stays the same. We look at this core flow in two parts: a description of the key classes, and an execution flowchart based on the sample code.

Description of key class functions

Class | Description
ObservableSource | Interface with a single subscribe method that takes an Observer object.
Observer | Interface for the observer, with the onSubscribe, onNext, onError, and onComplete methods.
Consumer | Interface, also an observer. It has a single accept method and is eventually converted into an Observer at subscription time. It exists to simplify calls.
Observable | Abstract class that implements the ObservableSource interface; all operator implementations extend it. It wraps a large number of operator methods, and its core abstract method, abstract void subscribeActual(Observer<? super T> observer), implements the subscription dispatch logic.
AbstractObservableWithUpstream | Extends Observable. The constructor takes the ObservableSource source object, which is the upstream observed.
ObservableCreate | Extends Observable; its source is an ObservableOnSubscribe. The subscribeActual method instantiates a CreateEmitter object and calls the subscribe method of ObservableOnSubscribe with it.
ObservableMap | Extends AbstractObservableWithUpstream. On subscription it subscribes a new MapObserver to the upstream.
MapObserver | Inner class of ObservableMap. Its onNext method calls mapper.apply(t) and then calls the downstream observer's onNext.
ObservableDoOnEach | Extends AbstractObservableWithUpstream. On subscription it subscribes a new DoOnEachObserver to the upstream.
DoOnEachObserver | Inner class of ObservableDoOnEach. Its onNext method calls onNext.accept(t) and then calls the downstream observer's onNext.
ObservableSubscribeOn | Extends AbstractObservableWithUpstream and controls the thread on which the observed is subscribed. subscribeActual calls scheduler.scheduleDirect(new SubscribeTask(parent)), and SubscribeTask's run method calls source.subscribe(parent), so the upstream subscription runs according to the scheduler's thread policy.
ObservableObserveOn | Extends AbstractObservableWithUpstream and controls the thread on which the observer runs. subscribeActual checks whether the scheduler is a TrampolineScheduler. If so, the downstream observer is subscribed directly; otherwise a new ObserveOnObserver is created and given a Worker created by the scheduler.
ObserveOnObserver | Inner class of ObservableObserveOn. Its onNext method triggers schedule(), which uses the worker to control the thread on which the downstream observer is called back.

Code execution flow

First, based on the demo above, we can outline RxJava's basic execution process, as shown below:

According to the flowchart, RxJava creates new Observables and Observers as the operators are executed. Each Observable holds its upstream Observable, and each Observer holds its downstream Observer. When subscribe is called, each Observable subscribes to the upstream Observable it holds, layer by layer. When the original ObservableEmitter callback calls onNext, each Observer calls the onNext of the downstream Observer it holds, in turn, until the Observer instance passed to subscribe is reached.

With this general understanding of RxJava's execution process, let's walk through the source code in detail. Again we start with an overall flowchart; because the image is large, it is best viewed alongside the demo above and the RxJava source code.

Let's go through the RxJava source code in detail, one operator at a time.

create

The create operator returns an ObservableCreate, which is the observed.

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

The key implementation code for the ObservableCreate object is as follows:

    // The constructor takes an ObservableOnSubscribe, which becomes the observed source
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Instantiate a CreateEmitter that holds the observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // Call the subscribe callback of ObservableOnSubscribe, passing in the CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

Next, let's look at the onNext method of CreateEmitter, the ObservableEmitter implementation:

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // If the subscription has not been disposed, call the downstream observer's onNext,
        // which is what produces the chained call
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

map

The map operator returns an ObservableMap, which is the observed.

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

Let’s look at the key implementation code for the ObservableMap object, as follows:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // Wrap the downstream observer t in a new MapObserver and subscribe it to the upstream source
        source.subscribe(new MapObserver<T, U>(t, function));
    }

MapObserver's onNext method is called when ObservableEmitter's onNext method fires:

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;

        try {
            // The core of map: mapper.apply(t) transforms the data, and the
            // converted result v is passed on to the downstream observer
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        // Call the downstream observer's onNext
        actual.onNext(v);
    }

doOnNext

The doOnNext operator returns an ObservableDoOnEach, which is the observed.

    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }

Let’s look at the key implementation code for the ObservableDoOnEach object, as follows:

    @Override
    public void subscribeActual(Observer<? super T> t) {
        // Instantiate a DoOnEachObserver and subscribe it to the upstream source
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }

Here too we need to look at how DoOnEachObserver handles data in onNext:

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        try {
            // Call the Consumer that was passed to doOnNext
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            s.dispose();
            onError(e);
            return;
        }
        // Continue downstream by calling the observer's onNext
        actual.onNext(t);
    }

subscribeOn

The subscribeOn operator returns an ObservableSubscribeOn, which is the observed, and passes in the Scheduler used for thread scheduling.

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

The key implementation code for the ObservableSubscribeOn object is as follows:

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // Wrap the downstream observer in a SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        // The core step: call scheduleDirect and pass in the SubscribeTask
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
Let’s look at the SubscribeTask implementation as follows:

  final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // Subscribe to the upstream observed; this is the core step
            source.subscribe(parent);
        }
  }

Finally, look at the onNext method of SubscribeOnObserver. It is simple and calls the downstream observer's onNext directly:

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }

The scheduler implementation itself is analyzed in the thread scheduling section below. For now, all we need to know is that the subscription to the upstream observed runs according to the thread policy of the specified Scheduler.
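The idea that subscribeOn moves the upstream subscription onto another thread can be sketched without RxJava. In this sketch a plain ExecutorService stands in for the Scheduler; TinySource and SubscribeOnSketch are hypothetical names, and disposal and error handling are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for an observable source.
interface TinySource<T> {
    void subscribe(Consumer<? super T> observer);
}

class SubscribeOnSketch {
    // Returns a source whose upstream subscription runs on the executor,
    // similar in spirit to scheduling a SubscribeTask on a Scheduler.
    static <T> TinySource<T> subscribeOn(TinySource<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Subscribes through subscribeOn and reports the thread that ran the source.
    static String emittingThreadName() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // The source emits the name of the thread it is subscribed on.
        TinySource<String> source = observer -> observer.accept(Thread.currentThread().getName());
        try {
            subscribeOn(source, executor).subscribe(name -> {
                seen.set(name);
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the emission");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(emittingThreadName());
    }
}
```

Because the source is subscribed inside the executor task, everything it emits synchronously also runs on that thread unless a later stage switches threads again.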

observeOn

The observeOn operator returns an ObservableObserveOn, which is the observed, and passes in the Scheduler used for thread scheduling.

     @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Let’s look at the key implementation code for the ObservableObserveOn object, as follows:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // A TrampolineScheduler runs on the current thread, so the downstream
        // observer is subscribed directly to the upstream observed
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // Wrap the downstream observer in an ObserveOnObserver and subscribe it to the upstream
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

The next key is to look at the ObserveOnObserver implementation as follows:

    @Override
    public void onNext(T t) {
        // If the source is not in asynchronous mode, add the item to the queue
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        // Schedule the drain
        schedule();
    }

    void schedule() {
        // Only hand the task to the worker if no drain is currently in progress
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

ObserveOnObserver itself implements the Runnable interface. Its run method is implemented as follows:

    @Override
    public void run() {
        // Whether the output is fused
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }

Let’s start with the drainNormal method:

    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        // Outer loop
        for (;;) {
            // Check for termination or errors
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            // Inner loop
            for (;;) {
                boolean d = done;
                T v;

                // Take the next item from the queue (the try/catch around poll is omitted here)
                v = q.poll();

                boolean empty = v == null;

                // Check for termination or errors
                if (checkTerminated(d, empty, a)) {
                    return;
                }
                // No more data: leave the inner loop
                if (empty) {
                    break;
                }
                // Pass the item on to the downstream observer
                a.onNext(v);
            }

            // Subtract the drain requests already handled; zero means nothing arrived in the meantime
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

The scheduler implementation itself is analyzed in the thread scheduling section below. For now, all we need to know is that the downstream observer's onNext runs according to the thread policy of the specified Scheduler.
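The queue-and-drain idea behind observeOn (enqueue on the producer thread, schedule at most one drain, deliver on the worker thread) can be sketched as follows. DrainSketch is a hypothetical simplification: a single-thread ExecutorService stands in for the Worker, and termination, errors, and fusion are ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical simplification of the queue-and-drain idea; no termination or error handling.
class DrainSketch<T> extends AtomicInteger implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<? super T> downstream;
    private final ExecutorService worker;

    DrainSketch(Consumer<? super T> downstream, ExecutorService worker) {
        this.downstream = downstream;
        this.worker = worker;
    }

    // Called on the producer thread: enqueue, then make sure a drain is scheduled.
    void onNext(T item) {
        queue.offer(item);
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    // Runs on the worker thread: deliver queued items until no signal was missed.
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    static List<String> demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        DrainSketch<Integer> observer = new DrainSketch<>(
                item -> log.add(item + "@" + Thread.currentThread().getName()), worker);
        for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
        }
        worker.shutdown(); // already submitted drains still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter guarantees that only one drain runs at a time, while the missed bookkeeping makes the drain loop again if items arrived during the previous pass.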

subscribe

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

The final subscribe method performs its null checks and then calls subscribeActual, which starts the subscription layer by layer toward the upstream.

How does an Observable send data?

From the analysis above: if an Observable is created with create, ObservableOnSubscribe sends data in its subscribe method by calling ObservableEmitter's onNext, and that onNext call starts pushing data to the downstream observers. RxJava has many other creation operators, such as just and from; in essence they all send data by triggering the downstream observer's onNext.

How does the Observer receive data?

The source code shows that each Observer in the chain holds its adjacent downstream Observer. When data is sent, the onNext methods are called along the chain until the Observer created in the subscribe call is reached.

How is the subscription implemented between the observed and the observer?

Every Observable in the chain holds its adjacent upstream Observable. Calling subscribe eventually reaches the subscribeActual method, where the observer is subscribed to the upstream observed.

How does RxJava schedule threads?

RxJava has many Scheduler implementations. We use the most common combination, .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()), to analyze the thread scheduling process in detail.

Scheduler

When we call subscribeOn and observeOn, we pass in a Scheduler object. First, let's look at the types of Scheduler and what each one does.

Scheduler type | Description
Schedulers.io() | For I/O-intensive operations such as reading and writing files on the SD card, querying a database, or accessing the network. It has a thread caching mechanism: when the scheduler receives a task, it first checks the thread cache pool for an idle thread. If there is one, it is reused; otherwise a new thread is created and added to the pool. If no idle thread is ever available, new threads are created without an upper limit.
Schedulers.newThread() | Creates a new thread for each task, with no thread caching. It can be used wherever Schedulers.io() is used, but it is less efficient than Schedulers.io().
Schedulers.computation() | For CPU-intensive computing tasks, that is, time-consuming operations whose performance is not limited by I/O, such as parsing XML or JSON files or compressing and sampling Bitmap images. It has a fixed thread pool whose size is the number of CPU cores. It should not be used for I/O operations, because the time spent waiting on I/O wastes CPU.
Schedulers.trampoline() | Runs tasks on the current thread. If the current thread is already running a task, the new task is queued and executed in first-in, first-out order after the current task finishes.
Schedulers.single() | Has a single shared thread on which all tasks run. While one task is running on this thread, the other tasks wait and are executed in first-in, first-out order.
Schedulers.from(Executor executor) | Wraps the given Executor as a scheduler, so that the Executor controls the execution strategy of tasks.
AndroidSchedulers.mainThread() | Runs tasks on the Android UI thread; provided specifically for Android development.

subscribeOn(Schedulers.io())

As analyzed above, subscribeOn() eventually reaches the subscribeActual method. SubscribeTask, analyzed earlier, implements the Runnable interface, and its run method ultimately calls source.subscribe(parent).

@Override
   public void subscribeActual(final Observer<? super T> s) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

       s.onSubscribe(parent);
       
       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }

Here we focus on the scheduler.scheduleDirect() method.

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // Create a Worker
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // DisposeTask is a wrapper class that also implements the Runnable interface
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // The key step: call the Worker's schedule method
        w.schedule(task, delay, unit);

        return task;
    }

The Worker's schedule is an abstract method. The Worker that corresponds to Schedulers.io() is EventLoopWorker, whose schedule implementation is as follows:

     static final class EventLoopWorker extends Scheduler.Worker {
      private final CompositeDisposable tasks;
      private final CachedWorkerPool pool;
      private final ThreadWorker threadWorker;

      final AtomicBoolean once = new AtomicBoolean();

      EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }

      @Override
      public void dispose() {
          if (once.compareAndSet(false, true)) {
              tasks.dispose();

              // releasing the pool should be the last action
              pool.release(threadWorker);
          }
      }

      @Override
      public boolean isDisposed() {
          return once.get();
      }

      @NonNull
      @Override
      public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (tasks.isDisposed()) {
              // don't schedule, we are unsubscribed
              return EmptyDisposable.INSTANCE;
          }

          return threadWorker.scheduleActual(action, delayTime, unit, tasks);
      }
     }

This leads to the scheduleActual method of threadWorker:

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

This is where the executor finally runs the run method. The remaining question is how IoScheduler actually reuses threads. Let's look at how threadWorker is created in IoScheduler:

  EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }

Thread reuse is achieved by maintaining a pool of Worker threads. Specifically, let's look at the get method of CachedWorkerPool:

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        // Look for a cached worker in the queue of released workers
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            // If one is found, return it for reuse
            if (threadWorker != null) {
                return threadWorker;
            }
        }

        // No cached worker found, so create a new one
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
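The reuse logic of get can be illustrated with a much smaller pool. WorkerPoolSketch and PooledWorker are hypothetical names; the sketch assumes released workers simply go back into an idle queue and leaves out the expiry timestamps, eviction, and shutdown handling of a real cached pool.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification: no expiry timestamps, eviction, or shutdown handling.
class WorkerPoolSketch {
    static final class PooledWorker {
        final int id;

        PooledWorker(int id) {
            this.id = id;
        }
    }

    private final Queue<PooledWorker> idleWorkers = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    // Reuse an idle worker if one exists, otherwise create a new one.
    PooledWorker get() {
        PooledWorker cached = idleWorkers.poll();
        if (cached != null) {
            return cached;
        }
        return new PooledWorker(created.incrementAndGet());
    }

    // Return a worker to the idle queue so that a later get() can reuse it.
    void release(PooledWorker worker) {
        idleWorkers.offer(worker);
    }

    int createdCount() {
        return created.get();
    }

    // Returns how many workers had to be created for three get() calls and one release().
    static int demo() {
        WorkerPoolSketch pool = new WorkerPoolSketch();
        PooledWorker first = pool.get();
        pool.release(first);
        PooledWorker second = pool.get(); // reuses the released worker
        PooledWorker third = pool.get();  // idle queue is empty, so a new worker is created
        if (second != first || third == first) {
            throw new IllegalStateException("unexpected reuse behaviour");
        }
        return pool.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since nothing caps the number of workers created when the idle queue is empty, the sketch also shows why such a pool can grow without an upper limit.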

observeOn(AndroidSchedulers.mainThread())

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // No scheduling is needed if the current thread is specified
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // Create a Worker from the scheduler
            Scheduler.Worker w = scheduler.createWorker();
            // Instantiate an ObserveOnObserver, passing in the Worker, and subscribe it to the upstream
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

Here we mainly need to analyze ObserveOnObserver, whose onNext implementation is as follows:

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

The key is the call to the worker's schedule. AndroidSchedulers is implemented mainly by HandlerScheduler, whose Worker implementation is HandlerWorker. Its schedule implementation is as follows:

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        if (async) {
            message.setAsynchronous(true);
        }
        // Send the message through the handler, which will execute the run method
        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

As for the handler instance, let's look at how it is created in AndroidSchedulers:

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

From this we can see that AndroidSchedulers.mainThread() uses a message to hand the run method to the main thread, so that the observer processes its data on the main thread.

How does the RxJava backpressure strategy work?

Backpressure

When the upstream and downstream run on different threads and data is emitted, processed, and responded to through an Observable, the upstream may emit data faster than the downstream can receive and process it. The data that cannot be processed in time piles up. It is neither lost nor reclaimed by garbage collection, but held in an asynchronous buffer pool. If the data in the buffer pool stays unprocessed, it keeps accumulating until memory runs out (OOM). This is the backpressure problem in reactive programming.

Backpressure handling mechanism

RxJava 2.x uses Flowable to support backpressure, and a BackpressureStrategy is passed in when the create method is called.

Strategy | Effect
MISSING | A Flowable created with create under this strategy has no backpressure strategy of its own: data sent via onNext is neither buffered nor dropped. A strategy must be specified downstream through the backpressure operators (onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest()).
ERROR | If the data placed in the Flowable's asynchronous buffer pool exceeds its limit, a MissingBackpressureException is thrown.
BUFFER | The Flowable's asynchronous buffer pool, like that of Observable, has no fixed size. Data can be added without limit and no MissingBackpressureException is thrown, but this can lead to OOM.
DROP | If the Flowable's asynchronous buffer pool is full, data sent by the upstream is discarded.
LATEST | Like DROP, data that would be added to a full buffer pool is discarded. However, LATEST always keeps the most recent item and puts it into the buffer pool regardless of the pool's state.
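The differences between the strategies can be made concrete with a small single-threaded model. This is a sketch under stated assumptions, not RxJava's implementation: BackpressureSketch is a hypothetical class, overflow is modelled as "the downstream has no outstanding request" rather than a full buffer pool, the error is recorded as the string "error", and MISSING is left out because it applies no strategy at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of the strategies; the real emitters are concurrent.
class BackpressureSketch {
    enum Strategy { ERROR, BUFFER, DROP, LATEST }

    private final Strategy strategy;
    private final List<String> received = new ArrayList<>();  // what the downstream saw
    private final Queue<Integer> buffer = new ArrayDeque<>(); // used by BUFFER only
    private Integer latest;                                   // used by LATEST only
    private long requested;                                   // outstanding downstream demand
    private boolean failed;

    BackpressureSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    // The upstream emits a value.
    void onNext(int value) {
        if (failed) {
            return;
        }
        if (requested > 0) { // demand available: deliver right away
            deliver(value);
            return;
        }
        switch (strategy) {
            case ERROR:  failed = true; received.add("error"); break;
            case BUFFER: buffer.offer(value); break; // grows without bound
            case DROP:   break;                      // silently discard
            case LATEST: latest = value; break;      // overwrite the kept value
        }
    }

    // The downstream requests n more items.
    void request(long n) {
        if (failed) {
            return;
        }
        requested += n;
        while (requested > 0 && !buffer.isEmpty()) {
            deliver(buffer.poll());
        }
        if (requested > 0 && latest != null) {
            int value = latest;
            latest = null;
            deliver(value);
        }
    }

    private void deliver(int value) {
        requested--;
        received.add(String.valueOf(value));
    }

    // Request 1, emit 1..4 with no further demand, then request 2 more.
    static List<String> simulate(Strategy strategy) {
        BackpressureSketch s = new BackpressureSketch(strategy);
        s.request(1);
        for (int i = 1; i <= 4; i++) {
            s.onNext(i);
        }
        s.request(2);
        return s.received;
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + simulate(strategy));
        }
    }
}
```

In this model the first item is always delivered because one request is outstanding; what happens to items 2 to 4 is where the strategies diverge.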

Implementation principle

First, look at the create implementation of Flowable:

    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }

A FlowableCreate object is created and given the specified BackpressureStrategy. Next, look at the subscription method of FlowableCreate:

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        // Initialize a different emitter depending on the strategy
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }

BaseEmitter

    abstract static class BaseEmitter<T>
            extends AtomicLong
            implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }

        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        // several other methods omitted
    }
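
BaseEmitter extends AtomicLong, so the emitter itself is the counter of outstanding downstream demand, and request(n) adds to it. The following is a minimal sketch of that accounting using only the JDK. DemandCounter is a hypothetical stand-in, not an RxJava class; it assumes the addition saturates at Long.MAX_VALUE (meaning "unbounded") and uses an exception where the real code validates n.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the demand accounting in BaseEmitter.
final class DemandCounter extends AtomicLong {
    private static final long serialVersionUID = 1L;

    /** Adds n to the outstanding demand, saturating at Long.MAX_VALUE, and returns the new total. */
    long add(long n) {
        if (n <= 0) {
            // assumption: an exception stands in for the validation step
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        for (;;) {
            long current = get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded
            }
            long sum = current + n;
            long next = sum < 0 ? Long.MAX_VALUE : sum; // overflow means unbounded
            if (compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        DemandCounter demand = new DemandCounter();
        demand.add(128);
        demand.add(64);
        System.out.println(demand.get());                   // prints 192
        demand.add(Long.MAX_VALUE);
        System.out.println(demand.get() == Long.MAX_VALUE); // prints true
    }
}
```

The compare-and-set loop lets several threads request concurrently without a lock, which is why a plain long field would not be enough here.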

Flowable has a buffer. How large is it, and where is that size passed on to the emitter?

    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static int bufferSize() {
        return BUFFER_SIZE;
    }

    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
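
The expression in that static initializer can be tried in isolation. The sketch below copies it into a hypothetical helper, resolveBufferSize() in a class named BufferSizeDemo (both names are made up for illustration), to show the default, an override, and the lower bound of 1.

```java
// Hypothetical helper class; only the expression inside resolveBufferSize() comes from the source above.
final class BufferSizeDemo {
    /** Evaluates the same expression as the static initializer: system property, default 128, minimum 1. */
    static int resolveBufferSize() {
        return Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        System.clearProperty("rx2.buffer-size");
        System.out.println(resolveBufferSize()); // prints 128 (property not set)

        System.setProperty("rx2.buffer-size", "16");
        System.out.println(resolveBufferSize()); // prints 16 (overridden)

        System.setProperty("rx2.buffer-size", "0");
        System.out.println(resolveBufferSize()); // prints 1 (clamped by Math.max)
    }
}
```

Because BUFFER_SIZE is assigned in a static initializer, the property is read only once, when the class is initialized. An override therefore has to be in place before that point, for example as a JVM argument such as -Drx2.buffer-size=16.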

MissingEmitter

Data sent via onNext is neither buffered nor dropped; it is passed straight to the downstream Subscriber

    @Override
    public void onNext(T t) {
        if (isCancelled()) {
            return;
        }

        if (t != null) {
            actual.onNext(t);
        } else {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }

        for (;;) {
            long r = get();
            if (r == 0L || compareAndSet(r, r - 1)) {
                return;
            }
        }
    }
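
To make the behavior of that loop concrete, here is a small JDK-only model. MissingModel is a hypothetical class, not RxJava's MissingEmitter: it records delivered values in a list instead of calling a Subscriber, and it only handles Integer values. It shows that every value is forwarded even when the demand is exhausted, while the counter stops at zero instead of going negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the MISSING strategy.
final class MissingModel extends AtomicLong {
    private static final long serialVersionUID = 1L;

    // stands in for the downstream Subscriber
    final List<Integer> delivered = new ArrayList<>();

    void request(long n) {
        addAndGet(n);
    }

    void onNext(int value) {
        delivered.add(value); // always forwarded, whatever the demand is
        for (;;) {
            long r = get();
            // same loop as above: decrement the demand, but never below zero
            if (r == 0L || compareAndSet(r, r - 1)) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        MissingModel emitter = new MissingModel();
        emitter.request(2);
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
        }
        System.out.println(emitter.delivered); // prints [1, 2, 3, 4, 5]
        System.out.println(emitter.get());     // prints 0
    }
}
```

Three of the five values arrive without having been requested, which is why a Flowable created with MISSING needs an onBackpressureXXX operator downstream.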

NoOverflowBaseAsyncEmitter

Both DropAsyncEmitter and ErrorAsyncEmitter extend NoOverflowBaseAsyncEmitter

    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }

        // get() is the demand still outstanding from downstream
        // (for example, observeOn requests Flowable's BUFFER_SIZE, 128, up front)
        if (get() != 0) {
            actual.onNext(t);
            BackpressureHelper.produced(this, 1);
        } else {
            // no demand left: the threshold is exceeded, so call onOverflow
            onOverflow();
        }
    }

DropAsyncEmitter

If the Flowable's asynchronous buffer is full (downstream has no outstanding demand), data sent from upstream is discarded

    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

        private static final long serialVersionUID = 8360058422307496563L;

        DropAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            // nothing to do
        }
    }

ErrorAsyncEmitter

If the Flowable's asynchronous buffer is full (downstream has no outstanding demand), a MissingBackpressureException is signaled through onError

    static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }
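
The two subclasses differ only in onOverflow(), which the following JDK-only sketch tries to make visible. GatedEmitter, DropModel, and ErrorModel are hypothetical names, not RxJava classes. The model is single-threaded, records deliveries in a list, and uses a String field in place of a real onError signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of NoOverflowBaseAsyncEmitter: deliver only while demand remains.
abstract class GatedEmitter {
    final AtomicLong requested = new AtomicLong();
    final List<Integer> delivered = new ArrayList<>(); // stands in for the downstream Subscriber
    String error;                                      // stands in for a terminal onError signal

    void request(long n) {
        requested.addAndGet(n);
    }

    final void onNext(int value) {
        if (error != null) {
            return; // already terminated
        }
        if (requested.get() != 0) {
            delivered.add(value);
            requested.decrementAndGet();
        } else {
            onOverflow(); // no demand left: the subclass decides what happens
        }
    }

    abstract void onOverflow();
}

// DROP: silently discard the value.
final class DropModel extends GatedEmitter {
    @Override
    void onOverflow() {
        // nothing to do
    }
}

// ERROR: terminate with an error.
final class ErrorModel extends GatedEmitter {
    @Override
    void onOverflow() {
        error = "could not emit value due to lack of requests";
    }
}

final class GatedDemo {
    public static void main(String[] args) {
        DropModel drop = new DropModel();
        ErrorModel err = new ErrorModel();
        drop.request(2);
        err.request(2);
        for (int i = 1; i <= 4; i++) {
            drop.onNext(i);
            err.onNext(i);
        }
        System.out.println(drop.delivered + " error=" + drop.error); // prints [1, 2] error=null
        System.out.println(err.delivered + " error=" + err.error);
    }
}
```

With DropModel the stream stays alive, so a later request(1) followed by another onNext is delivered again; with ErrorModel the first overflow ends the stream.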

BufferAsyncEmitter

Like an Observable's, the Flowable's asynchronous buffer has no fixed size here: every value is queued, so data can be added without limit

    @Override
    public void onNext(T t) {
        if (done || isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // Add to the queue (a SpscLinkedArrayQueue)
        queue.offer(t);
        drain();
    }
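
The offer-then-drain pattern can be modeled in a few lines. BufferModel below is a hypothetical, single-threaded sketch using an ArrayDeque; the real emitter has to coordinate the emitting and requesting threads, which this model leaves out. It illustrates that nothing is lost, at the cost of a queue that grows for as long as downstream does not request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the BUFFER strategy.
final class BufferModel {
    final Queue<Integer> queue = new ArrayDeque<>();   // unbounded buffer
    final List<Integer> delivered = new ArrayList<>(); // stands in for the downstream Subscriber
    long requested;

    void onNext(int value) {
        queue.offer(value); // always enqueue first
        drain();
    }

    void request(long n) {
        requested += n;
        drain();
    }

    // Deliver queued values while downstream demand remains.
    private void drain() {
        while (requested > 0 && !queue.isEmpty()) {
            delivered.add(queue.poll());
            requested--;
        }
    }

    public static void main(String[] args) {
        BufferModel emitter = new BufferModel();
        emitter.request(2);
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
        }
        System.out.println(emitter.delivered + " queued=" + emitter.queue.size()); // prints [1, 2] queued=3
        emitter.request(10);
        System.out.println(emitter.delivered + " queued=" + emitter.queue.size()); // prints [1, 2, 3, 4, 5] queued=0
    }
}
```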

LatestAsyncEmitter

Only the most recent value is kept: the queue is replaced by an AtomicReference, and each onNext overwrites the value stored before it

    @Override
    public void onNext(T t) {
        if (done || isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // The queue is an AtomicReference here, so set() overwrites the previous value
        queue.set(t);
        drain();
    }
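
Swapping the queue for a single slot changes what survives an overflow. LatestModel below is a hypothetical, single-threaded sketch of that idea, not RxJava's LatestAsyncEmitter: set() overwrites whatever was waiting, and drain() hands over the slot's content once demand is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, single-threaded model of the LATEST strategy.
final class LatestModel {
    final AtomicReference<Integer> latest = new AtomicReference<>(); // single-slot "queue"
    final List<Integer> delivered = new ArrayList<>();               // stands in for the downstream Subscriber
    long requested;

    void onNext(int value) {
        latest.set(value); // overwrite whatever was waiting
        drain();
    }

    void request(long n) {
        requested += n;
        drain();
    }

    // Deliver the stored value, if any, when downstream demand remains.
    private void drain() {
        if (requested > 0) {
            Integer value = latest.getAndSet(null);
            if (value != null) {
                delivered.add(value);
                requested--;
            }
        }
    }

    public static void main(String[] args) {
        LatestModel emitter = new LatestModel();
        emitter.request(1);
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
        }
        System.out.println(emitter.delivered); // prints [1]
        emitter.request(1);
        System.out.println(emitter.delivered); // prints [1, 5]
    }
}
```

Values 2, 3, and 4 are overwritten before downstream asks again, so only 5 is delivered on the second request.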

Conclusion

Thoughts

This article analyzed RxJava's chained execution flow, thread scheduling, and backpressure mechanism. The RxJava library also offers many more operators and features, which I hope to analyze when time allows. RxJava's source code and some of its concept naming are fairly complex. Studying the source took about two weeks in total, and sticking with it was well worth the effort.

About

  • Author: Huang Junbin
  • Blog: junbin.tech
  • GitHub: junbin1011
  • Zhihu: @JunBin