Introduction

RxJava is a very powerful library, but it has its problems. In particular, its performance and memory overhead stems both from the problems the library is trying to solve and from how its solutions are designed technically.

To minimize overhead in RxJava, there are a number of optimizations called “operator fusion.” We will discuss them in this article.

But first, let’s review how RxJava reactive types work and what problems they have.

Observable

When you use an Observable, there are three main parts: Observable, Observer, and Disposable.

We all know about Observables and how they are created (e.g. Observable.just("Hello, World!")). An Observable is the building block of every Rx chain. To make an Observable do any work, we need to subscribe to it by passing an Observer to the subscribe(…) method.

The Observer is basically an interface with onSubscribe, onNext, onError, and onComplete callbacks.

When we subscribe to an Observable with an Observer, the Observable creates a Disposable object and passes it to the Observer via the onSubscribe callback, so that the Observer can dispose of the Rx chain if necessary.

After that, an Observable can emit values via onNext, without waiting.

Therefore, an Observable does not support backpressure: the Observer has no way of telling the Observable to wait and stop emitting more values.

Flowable

In Flowable, everything is similar, but instead of Observer and Disposable, we have Subscriber and Subscription.

Subscription has an additional request(n) method, which the Subscriber uses to explicitly ask the Flowable to emit the requested number of items. A Flowable does not signal any items unless they have been requested, which is why Flowable supports backpressure.

Assembly time and subscription time

There are two important phases when using RxJava reactive types: assembly time and subscription time.

The Rx chain is built at assembly time, and we "start" the Rx chain at subscription time.

Consider the following example:

In this case, we go from top to bottom, and the following happens:

  • An ObservableJust object is created.
  • An ObservableMap object is created, and the previously created ObservableJust is passed into it (so they are assembled together).
  • An ObservableFilter object is created, and the previously created ObservableMap (with the ObservableJust inside it) is passed into it.
  • We subscribe to ObservableFilter, which triggers the actual subscription.
  • ObservableFilter creates its own internal Observer and subscribes to ObservableMap with it.
  • ObservableMap creates its own internal Observer and subscribes to ObservableJust with it.
  • ObservableJust sends the onSubscribe event downstream (the other Observables pass this event on to the next Observer down the chain).
  • ObservableJust starts emitting values, which are passed downstream via onNext callbacks.
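
The example this walkthrough refers to is not reproduced here; judging by the class names in the list, it is presumably a chain of the form Observable.just(...).map(...).filter(...).subscribe(...). The two phases can be sketched without RxJava at all. The toy MiniObservable below is a hypothetical stand-in written only for illustration: it is not how RxJava implements Observable, and it leaves out onSubscribe, errors, and completion. It logs when each operator object is created and when each one is subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

final class AssemblyVsSubscription {
    /** Records the order in which operators are assembled and subscribed. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical, heavily simplified stand-in for Observable. */
    abstract static class MiniObservable<T> {
        abstract void subscribe(Consumer<? super T> observer);

        static <T> MiniObservable<T> just(T value) {
            LOG.add("assemble Just");
            return new MiniObservable<T>() {
                @Override
                void subscribe(Consumer<? super T> observer) {
                    LOG.add("subscribe Just");
                    observer.accept(value);
                }
            };
        }

        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            LOG.add("assemble Map");
            MiniObservable<T> source = this;
            return new MiniObservable<R>() {
                @Override
                void subscribe(Consumer<? super R> observer) {
                    LOG.add("subscribe Map");
                    // The internal observer wraps the downstream one.
                    source.subscribe(v -> observer.accept(mapper.apply(v)));
                }
            };
        }

        MiniObservable<T> filter(Predicate<? super T> predicate) {
            LOG.add("assemble Filter");
            MiniObservable<T> source = this;
            return new MiniObservable<T>() {
                @Override
                void subscribe(Consumer<? super T> observer) {
                    LOG.add("subscribe Filter");
                    source.subscribe(v -> {
                        if (predicate.test(v)) {
                            observer.accept(v);
                        }
                    });
                }
            };
        }
    }

    /** Builds just -> map -> filter, subscribes once, and returns the log. */
    static List<String> run() {
        LOG.clear();
        List<Integer> received = new ArrayList<>();
        MiniObservable<Integer> chain = MiniObservable.just(1)
                .map(v -> v + 1)
                .filter(v -> v > 0);
        chain.subscribe(received::add);
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the three "assemble" entries are logged top to bottom while the chain is being built, and the three "subscribe" entries are logged bottom to top once subscribe is called, which mirrors the order of the steps in the list.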

So you can see that a lot is going on even in this short Rx chain. If the chain were of type Flowable, additional request(n) communication would also take place, which complicates things further.

Queuing and synchronization

Operators may have internal Queues for processing events. Access to this queue should be serialized (meaning that it should be accessed in a properly synchronized manner).

RxJava 2 uses non-blocking synchronization based on atomics (such as AtomicInteger) and infinite loops around the compareAndSet method (for better performance). So in the library, you will often see code like this:

for (;;) {
    long r = state.get();

    if ((r & COMPLETED_MASK) != 0L) {
        return;
    }

    long u = r | COMPLETED_MASK;
    // (active, r) -> (complete, r) transition
    if (state.compareAndSet(r, u)) {
        // if the requested amount was non-zero, drain the queue
        if (r != 0L) {
            postCompleteDrain(u, actual, queue, state, isCancelled);
        }

        return;
    }
}

If you consider that each operator in a chain can have its own Queue, the Queue and Atomic objects in the operator also incur overhead.
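
The same compare-and-set retry pattern can be tried in isolation. The sketch below uses only java.util.concurrent.atomic; RequestCounter and its add method are hypothetical names chosen for illustration, loosely modeled on the demand accounting an operator has to do when it receives request(n). It is not code from the library.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestCounter {
    private final AtomicLong requested = new AtomicLong();

    /**
     * Adds n (assumed positive) to the outstanding demand, capping at
     * Long.MAX_VALUE, and returns the value seen before the update.
     */
    long add(long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already "unbounded", nothing to do
            }
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE; // overflow: saturate instead of wrapping
            }
            // Succeeds only if no other thread changed the value in between;
            // otherwise loop and retry with the fresh value.
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    long get() {
        return requested.get();
    }

    public static void main(String[] args) {
        RequestCounter counter = new RequestCounter();
        counter.add(5);
        counter.add(Long.MAX_VALUE);
        System.out.println(counter.get() == Long.MAX_VALUE);
    }
}
```

No lock is taken at any point: a thread that loses the race simply re-reads the value and tries again, which is cheap when contention is low but still costs an atomic object per operator.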

The problem

To sum up, RxJava has the following problems:

  • Assembly-time overhead: creating an Rx chain requires creating many objects, which introduces memory overhead.
  • Subscription-time overhead: when we subscribe to an Rx chain, a lot of communication takes place, which introduces performance overhead.
  • Allocation and serialization overhead: creating internal structures such as queues and atomic objects for each operator incurs memory and performance overhead.

Operator fusion

To solve some performance and memory problems, there is “operator fusion”.

There are two types of fusion:

  • Macro fusion: minimizes the number of objects created at assembly time or subscription time by merging several operators into one.
  • Micro fusion: removes unnecessary synchronization and shares internal structures (such as queues) between operators.

Macro fusion during assembly

Assembly time

Macro fusion at assembly time focuses on minimizing the number of Observables and other objects created during assembly. By "assembly time" we mean this place:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

Basic assembly-time fusion

The simplest optimization is to add special-case checks to some Observables so that a simpler implementation is created instead of the general one. For example, Observable.fromArray can be downgraded to Observable.empty or Observable.just when it is given 0 or 1 items:

public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    }
    if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

ScalarCallable

The first "advanced" optimization we will look at is the ScalarCallable interface in the fuseable package:

public interface ScalarCallable<T> extends Callable<T> {

    // overridden to remove the throws Exception
    @Override
    T call();
}

It is basically the same interface as normal Java Callable, except that it does not throw an exception.

ScalarCallable is a marker interface that a reactive type can implement to indicate that it holds a constant value which can be safely extracted at assembly time. Specifically, such a reactive type contains either exactly one item or no items at all.

So when we call the call method, we check the return value: if it is null, the reactive type has no value, and if it is non-null, it has exactly that one value.

Given this contract, only the just and empty operators of Observable, Flowable, and Maybe can be marked with this interface.

Then, for example, in the xMap operators (flatMap, switchMap, concatMap) we can apply an optimization if the source is marked with this interface:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

If the source is marked as a ScalarCallable interface, we can switch to a simplified version of xMap instead of a full implementation (which is quite onerous).
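
The shape of this check can be sketched with a few toy types. Everything below (ToySource, ToyScalar, ToyJust, ToyFlatMap) is hypothetical and exists only for illustration; "subscribing" is modeled as collecting items into a list, which is a strong simplification of what RxJava actually does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class ScalarFusionSketch {
    /** Toy reactive source: "subscribing" is modeled as collecting all items. */
    interface ToySource<T> {
        List<T> collect();
    }

    /** Hypothetical marker in the spirit of ScalarCallable; null means "empty". */
    interface ToyScalar<T> {
        T call();
    }

    static final class ToyJust<T> implements ToySource<T>, ToyScalar<T> {
        private final T value;

        ToyJust(T value) {
            this.value = value;
        }

        @Override
        public T call() {
            return value;
        }

        @Override
        public List<T> collect() {
            return Collections.singletonList(value);
        }
    }

    /** General (heavier) operator used when nothing is known about the source. */
    static final class ToyFlatMap<T, R> implements ToySource<R> {
        private final ToySource<T> source;
        private final Function<T, ToySource<R>> mapper;

        ToyFlatMap(ToySource<T> source, Function<T, ToySource<R>> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public List<R> collect() {
            List<R> out = new ArrayList<>();
            for (T item : source.collect()) {
                out.addAll(mapper.apply(item).collect());
            }
            return out;
        }
    }

    /** Assembly-time check: a scalar source never needs the general operator. */
    static <T, R> ToySource<R> flatMap(ToySource<T> source, Function<T, ToySource<R>> mapper) {
        if (source instanceof ToyScalar) {
            @SuppressWarnings("unchecked")
            T value = ((ToyScalar<T>) source).call();
            if (value == null) {
                return () -> Collections.<R>emptyList();
            }
            // One known value: defer to the mapped source and skip ToyFlatMap.
            return () -> mapper.apply(value).collect();
        }
        return new ToyFlatMap<>(source, mapper);
    }

    public static void main(String[] args) {
        ToySource<Integer> fused = ScalarFusionSketch.<Integer, Integer>flatMap(
                new ToyJust<>(3), v -> new ToyJust<>(v * 2));
        System.out.println(fused.collect());
    }
}
```

The mapper is still applied lazily in the scalar branch, so the shortcut only avoids allocating the general operator; it does not change when user code runs.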

FuseToXXX

There are three interfaces in the fuseable package:

public interface FuseToObservable<T> {
    Observable<T> fuseToObservable();
}

public interface FuseToMaybe<T> {
    Maybe<T> fuseToMaybe();
}

public interface FuseToFlowable<T> {
    Flowable<T> fuseToFlowable();
}

Let’s take a closer look at FuseToObservable. For other interfaces, everything is similar.

Consider the following Rx chain:

public static void count() {
    Observable.range(1, 10)
            .count()
            .toObservable()
            .subscribe();
}

Here we create a range and want to count the number of emitted items. The count operator returns a Single, but we want an Observable (for example, to merge this result with other Observables). So we add an extra toObservable operator, which makes the Rx chain more complex and heavier.

This is where FuseToObservable helps. The interface indicates that an operator which returns a reactive type other than Observable also has an implementation that returns an Observable, and that this implementation can be extracted when toObservable is called.

Consider our example of count:

public final Single<Long> count() {
    return RxJavaPlugins.onAssembly(new ObservableCountSingle<T>(this));
}

By default it returns ObservableCountSingle, but if we look at the implementation of this operator, we see that it implements the FuseToObservable interface and can provide a different implementation when called in fused mode:

public final class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
    ...
    @Override
    public Observable<Long> fuseToObservable() {
        return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
    }
    ...
}

When we call toObservable, this implementation is extracted, which means that no SingleToObservable wrapper is created:

public final Observable<T> toObservable() {
    if (this instanceof FuseToObservable) {
        return ((FuseToObservable<T>)this).fuseToObservable();
    }
    return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}

Macro fusion at subscription time

Macro fusion at subscription time focuses on the same kinds of optimizations as at assembly time, but they are performed in the subscribeActual method:

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

Sometimes it is not possible to apply optimizations at assembly time because the data is not known until subscription, and sometimes it is more convenient to perform some optimizations at subscription time than at assembly time.

Basic subscription-time fusion

As with assembly time, there are simple optimizations that check for special conditions and use a simplified implementation instead of the general one. For example, Observable.amb checks the number of supplied sources to decide whether the heavy AmbCoordinator needs to be instantiated:

public void subscribeActual(Observer<? super T> observer) {
    ObservableSource<? extends T>[] sources = this.sources;
    int count = 0;
    ...
    if (count == 0) {
        EmptyDisposable.complete(observer);
        return;
    } else
    if (count == 1) {
        sources[0].subscribe(observer);
        return;
    }

    AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
    ac.subscribe(sources);
}

Callable

At assembly time, we had optimizations based on the ScalarCallable interface. At subscription time, there are similar optimizations based on the Callable interface.

NOTE: Because ScalarCallable extends Callable, any optimization applied to ScalarCallable at assembly time can also be applied to Callable at subscription time.

For example, when an xMap operator subscribes to an Observable marked with the Callable interface, it can switch to a simplified implementation:

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

Micro fusion

The goal of micro fusion is to minimize overhead by removing some synchronization or by sharing internal structures such as queues.

Conditional Subscriber

Let us look at what happens when we use an operator like Flowable.filter.

We have an upstream, our filter operator, and a downstream. Suppose our filter predicate checks whether the value is less than 5. After the subscription is established, the downstream has to request items from the upstream:

  • The downstream requests an item from the filter.
  • The filter requests an item from the upstream.
  • The upstream produces an item (say, the number 1) and passes it to the filter.
  • The filter checks that the value satisfies the predicate and passes it downstream.
  • The downstream receives the item and requests another one from the filter.
  • The filter requests an item from the upstream.
  • The upstream produces an item (say, the number 10) and passes it to the filter.
  • The filter finds that the value does not satisfy the predicate, so it cannot pass it downstream. The downstream's request is still unfulfilled, so the filter requests another item from the upstream.
  • This repeats until the stream terminates.

As you can see, there is a lot of communication between the operators and, more importantly, each event is serialized, which means there is some overhead.

Now suppose we have two filter operators in a row: the amount of communication, and hence the overhead, increases significantly.

That’s where ConditionalSubscriber comes in:

public interface ConditionalSubscriber<T> extends FlowableSubscriber<T> {
    boolean tryOnNext(T t);
}

Normally the onNext callback of a Subscriber does not return a value: the upstream simply delivers the item through the callback and waits for a new request from the downstream.

ConditionalSubscriber has an additional method, tryOnNext, which is similar to onNext except that it immediately returns a boolean indicating whether the value was accepted.

Because the upstream gets this answer directly, fewer request(n) calls are needed.
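
To make the saving concrete, here is a small single-threaded model that counts request(n) calls under both protocols. RangeSource, drain, and drainConditional are hypothetical names for this sketch only; the real operators are concurrent and considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

final class ConditionalSketch {
    /** Hypothetical single-threaded source that honors a request(n) budget. */
    static final class RangeSource {
        private final int end;
        private int next;
        private long requested;
        private long emitted;
        int requestCalls;

        RangeSource(int start, int count) {
            this.next = start;
            this.end = start + count;
        }

        void request(long n) {
            requestCalls++;
            requested += n;
        }

        /** Plain protocol: every delivered item uses up one unit of demand. */
        void drain(IntConsumer onNext) {
            while (emitted < requested && next < end) {
                emitted++;
                onNext.accept(next++);
            }
        }

        /** Conditional protocol: only accepted items use up demand. */
        void drainConditional(IntPredicate tryOnNext) {
            while (emitted < requested && next < end) {
                if (tryOnNext.test(next++)) {
                    emitted++;
                }
            }
        }
    }

    /** Even-number filter over onNext; returns the request calls seen upstream. */
    static int plainRequestCalls() {
        RangeSource source = new RangeSource(1, 10);
        List<Integer> out = new ArrayList<>();
        source.request(3); // the downstream wants three items
        source.drain(v -> {
            if (v % 2 == 0) {
                out.add(v);
            } else {
                source.request(1); // dropped an item: ask for a replacement
            }
        });
        return source.requestCalls;
    }

    /** The same filter over tryOnNext; no replacement requests are needed. */
    static int conditionalRequestCalls() {
        RangeSource source = new RangeSource(1, 10);
        List<Integer> out = new ArrayList<>();
        source.request(3);
        source.drainConditional(v -> {
            if (v % 2 == 0) {
                out.add(v);
                return true;
            }
            return false;
        });
        return source.requestCalls;
    }

    public static void main(String[] args) {
        System.out.println("plain: " + plainRequestCalls());
        System.out.println("conditional: " + conditionalRequestCalls());
    }
}
```

In this model both variants deliver the same three even numbers, but the plain variant issues one extra request for every rejected item, while the conditional variant only ever sees the initial request.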

Take Flowable.filter as an example. Its tryOnNext shows that the upstream filter effectively gets direct access to the predicate result of the downstream filter:

@Override
public boolean tryOnNext(T t) {
    ...
    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    return b && downstream.tryOnNext(t);
}

This saves some request calls.

If this optimization only reduced overhead for chained filter operators, it would not be that impressive (after all, the checks could be written in a single filter). The good news is that Flowable.map also implements ConditionalSubscriber, which makes communication cheaper when several filters and maps are chained together:

@Override
public boolean tryOnNext(T t) {
    if (done) {
        return false;
    }

    U v;

    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return true;
    }
    return downstream.tryOnNext(v);
}

Queue fusion

The most complex micro fusion is based on sharing internal queues between operators. The whole optimization is built on the QueueSubscription interface:

public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}

It is basically just a queue and a Subscription under one interface. But the queue part is not the plain Java Queue interface; it has an additional method called requestFusion:

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;

    int requestFusion(int mode);
}

The idea is that, in contrast to the usual communication between a Flowable and a Subscriber via onXXX callbacks, the upstream can provide not just a Subscription but a QueueSubscription, which gives the downstream direct access to the upstream's internal queue.

The mechanism is as follows.

First, during onSubscribe, the upstream and downstream negotiate fusion and choose the fusion mode they will work in. If they agree on a fusion mode, the new way of communicating is used; if not, traditional communication via onXXX callbacks is established.

Typically, once fusion is established, the downstream obtains items directly by calling the poll() method on the upstream queue.

There are three fusion modes:

  • NONE: fusion is not supported
  • SYNC: synchronous fusion mode
  • ASYNC: asynchronous fusion mode

ANY means either SYNC or ASYNC, depending on which one the upstream supports.

SYNC fusion

SYNC fusion is only available if the upstream values are already available statically or are generated synchronously when poll() is called.

If the upstream and downstream agree on SYNC fusion, the following contract applies:

  • The downstream calls the poll() method directly whenever it needs an item
  • poll() can throw an exception, which is equivalent to onError
  • poll() can return null, which is equivalent to onComplete
  • poll() can return a non-null value, which is equivalent to onNext
  • The upstream does not call any onXXX callbacks
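
The contract above can be illustrated from the consumer's side. In the sketch below, PollableSource and drain are hypothetical names made up for this example; the loop simply translates each poll() outcome into the signal it stands for under the contract.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncFusionSketch {
    /** Hypothetical minimal view of a fused upstream: just poll(). */
    interface PollableSource<T> {
        T poll() throws Exception;
    }

    /** A range-like source whose values are produced synchronously in poll(). */
    static final class Range implements PollableSource<Integer> {
        private final int end;
        private int index;

        Range(int start, int count) {
            this.index = start;
            this.end = start + count;
        }

        @Override
        public Integer poll() {
            int i = index;
            if (i == end) {
                return null; // nothing left: stands for onComplete
            }
            index = i + 1;
            return i;
        }
    }

    /** Pulls until the source is exhausted or fails, recording the implied signals. */
    static <T> List<String> drain(PollableSource<T> source) {
        List<String> signals = new ArrayList<>();
        for (;;) {
            T value;
            try {
                value = source.poll();
            } catch (Exception e) {
                signals.add("onError(" + e.getMessage() + ")");
                return signals;
            }
            if (value == null) {
                signals.add("onComplete");
                return signals;
            }
            signals.add("onNext(" + value + ")");
        }
    }

    public static void main(String[] args) {
        drain(new Range(1, 3)).forEach(System.out::println);
    }
}
```

Note that no callback is ever invoked on the consumer here: the consumer drives the whole exchange, which is why no request accounting or queue hand-off is needed between the two stages.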

An example of an operator that supports SYNC fusion is Flowable.range:

@Override
public final int requestFusion(int mode) {
    return mode & SYNC;
}

@Nullable
@Override
public final Integer poll() {
    int i = index;
    if (i == end) {
        return null;
    }
    index = i + 1;
    return i;
}

ASYNC fusion

ASYNC fusion mode is available when the upstream values may eventually become available through poll().

If the upstream and downstream agree on ASYNC fusion, the following contract applies:

  • The upstream signals onError and onComplete as usual
  • onNext may not carry the actual upstream value and may pass null instead; the downstream should treat such an onNext as an indication that poll() can be called
  • The caller of poll() should catch exceptions

Yes, inside RxJava onNext may be called with a null value.

An example of an operator that supports ASYNC fusion is Flowable.filter:

@Nullable
@Override
public T poll() throws Exception {
    QueueSubscription<T> qs = this.qs;
    Predicate<? super T> f = filter;

    for (;;) {
        T t = qs.poll();
        if (t == null) {
            return null;
        }

        if (f.test(t)) {
            return t;
        }

        if (sourceMode == ASYNC) {
            qs.request(1);
        }
    }
}

@Override
public boolean tryOnNext(T t) {
    if (done) {
        return false;
    }

    if (sourceMode != NONE) {
        return downstream.tryOnNext(null);
    }

    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    return b && downstream.tryOnNext(t);
}

We have looked at examples of operators that support fusion, but for fusion to be enabled, someone has to request it first. For example, the InnerSubscriber inside Flowable.flatMap requests fusion:

@Override
public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.setOnce(this, s)) {

        if (s instanceof QueueSubscription) {
            @SuppressWarnings("unchecked")
            QueueSubscription<U> qs = (QueueSubscription<U>) s;
            int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
            if (m == QueueSubscription.SYNC) {
                fusionMode = m;
                queue = qs;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueSubscription.ASYNC) {
                fusionMode = m;
                queue = qs;
            }
        }
        s.request(bufferSize);
    }
}

Here you can see that when the upstream Subscription implements QueueSubscription, fusion is requested in ANY mode (together with the BOUNDARY flag) during onSubscribe.

Depending on the mode the source grants, a different strategy is applied.

QueueSubscription and threading

When using queue fusion, it is important to be aware of threading. If the downstream is allowed to access the upstream queue directly, this may cause problems when the upstream and downstream work on different threads.

For example, a map operator may perform heavy computations, and with direct polling those computations could leak onto a different thread. To solve this, there is an additional flag, BOUNDARY, which indicates that the caller of poll() may be on a different thread. An operator then either ignores the BOUNDARY flag and allows its queue to be accessed from another thread, or rejects fusion when BOUNDARY is requested.

If we look at the Observable.map implementation, we can see that it uses the transitiveBoundaryFusion helper:

@Override
public int requestFusion(int mode) {
    return transitiveBoundaryFusion(mode);
}

The helper rejects fusion whenever the BOUNDARY flag is requested:

protected final int transitiveBoundaryFusion(int mode) {
    QueueDisposable<T> qd = this.qd;
    if (qd != null) {
        if ((mode & BOUNDARY) == 0) {
            int m = qd.requestFusion(mode);
            if (m != NONE) {
                sourceMode = m;
            }
            return m;
        }
    }
    return NONE;
}
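
The negotiation boils down to a little bit arithmetic on the mode flags. The sketch below reuses the constant values from QueueFuseable shown earlier; rangeLike and mapLike are hypothetical helpers that only imitate the two requestFusion styles seen in this article, not the library's actual classes.

```java
import java.util.function.IntUnaryOperator;

final class FusionModeSketch {
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;
    static final int BOUNDARY = 4;

    /** A range-like source: grants SYNC whenever the requested mode includes it. */
    static int rangeLike(int mode) {
        return mode & SYNC;
    }

    /** A map-like operator: refuses across a boundary, otherwise asks its upstream. */
    static int mapLike(int mode, IntUnaryOperator upstream) {
        if ((mode & BOUNDARY) != 0) {
            return NONE;
        }
        return upstream.applyAsInt(mode);
    }

    public static void main(String[] args) {
        System.out.println(mapLike(ANY, FusionModeSketch::rangeLike));
        System.out.println(mapLike(ANY | BOUNDARY, FusionModeSketch::rangeLike));
    }
}
```

A source that has no user callbacks to run, like the range-like one here, can ignore BOUNDARY entirely, whereas an operator that runs a user function inside poll() refuses so that the function is not executed on the consumer's thread.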

Conclusion

In this article, we gave an overview of some optimizations in RxJava and found some interesting things:

  • Observable in RxJava 2 does not support backpressure (because it has no way to tell the upstream to stop providing more items)
  • Null values are not allowed in RxJava because some of the optimizations described above rely on passing null values in callbacks
  • The hide() operator is very useful if you want to turn off all of these optimizations
  • Operator fusion is an interesting thing, but it is still only a set of optimizations, and they do not apply to every operator. Surprisingly, some cases that sound like they could be optimized are not optimized at all. The reason is that these optimizations target critical places and common patterns, and a general optimization is very hard to achieve.

So do not assume that, just because RxJava has these optimizations, you can write long Rx chains and everything will be efficient. Benchmark your code, analyze the chains that matter, and work out how to optimize each of them individually.

Operator Fusion in RxJava 2