Preface

On March 14, RxJava released its third major version as open source.

In this version, the following changes have been made:

RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 uses rx and RxJava 2 uses io.reactivex). This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core. In short, the changes fall into the following groups:

(1) Package structure changes.

  1. File migration. RxJava 3 components have been migrated to the io.reactivex.rxjava3 package.
  2. Forward compatibility.

(2) Behavioral changes. Some existing errors have been corrected, among other things.

(3) API changes. Use of the @FunctionalInterface annotation, among others.

Details are in the documentation: What’s Different in 3.0

Overall, basic day-to-day usage hasn't changed much.

What is RxJava?

RxJava — Reactive Extensions for the JVM — a library for composing asynchronous and event-based programs using observable sequences for the Java VM.


Usage

A subscription is set up through a chained call on an event stream:

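// Imports needed by this snippet (RxJava 3 on Android):
// import android.util.Log
// import io.reactivex.rxjava3.core.Observable
// import io.reactivex.rxjava3.core.Observer
// import io.reactivex.rxjava3.disposables.Disposable

Observable.create<String> {
    it.onNext("The items: 1")
    it.onNext("The items: 2")
    it.onError(Exception())
    it.onComplete() // Ignored: onError has already terminated the stream
}.subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.d(TAG, "The subscribe event")
    }

    override fun onNext(s: String) {
        Log.d(TAG, "Next event: $s")
    }

    override fun onError(e: Throwable) {
        Log.d(TAG, "Error event")
    }

    override fun onComplete() {
        Log.d(TAG, "Complete event")
    }
})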

A few classes and methods stand out here.

  1. Observable: the observed object, i.e. the source of events.
  2. Observer: the observer, which receives the events.
  3. subscribe: the call that links the two.
  4. Disposable: breaks the link. Calling d.dispose() in a callback where the Disposable is available disconnects the stream.
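To make the four roles concrete, here is a minimal plain-Java sketch with no RxJava dependency. SimpleObserver, SimpleDisposable and RolesSketch are hypothetical names invented for this illustration, not RxJava types; the sketch only shows how disposing the link stops further delivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration only; these are not RxJava types.
interface SimpleDisposable {
    void dispose();
    boolean isDisposed();
}

interface SimpleObserver<T> {
    void onSubscribe(SimpleDisposable d);
    void onNext(T item);
}

final class RolesSketch {
    // The "observable" side: pushes each item to the observer until the link is disposed.
    static <T> void subscribe(List<T> items, SimpleObserver<T> observer) {
        AtomicBoolean disposed = new AtomicBoolean(false);
        SimpleDisposable link = new SimpleDisposable() {
            @Override public void dispose() { disposed.set(true); }
            @Override public boolean isDisposed() { return disposed.get(); }
        };
        observer.onSubscribe(link);
        for (T item : items) {
            if (link.isDisposed()) {
                return; // disconnected: nothing more is delivered
            }
            observer.onNext(item);
        }
    }

    // Collects items, but disposes as soon as "stop" has been received.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        subscribe(Arrays.asList("a", "stop", "b"), new SimpleObserver<String>() {
            private SimpleDisposable d;
            @Override public void onSubscribe(SimpleDisposable d) { this.d = d; }
            @Override public void onNext(String item) {
                received.add(item);
                if (item.equals("stop")) {
                    d.dispose();
                }
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, stop]
    }
}
```

The item "b" never arrives because the observer disposed the link while handling "stop".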

You may wonder why it is the observed object that subscribes to the observer, and not the other way round.

To make this easier to picture, map Observable, Observer and subscribe onto everyday life as customer, cook and waiter. The customer tells the waiter what to eat, and the waiter tells the cook what to make.

That raises another question: what if the cooks are overwhelmed? This is easy to run into in daily life. There are too many customers and only a few cooks, so the cooks end up swamped. RxJava has the same problem: the processing speed is fixed, but the observed object produces too much data. How should it be handled? This brings us to the concept of backpressure.

RxJava backpressure

We saw above why RxJava needs backpressure; let's illustrate it here.


If this goes on for long, messages pile up and the app eventually crashes with an out-of-memory error (OOM).

Before looking at how the source code solves this, take a look at the following figure:

Note: the figure does not correspond directly to the actual code.


The corresponding basic usage in code:

Flowable.create<Int>({ emitter ->
    // 129 events are sent in total, which exceeds the cache size of 128
    // Change the value 128 to 0 to observe the difference
    for (i in 0..128) {
        Log.d(TAG, "Sent event $i")
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.ERROR) // Backpressure strategy added
    .subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            Log.d(TAG, "onSubscribe")
            // Note: s.request(n) is never called here, so this subscriber signals no demand
        }

        override fun onNext(integer: Int) {
            Log.d(TAG, "Event $integer received")
        }

        override fun onError(t: Throwable) {
            Log.w(TAG, "onError: ", t)
        }

        override fun onComplete() {
            Log.d(TAG, "onComplete")
        }
    })

The code introduces a few new pieces:

  1. Flowable: the backpressure-aware counterpart of Observable.
  2. Subscriber: the subscriber. It is much like Observer, but adapted to work with Flowable.
  3. BackpressureStrategy: the focus of the next section.

BackpressureStrategy

This is the question we have been circling above, so let's see what RxJava offers.

public enum BackpressureStrategy {
    /** No buffering or dropping; the downstream has to deal with any overflow. */
    MISSING,
    /** Signals a MissingBackpressureException when the downstream can't keep up. */
    ERROR,
    /** Buffers all values without limit; may lead to OOM. */
    BUFFER,
    /** Drops the most recent value when the downstream can't keep up. */
    DROP,
    /** Keeps only the latest value, overwriting any older undelivered one. */
    LATEST
}

The results for each of the five strategies:

  1. MISSING
  2. ERROR
  3. BUFFER: 128 events were successfully sent.
  4. DROP: only 127 items can be retrieved.
  5. LATEST: the last item sent, which is 149, is also received.
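The differences between the strategies can be sketched with a plain-Java model. This is a deliberate simplification and not how RxJava implements Flowable: it assumes a consumer-side buffer of 128 slots that is never drained while the producer runs, and BackpressureSketch, emit and CAPACITY are hypothetical names. MISSING is left out because it delegates overflow handling to the downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// A simplified model, not RxJava's implementation: the producer offers items to a
// buffer that the (slow) consumer does not drain while the producer is running.
final class BackpressureSketch {
    enum Strategy { ERROR, BUFFER, DROP, LATEST }

    static final int CAPACITY = 128; // assumed buffer size, mirroring the 128 quoted above

    // Returns what the slow consumer would eventually see after `count` emissions.
    static List<Integer> emit(int count, Strategy strategy) {
        Deque<Integer> buffer = new ArrayDeque<>();
        Integer latest = null; // extra slot used only by LATEST
        for (int i = 0; i < count; i++) {
            if (strategy == Strategy.BUFFER || buffer.size() < CAPACITY) {
                buffer.addLast(i); // room left (or unbounded): keep the item
                continue;
            }
            switch (strategy) {
                case ERROR:
                    throw new IllegalStateException("buffer full at item " + i);
                case DROP:
                    break; // discard the new item
                case LATEST:
                    latest = i; // overwrite the previously remembered overflow item
                    break;
                default:
                    throw new AssertionError("unreachable");
            }
        }
        List<Integer> seen = new ArrayList<>(buffer);
        if (latest != null) {
            seen.add(latest);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("DROP keeps " + emit(150, Strategy.DROP).size() + " items");
        List<Integer> latest = emit(150, Strategy.LATEST);
        System.out.println("LATEST ends with " + latest.get(latest.size() - 1));
        System.out.println("BUFFER keeps " + emit(150, Strategy.BUFFER).size() + " items");
    }
}
```

In this model, sending 150 items with DROP keeps items 0 to 127, LATEST additionally delivers 149, BUFFER keeps everything, and ERROR throws as soon as item 128 is offered.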


Thread control

With that covered, there is another very important point to consider: which thread does all of this run on?

In Android development we have a firm rule that time-consuming tasks must not run on the UI thread. What about RxJava? Let's go back to the code above and run an experiment.

Observable.create<String> {
    Log.e(TAG, "Observable worker thread: " + Thread.currentThread().name)
}.subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.d(TAG, "The subscribe event")
        Log.e(TAG, "Observer worker thread: " + Thread.currentThread().name)
    }

    override fun onNext(s: String) {
        Log.d(TAG, "Next event: $s")
    }

    override fun onError(e: Throwable) {
        Log.d(TAG, "Error event")
    }

    override fun onComplete() {
        Log.d(TAG, "Complete event")
    }
})

The output shows that the current worker thread is main, that is, the main thread.

That is a problem. If time-consuming tasks run on the main thread, we get ANR (Application Not Responding) errors. Naturally, a solution is needed.

Let's look at the first option, the do-it-yourself approach: wrap the code above in an extra Thread.

This shows that the worker thread has changed, but updating the UI now becomes the problem again. Of course, we could add our own Handler to solve it.
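The do-it-yourself approach can be modelled on a plain JVM. Android's Handler is not available outside Android, so this sketch assumes a single-thread executor as a stand-in for the main thread, with execute() playing the role of Handler.post(). ManualSwitchSketch and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JVM model: a single-thread executor stands in for the Android main thread,
// and execute() on it plays the role of Handler.post().
final class ManualSwitchSketch {
    // Returns { thread that produced the value, thread that consumed it }.
    static String[] run() {
        ExecutorService fakeMain =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        CompletableFuture<String[]> result = new CompletableFuture<>();

        Thread worker = new Thread(() -> {
            // The slow work would happen here, off the "main" thread.
            String producedOn = Thread.currentThread().getName();
            // Hand the result back to the "main" thread, as a Handler would.
            fakeMain.execute(() -> result.complete(
                new String[] { producedOn, Thread.currentThread().getName() }));
        }, "worker");
        worker.start();

        try {
            return result.join(); // wait until the hand-off has happened
        } finally {
            fakeMain.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```

Every hand-off like this has to be written by hand, which is the boilerplate that a scheduler-based API removes.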

To solve this, RxJava offers a neat solution: subscribeOn() and observeOn(), together with a set of predefined schedulers.

| Type | Meaning | Application scenarios |
| --- | --- | --- |
| Schedulers.immediate() (RxJava 1 only; the closest equivalent in RxJava 2 and 3 is Schedulers.trampoline()) | Current thread, i.e. no thread specified | Default |
| AndroidSchedulers.mainThread() | Android main thread | Operating the UI |
| Schedulers.newThread() | A regular new thread | Time-consuming operations |
| Schedulers.io() | IO thread | IO-intensive operations such as network requests and reading or writing files |
| Schedulers.computation() | CPU computation thread | Heavy computation |

Usage

Observable.create<String> {
    Log.e(TAG, "Observable worker thread: " + Thread.currentThread().name)
}
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io())
    .subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            Log.d(TAG, "The subscribe event")
            Log.e(TAG, "Observer worker thread: " + Thread.currentThread().name)
        }

        override fun onNext(s: String) {
            Log.d(TAG, "Next event: $s")
        }

        override fun onError(e: Throwable) {
            Log.d(TAG, "Error event")
        }

        override fun onComplete() {
            Log.d(TAG, "Complete event")
        }
    })

Using operators

I won't demo every operator here. I suggest reading Carson_Ho's articles directly; the corresponding links are below:

  • Creation operators: https://www.jianshu.com/p/e19f8ed863b1
  • Transformation operators: https://www.jianshu.com/p/904c14d253ba
  • Combining/merging operators: https://www.jianshu.com/p/c2a7c03da16d
  • Utility operators: https://www.jianshu.com/p/b0c3669affdb
  • Filtering operators: https://www.jianshu.com/p/c3a930a03855
  • Conditional/Boolean operators: https://www.jianshu.com/p/954426f90325

Source code walkthrough

Next, we analyze the source code behind the simple usage shown above.

Observable.create<String> {
    it.onNext("The items: 1")
    it.onNext("The items: 2")
    it.onError(Exception())
    it.onComplete()
}.subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.d(TAG, "The subscribe event")
    }

    override fun onNext(s: String) {
        Log.d(TAG, "Next event: $s")
    }

    override fun onError(e: Throwable) {
        Log.d(TAG, "Error event")
    }

    override fun onComplete() {
        Log.d(TAG, "Complete event")
    }
})

Now let's analyze the structure as a whole:

  1. Observable.create: how is the object created?
  2. Observer: what does the sequence of Observer callbacks look like?
  3. subscribe: how are Observer and Observable linked, and what if they are on different threads?

Observable.create

The create function stands in here for the factory methods in general; it could equally be just or fromArray. They all end up calling the same function.

RxJavaPlugins.onAssembly(...); // Wraps one of several classes:
// 1. ObservableFromArray
// 2. ObservableJust
// 3. ObservableFromIterable
// 4. ...
// They all extend Observable and override the same method, subscribeActual(Observer)

We will focus on the create function and walk through its whole flow in detail.

subscribe

Defensive checks have been removed to make the structure of the code clearer.

public final void subscribe(@NonNull Observer<? super T> observer) {
    try {
        // Let the plugin hook wrap or replace the observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // Link the Observable and the Observer together
        subscribeActual(observer); // 1 -->
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

So we’re going to look at what the subscribeActual() function does.

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent); // 2 -->

    try {
        source.subscribe(parent); // 3 -->
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

Notice three things: the creation of CreateEmitter, the call to onSubscribe(), and source.subscribe(parent), which subscribes to the data.

onSubscribe() signals to our Observer that the subscription has been established.
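The create, subscribe and subscribeActual chain can be reduced to a small plain-Java sketch. MiniObservable and MiniObservableDemo are hypothetical names; the member names echo the RxJava source for readability, but this is an illustration of the flow under simplified assumptions (no Disposable, no error handling), not the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of the create -> subscribe -> subscribeActual chain.
abstract class MiniObservable<T> {
    interface Emitter<T> { void onNext(T item); void onComplete(); }
    interface Observer<T> { void onSubscribe(); void onNext(T item); void onComplete(); }

    // Factory: wraps the user's callback, much as ObservableCreate wraps ObservableOnSubscribe.
    static <T> MiniObservable<T> create(Consumer<Emitter<T>> source) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(Observer<T> observer) {
                // 1. Build the emitter around the observer
                Emitter<T> parent = new Emitter<T>() {
                    @Override public void onNext(T item) { observer.onNext(item); }
                    @Override public void onComplete() { observer.onComplete(); }
                };
                observer.onSubscribe();  // 2. Tell the observer it is subscribed
                source.accept(parent);   // 3. Run the user's callback with the emitter
            }
        };
    }

    // Every subclass shares this entry point and only overrides subscribeActual().
    final void subscribe(Observer<T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(Observer<T> observer);
}

final class MiniObservableDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable.<String>create(e -> {
            e.onNext("1");
            e.onNext("2");
            e.onComplete();
        }).subscribe(new MiniObservable.Observer<String>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(String item) { log.add("onNext " + item); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext 1, onNext 2, onComplete]
        System.out.println(run());
    }
}
```

Nothing runs until subscribe() is called; the user's callback is only stored by create().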

CreateEmitter

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    // Send the next item
    @Override
    public void onNext(T t) {
        // ...
    }

    // Send an error
    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        // ...
    }

    // Signal completion
    @Override
    public void onComplete() {
        // ...
    }

    // Disconnect
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

The whole data-handling logic is clearly visible here, so our Observer can be understood as the class that holds our custom handling.

A question: why is no more data sent or received after an error?

Take a look at the onError source.

@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}

@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            // Disconnect after the error has been delivered
            dispose();
        }
        return true;
    }
    return false;
}

After the error is delivered, the connection is disposed, so no subsequent data can be received.
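The "dispose after onError" rule can be reproduced in a few lines of plain Java. TerminalEmitterSketch is a hypothetical class that records calls in a list instead of forwarding them to an Observer; it is a sketch of the idea, not RxJava's CreateEmitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical emitter showing the "dispose after onError" rule.
final class TerminalEmitterSketch {
    private final List<String> log = new ArrayList<>();
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    void onNext(String item) {
        if (!disposed.get()) {
            log.add("onNext " + item);
        }
    }

    boolean tryOnError(Throwable t) {
        if (!disposed.get()) {
            try {
                log.add("onError " + t.getMessage());
            } finally {
                disposed.set(true); // same idea as dispose() in the finally block
            }
            return true;
        }
        return false; // already disposed: the error cannot be delivered
    }

    void onComplete() {
        if (!disposed.get()) {
            try {
                log.add("onComplete");
            } finally {
                disposed.set(true);
            }
        }
    }

    static List<String> run() {
        TerminalEmitterSketch emitter = new TerminalEmitterSketch();
        emitter.onNext("1");
        emitter.tryOnError(new RuntimeException("boom"));
        emitter.onNext("2");   // ignored: the emitter is disposed
        emitter.onComplete();  // ignored for the same reason
        return emitter.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext 1, onError boom]
    }
}
```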

source.subscribe(parent);

The code above used a Kotlin lambda expression, which hides what is going on, so here it is written once in Java.

Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {
        // Emit items here
    }
});

This is how an Observable is created, and the subscribe function shown here is the one we override. The onNext(), onComplete() and other functions used inside it are provided by ObservableEmitter.

Asynchronous communication


RxJava asynchronous communication principle

Thread switching principle of observeOn()

For observeOn(), the source shows that it uses a class called ObservableObserveOn, and the value we pass in is one of the Schedulers mentioned earlier.

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler; // The Scheduler we pass in
        this.delayError = delayError; // Delay onError output
        this.bufferSize = bufferSize; // Buffer size
    }

Let's look at this class's subscribeActual() function.

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}

A Scheduler is clearly being used, and from the analysis above we can assume that the class handling the data is ObserveOnObserver.

Let's go one layer down and look at how it is built. The code below shows onNext(); onComplete(), onSubscribe() and the others would serve equally well.

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t); // 1 -->
    }
    schedule(); // 2 -->
}

Its onNext() function checks whether the source is in async mode and, if not, puts the item into a queue. We'll set this queue aside for now and assume it is related to the cache discussed earlier.

Now take a look at the schedule() function at marker 2.

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

It calls the worker, which comes from the Scheduler we passed in. We pick newThread() as the example; the this argument is the ObserveOnObserver itself.

// scheduler.createWorker()
public Worker createWorker() {
    return new NewThreadWorker(threadFactory); // 1 -->
}

// 1 -->
public NewThreadWorker(ThreadFactory threadFactory) {
    // Create a thread pool
    executor = SchedulerPoolFactory.create(threadFactory);
}

// worker.schedule(this);
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }
    return scheduleActual(action, delayTime, unit, null); // 2 -->
}

// 2 -->
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    // ...
    Future<?> f;
    try {
        // Hand the task to the thread pool; it runs on a pool thread
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f); // Keep the Future so the task can be cancelled on dispose
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

Once our ObserveOnObserver has been submitted to the thread pool, the thread switch is complete.
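The combination of a queue, a getAndIncrement() guard and a worker can be sketched in plain Java. ObserveOnSketch is a hypothetical, heavily simplified model (no completion, errors or disposal) that uses an ExecutorService in place of a Scheduler.Worker; it is not RxJava's ObserveOnObserver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the queue-drain idea behind observeOn().
final class ObserveOnSketch extends AtomicInteger implements Runnable {
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService worker;
    private final List<String> received = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch done;

    ObserveOnSketch(ExecutorService worker, int expected) {
        this.worker = worker;
        this.done = new CountDownLatch(expected);
    }

    void onNext(int item) {
        queue.offer(item); // 1: park the item
        schedule();        // 2: make sure a drain is running
    }

    private void schedule() {
        if (getAndIncrement() == 0) { // only the 0 -> 1 transition submits a task
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            Integer item;
            while ((item = queue.poll()) != null) {
                // Delivery happens on the worker's thread, not the caller's
                received.add(item + "@" + Thread.currentThread().getName());
                done.countDown();
            }
            missed = addAndGet(-missed); // account for schedule() calls handled so far
            if (missed == 0) {
                return;
            }
        }
    }

    static List<String> demo() {
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-thread"));
        ObserveOnSketch sketch = new ObserveOnSketch(pool, 3);
        for (int i = 1; i <= 3; i++) {
            sketch.onNext(i); // called from the current thread
        }
        try {
            sketch.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(sketch.received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter guarantees that at most one drain loop runs at a time, so items are delivered in order even when onNext() is called while a drain is in progress.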

Thread switching principle of subscribeOn()

Here's a question: why do people online say that subscribeOn() only takes effect once?

Again, let's work through the problem in the source code, starting with the subscribeActual() function of the ObservableSubscribeOn class.

@Override
public void subscribeActual(final Observer<? super T> observer) {
    // Create the SubscribeOnObserver that wraps the downstream observer
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);
    // SubscribeTask is a Runnable
    // What does the scheduler do with it?
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); // 1 -->
}

// 1 --> Scheduler.scheduleDirect()
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // Simply binds the Runnable to the Worker
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // The schedule function looks familiar:
    // we met it in observeOn above.
    // This is where the thread switch happens, backed by the thread pool
    w.schedule(task, delay, unit); // 2 -->

    return task;
}

Once we have the Disposable, it is passed to parent.setDisposable().

void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

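setOnce() only lets the Disposable be set once. That guards the Disposable itself, though; to see why only the first subscribeOn() appears to take effect, we need to look at which thread the upstream subscription actually runs on.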
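The set-once idea itself is a single compare-and-set. The sketch below is a hypothetical reduction using AtomicReference; RxJava's DisposableHelper does more than this (for example, it treats an already-disposed field specially), so read it as an illustration of the idea only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduction of the set-once idea.
final class SetOnceSketch {
    // Succeeds only if nothing has been stored in the field yet.
    static <T> boolean setOnce(AtomicReference<T> field, T value) {
        return field.compareAndSet(null, value);
    }

    static String demo() {
        AtomicReference<String> slot = new AtomicReference<>();
        boolean first = setOnce(slot, "first");
        boolean second = setOnce(slot, "second"); // rejected: the slot is taken
        return first + "," + second + "," + slot.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,first
    }
}
```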

Onward! Back to threads: since a thread pool is involved, it is natural to look at what runs on the pool thread, namely the run() function of DisposeTask.

@Override
public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run(); // 1 --> SubscribeTask
    } finally {
        dispose();
        runner = null; // Clear the thread reference once the task has finished
    }
}

// 1 --> The run() function of SubscribeTask
public void run() {
    // This source is our upstream Observable
    source.subscribe(parent);
}

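source.subscribe(parent) runs on the scheduler's thread, so the upstream Observable is subscribed on that thread. With several subscribeOn() calls in a chain, each one switches threads in turn as the subscription travels upstream, so the call closest to the source decides which thread the source finally runs on. That is why only the first subscribeOn() appears to take effect.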
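A small plain-Java model makes the effect of chaining visible. It assumes a "source" is just a function that is handed a callback, and that subscribeOn wraps the upstream so its subscribe() call runs on an executor. SubscribeOnSketch, Source and the thread names are hypothetical; this is a sketch of the mechanism, not RxJava's ObservableSubscribeOn.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Simplified model of chained subscribeOn() calls.
final class SubscribeOnSketch {
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    // Wraps the upstream so that its subscribe() call runs on the given executor.
    static Source subscribeOn(Source upstream, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> upstream.subscribe(observer));
    }

    // Returns the name of the thread on which the source emitted.
    static String demo() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        try {
            // The source reports the thread it is running on
            Source source = observer -> observer.accept(Thread.currentThread().getName());
            // Same shape as source.subscribeOn(first).subscribeOn(second)
            Source chained = subscribeOn(subscribeOn(source, first), second);

            CompletableFuture<String> emittedOn = new CompletableFuture<>();
            chained.subscribe(emittedOn::complete);
            return emittedOn.join();
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + demo()); // prints source ran on: first
    }
}
```

The subscription hops to "second" and then to "first" on its way upstream, so the source ends up on "first", the executor closest to it.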

How does RxAndroid communicate asynchronously?

In an RxAndroid application, which side runs on the IO thread and which on the UI thread?

Asked that directly, the answer may not be obvious, so let's rephrase: who is the data producer and who is the data consumer? In a network request flow, the request itself clearly runs on a worker thread, while the data update happens on the main thread. In RxJava, the Observable is the producer and the Observer is the consumer, so the Observable should be on the IO thread and the Observer on the UI thread. But here lies the problem: how is the data communicated? The observed object has data, but how does the observer find out?

To see how this works, we subscribe on the IO thread and observe on the UI thread.

.subscribeOn(Schedulers.io()) // Corresponds to the observed
.observeOn(AndroidSchedulers.mainThread()) // Corresponds to the observer

Earlier we mentioned the concept of a cache, which can be found in the FlowableCreate source code.

// around line 64 of the source code
emitter = new BufferAsyncEmitter<>(t, bufferSize());
// The bufferSize() function corresponds to 128
// So there will be cases where we get an error when the cache exceeds 128

But here we only see that a buffer exists, and nothing about how data is sent, so let's keep going. We have already analyzed the Observer source code, which only handles receiving, and subscribe is similar, so the mechanism is not in those classes either.

That brings us back to what our original code is doing.

AndroidSchedulers.mainThread()

As for the Emitter, it already holds the subscribed object and can send data to it directly, much like the classic observer pattern. In Flowable, however, the data is actually pulled on demand through FlowableSubscriber, which differs from the active push of the observer pattern.

For the data communication itself, we need to look at AndroidSchedulers.mainThread(). Since the data is updated on the UI thread, the approach above is not enough, so how does RxJava do it?

The observeOn source code shows it:

public void subscribeActual(Subscriber<? super T> s) {
    Worker worker = scheduler.createWorker();

    if (s instanceof ConditionalSubscriber) {
        source.subscribe(new ObserveOnConditionalSubscriber<>(
                (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
    } else {
        source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
    }
}

We see the call scheduler.createWorker() here, so let's look at AndroidSchedulers.mainThread().

public final class AndroidSchedulers {

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }
    // ...
}

// HandlerScheduler constructor
HandlerScheduler(Handler handler, boolean async) {
    this.handler = handler;
    this.async = async;
}

The construction of AndroidSchedulers shows that the Handler mechanism is what is ultimately used: when execution is finally switched to the main thread, messages are sent through a Handler that obtains the main thread's Looper and posts directly to the main thread.

That concludes our walk through the RxJava flow.

Conclusion


References

  • Carson_Ho's RxJava series: https://www.jianshu.com/p/e1c48a00951a
  • A detailed look at the RxJava 2 thread-switching principle: https://www.jianshu.com/p/a9ebf730cd08
