Operator Fusion

Translated from Operator Fusion in RxJava 2

Introduction

RxJava is a very powerful library, but it has some problems, in particular with performance and memory usage. To minimize this overhead, RxJava 2 applies a number of optimizations known as “operator fusion.” Let’s start by reviewing how RxJava works and what problems it has.

Observable

  • An Observable is built
  • The Observer subscribes to the Observable
  • The Observable passes the Disposable it created to the Observer through the Observer.onSubscribe method
  • The Observable can then call Observer.onNext to pass values

An Observable does not support backpressure, because the Observer has no way to tell the Observable how much it can process.

Flowable

Flowable works like Observable, but uses Subscriber and Subscription instead of Observer and Disposable. Subscription has an additional request(n) method that the Subscriber uses to tell the Flowable explicitly how many items to emit. If no items are requested, the Flowable emits nothing, which is why Flowable supports backpressure.
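The request(n) idea can be sketched in plain Java without RxJava. This is a simplified, single-threaded illustration only: SimpleSubscriber and RangeSubscription are hypothetical stand-ins, not the real Reactive Streams or RxJava types, and there is no cancellation or thread safety.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Subscriber/Subscription (not the RxJava types).
final class BackpressureSketch {

    interface SimpleSubscriber {
        void onNext(int value);
        void onComplete();
    }

    /** Emits start..end-1, but only as many items as have been requested. */
    static final class RangeSubscription {
        private final SimpleSubscriber downstream;
        private final int end;
        private int index;
        private boolean done;

        RangeSubscription(int start, int end, SimpleSubscriber downstream) {
            this.index = start;
            this.end = end;
            this.downstream = downstream;
        }

        void request(long n) {
            // Emit at most n items; nothing is emitted without a request.
            while (n-- > 0 && index < end) {
                downstream.onNext(index++);
            }
            if (index == end && !done) {
                done = true;
                downstream.onComplete();
            }
        }
    }

    /** Requests items in two batches and records what arrives. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RangeSubscription s = new RangeSubscription(1, 6, new SimpleSubscriber() {
            @Override public void onNext(int value) { log.add("next " + value); }
            @Override public void onComplete() { log.add("complete"); }
        });
        log.add("requested 2");
        s.request(2);
        log.add("requested 3");
        s.request(3);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The source emits two items after the first request and the remaining three after the second, so the consumer controls the pace.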

Assembly and subscription

When using RxJava, there are two important phases, assembly and subscription:

  • The assembly phase establishes the Rx chain
  • The subscription phase starts the Rx chain (triggers a “storm” within various operators)

Consider the following code:

Observable.fromArray(1, 2, 3)
	.map { it + 1 }
	.filter { it < 3 }
	.subscribe { println(it) }

The chain is first assembled and then subscribed to. A great deal happens even with just three operators in the chain. With Flowable instead of Observable, request(n) complicates the process further.

Queues and synchronization

An operator implementation may keep an internal queue for handling events. Queue access must be serialized, which requires proper synchronization. RxJava 2 uses non-blocking synchronization based on atomics (such as AtomicInteger) and infinite loops around compareAndSet. Since each operator has its own internal queue, the queues and atomic objects in every operator add further overhead. The code looks something like this:

public final class QueueDrainHelper {
    public static <T> boolean postCompleteRequest(...) {
        for (;;) {
            long r = state.get();
            long r0 = r & REQUESTED_MASK;
            long u = (r & COMPLETED_MASK) | BackpressureHelper.addCap(r0, n);
            if (state.compareAndSet(r, u)) {
                if (r == COMPLETED_MASK) {
                    postCompleteDrain(n | COMPLETED_MASK, actual, queue, state, isCancelled);
                    return true;
                }
                return false;
            }
        }
    }
}

public final class ObservableObserveOn<T> {
    void drainNormal() {
        int missed = 1;
        SimpleQueue<T> q = this.queue;
        Observer a = this.actual;
        do {
            while (true) {
                Object v;
                try {
                    v = q.poll();
                } catch (Throwable var7) {
                    ...
                }

                boolean empty = v == null;
                if (empty) {
                    missed = this.addAndGet(-missed);
                    break;
                }
                a.onNext(v);
            }
        } while (missed != 0);
    }
}
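The compareAndSet retry loop mentioned above can be isolated in a small standalone sketch. It uses only java.util.concurrent.atomic.AtomicLong; the class and method names (RequestCounterSketch, addRequested) are made up for illustration, and the logic is only loosely modeled on how a request counter might be accumulated.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestCounterSketch {

    /** Adds two non-negative longs, capping the sum at Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;
    }

    /** Lock-free add: retry until compareAndSet succeeds; returns the previous value. */
    static long addRequested(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded
            }
            long updated = addCap(current, n);
            if (requested.compareAndSet(current, updated)) {
                return current;
            }
            // Another thread changed the counter in between: loop and retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong requested = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    addRequested(requested, 1);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // 4 threads x 1000 increments, none lost despite the absence of locks
        System.out.println(requested.get());
    }
}
```

Every operator that keeps such a counter pays for the atomic read and the possible retries, which is the overhead that fusion tries to avoid.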

Issues

To sum up, the problems with RxJava are:

  • Assembly overhead: building the Rx chain creates many objects, which costs memory
  • Subscription overhead: a lot of communication takes place, which costs performance
  • Allocation and serialization overhead: internal structures (such as queues and atomic objects) are created for each operator, which costs both memory and performance

Operator fusion

The optimizations that address some of these performance and memory problems are called operator fusion. There are two types of operator fusion:

  • Macro fusion merges operators to minimize the number of objects created during the assembly and subscription phases
  • Micro fusion removes unnecessary synchronization and shares internal structures (such as queues) between operators

Macro fusion on Assembly

Macro fusion at assembly time focuses on minimizing the number of Observables and other objects created during assembly. “Assembly” here refers to code such as the following:

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
        ObjectHelper.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
    }
}

Assembly fusion basics

The easiest way to optimize some Observables is to add checks for special cases so that a simpler Observable is created. For example, Observable.fromArray can be downgraded to Observable.empty or Observable.just.

public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

ScalarCallable

The first “advanced” optimization in the fuseable package is the ScalarCallable interface:

public interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}

It extends the standard Java Callable and has the same method, except that call() cannot throw a checked exception. ScalarCallable is a marker interface: a class that implements it declares that its constant value (or null) can be extracted safely at assembly time. It follows that only the data sources behind empty and just (for Observable, Flowable, and Maybe) can be marked as ScalarCallable. For example, in the xMap operators (flatMap, switchMap, concatMap), if the source is marked, the full implementation can be replaced with a simplified xMap version (compare ObservableFlatMap with ObservableScalarXMap).

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        if (this instanceof ScalarCallable) {
            T v = ((ScalarCallable<T>) this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}
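The marker-interface check can be tried out in a small plain-Java model. Everything here (Source, Scalar, Just, the fullFlatMapsCreated counter) is a hypothetical stand-in invented for this sketch, not the RxJava implementation; it only shows how an instanceof check at assembly time can pick a cheaper path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ScalarFusionSketch {

    /** Hypothetical minimal source: pushes all of its items into the consumer. */
    interface Source<T> {
        void subscribe(Consumer<? super T> consumer);
    }

    /** Marker: the single value is a constant that can be read at assembly time. */
    interface Scalar<T> {
        T value();
    }

    static final class Just<T> implements Source<T>, Scalar<T> {
        private final T value;
        Just(T value) { this.value = value; }
        @Override public T value() { return value; }
        @Override public void subscribe(Consumer<? super T> consumer) { consumer.accept(value); }
    }

    @SafeVarargs
    static <T> Source<T> fromArray(T... items) {
        return consumer -> { for (T item : items) consumer.accept(item); };
    }

    /** Counts how often the general (expensive) flatMap path was assembled. */
    static int fullFlatMapsCreated = 0;

    static <T, R> Source<R> flatMap(Source<T> source, Function<T, Source<R>> mapper) {
        if (source instanceof Scalar) {
            // Fused path: the constant is read now, no general flatMap is built.
            @SuppressWarnings("unchecked")
            T v = ((Scalar<T>) source).value();
            if (v == null) {
                return consumer -> { };
            }
            return consumer -> mapper.apply(v).subscribe(consumer);
        }
        fullFlatMapsCreated++;
        return consumer -> source.subscribe(t -> mapper.apply(t).subscribe(consumer));
    }

    static List<Integer> collect(Source<Integer> source) {
        List<Integer> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<Integer> scalar = new Just<Integer>(3);
        Source<Integer> fused = flatMap(scalar, v -> fromArray(v + 1, v + 2));
        System.out.println(collect(fused) + ", full flatMaps: " + fullFlatMapsCreated);

        Source<Integer> array = fromArray(1, 2);
        Source<Integer> full = flatMap(array, v -> fromArray(v, v * 10));
        System.out.println(collect(full) + ", full flatMaps: " + fullFlatMapsCreated);
    }
}
```

Only the second chain, whose source is not a scalar, goes through the general path.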

FuseToXXX

The fuseable package provides these three interfaces:

public interface FuseToObservable<T> {
    // Returns a (direct) Observable for the operator.
    Observable<T> fuseToObservable();
}

public interface FuseToMaybe<T> {
    Maybe<T> fuseToMaybe();
}

public interface FuseToFlowable<T> {
    Flowable<T> fuseToFlowable();
}

Let’s look more closely at FuseToObservable; the other two interfaces are similar. Consider the following Rx chain:

Observable.range(1, 10)
	.count()
	.toObservable()
	.subscribe { println(it) }

class ObservableCountSingle<T> extends Single<Long>
        implements FuseToObservable<Long> {
    @Override
    public Observable<Long> fuseToObservable() {
        return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
    }
}

abstract class Single<T> {
    public final Observable<T> toObservable() {
        if (this instanceof FuseToObservable) {
            return ((FuseToObservable<T>) this).fuseToObservable();
        }
        return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
    }
}

The assembly structure is different with or without FuseToObservable.
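To make that difference concrete, here is a toy model of the assembled chain in plain Java. Node, FuseToNode, and CountSingle are hypothetical names for this sketch only; a flag switches the fusion check on and off so both results can be compared.

```java
final class FuseToSketch {

    /** Hypothetical node in an assembled chain; depth counts the wrapped operators. */
    static class Node {
        final String name;
        final Node upstream;
        Node(String name, Node upstream) { this.name = name; this.upstream = upstream; }
        int depth() { return upstream == null ? 1 : 1 + upstream.depth(); }
        String describe() { return upstream == null ? name : upstream.describe() + " -> " + name; }
    }

    /** Marker for an operator that can hand back a direct replacement. */
    interface FuseToNode {
        Node fuseToNode();
    }

    /** Stand-in for count() returning a Single-like node. */
    static final class CountSingle extends Node implements FuseToNode {
        CountSingle(Node upstream) { super("CountSingle", upstream); }
        @Override public Node fuseToNode() { return new Node("Count", upstream); }
    }

    static Node toObservable(Node single, boolean fusionEnabled) {
        if (fusionEnabled && single instanceof FuseToNode) {
            // Fused: replace "CountSingle -> SingleToObservable" with one direct node.
            return ((FuseToNode) single).fuseToNode();
        }
        return new Node("SingleToObservable", single);
    }

    public static void main(String[] args) {
        Node range = new Node("Range", null);
        System.out.println(toObservable(new CountSingle(range), false).describe());
        System.out.println(toObservable(new CountSingle(range), true).describe());
    }
}
```

Without the check the chain has three nodes (Range, CountSingle, SingleToObservable); with it, only two (Range, Count).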





Macro fusion on subscribe

Macro fusion at subscription time works the same way as at assembly time, except that it happens in the subscribeActual method. Sometimes the data is not known before subscription, so it is more convenient to optimize at subscription time.

public abstract class Observable<T> implements ObservableSource<T> {
    
    @Override
    public final void subscribe(Observer<? super T> observer) {
         subscribeActual(observer);
    }
    
    protected abstract void subscribeActual(Observer<? super T> observer);
}


Subscription fusion basics

As during assembly, special-case checks are added at subscription time to downgrade to a simpler implementation. For example, take the following ObservableAmb code:

public final class ObservableAmb<T> extends Observable<T> {
    final ObservableSource<? extends T>[] sources;
    final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;

    public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T>[] sources = this.sources;
        ...
        if (count == 0) {
            EmptyDisposable.complete(s);
            return;
        } else if (count == 1) {
            sources[0].subscribe(s);
            return;
        }
        AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
        ac.subscribe(sources);
    }
}

Callable

The Callable optimization is similar to the ScalarCallable optimization at assembly time, but it is applied at subscription time.

Note: because ScalarCallable extends Callable, the subscription-time optimizations for Callable also apply to ScalarCallable sources.

For example, an xMap operator that subscribes to an Observable implementing the Callable interface may be replaced with a simplified implementation. For the full implementation see ObservableFlatMap.MergeObserver, a class too complex to walk through here.

Observable.fromCallable { 3 }
	.flatMap { Observable.fromArray(it + 1, it + 2) }
	.subscribe { println(it) }

public final class ObservableFlatMap<T, U> {
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}

public final class ObservableScalarXMap {
    public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source,
            Observer<? super R> observer,
            Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        if (source instanceof Callable) {
            T t;
            try {
                t = ((Callable<T>) source).call();
            } catch (Throwable ex) {
                ...
                return true;
            }
            if (t == null) {
                EmptyDisposable.complete(observer);
                return true;
            }
            ObservableSource<? extends R> r;
            try {
                r = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable ex) {
                ...
                return true;
            }
            ...
            r.subscribe(observer);
            return true;
        }
        return false;
    }
}
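The same shortcut can be modeled without RxJava. In this sketch Source, FromCallable, tryScalarSubscribe, and flatMapSubscribe are hypothetical names, and error handling is reduced to rethrowing, whereas a real operator would route the failure to the downstream's error callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

final class CallableFusionSketch {

    interface Source<T> {
        void subscribe(Consumer<? super T> consumer);
    }

    /** A source whose single value is computed lazily; it also exposes Callable. */
    static final class FromCallable<T> implements Source<T>, Callable<T> {
        private final Callable<T> supplier;
        FromCallable(Callable<T> supplier) { this.supplier = supplier; }
        @Override public T call() throws Exception { return supplier.call(); }
        @Override public void subscribe(Consumer<? super T> consumer) {
            try {
                consumer.accept(call());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Returns true if the shortcut handled the subscription. */
    static <T, R> boolean tryScalarSubscribe(Source<T> source, Function<T, Source<R>> mapper,
                                             Consumer<? super R> consumer) {
        if (!(source instanceof Callable)) {
            return false;
        }
        T value;
        try {
            @SuppressWarnings("unchecked")
            Callable<T> callable = (Callable<T>) source;
            value = callable.call(); // the value only becomes known at subscription time
        } catch (Exception e) {
            throw new IllegalStateException(e); // simplification: no onError in this sketch
        }
        if (value != null) {
            mapper.apply(value).subscribe(consumer);
        }
        return true;
    }

    static <T, R> List<String> flatMapSubscribe(Source<T> source, Function<T, Source<R>> mapper) {
        List<String> log = new ArrayList<>();
        Consumer<R> consumer = r -> log.add("item " + r);
        if (tryScalarSubscribe(source, mapper, consumer)) {
            log.add("path: fused");
        } else {
            source.subscribe(t -> mapper.apply(t).subscribe(consumer));
            log.add("path: full");
        }
        return log;
    }

    public static void main(String[] args) {
        Source<Integer> source = new FromCallable<Integer>(() -> 3);
        Function<Integer, Source<Integer>> mapper = v -> c -> { c.accept(v + 1); c.accept(v + 2); };
        System.out.println(flatMapSubscribe(source, mapper));
    }
}
```

The items are the same on both paths; the difference is that the fused path never builds the general merging machinery.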

Micro fusion

Micro fusion aims to reduce overhead by removing synchronization or by sharing internal structures.

ConditionalSubscriber

Consider the FlowableFilter operator:

public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T>
    implements ConditionalSubscriber<T> {
        final Predicate<? super T> filter;

        @Override
        public void onNext(T t) {
            if (!tryOnNext(t)) {
                s.request(1);
            }
        }

        @Override
        public boolean tryOnNext(T t) {
            boolean b = filter.test(t);
            if (b) {
                actual.onNext(t);
            }
            return b;
        }
    }
}



Imagine two filters in a row. filter.test(value) is executed twice, and so is the step that follows it: request(1) if the test fails (N), or onNext(value) if it passes (Y).

Enter FilterConditionalSubscriber, which merges the consecutive filter.test(value) checks:

static final class FilterConditionalSubscriber<T>
        extends BasicFuseableConditionalSubscriber<T, T> {

    // Declared in the base class
    protected final ConditionalSubscriber<? super T> actual;

    final Predicate<? super T> filter;

    @Override
    public void onNext(T t) {
        if (!tryOnNext(t)) {
            s.request(1);
        }
    }

    @Override
    public boolean tryOnNext(T t) {
        return filter.test(t) && actual.tryOnNext(t);
    }
}



Similar code can be found in FlowableRange and FlowableMap.
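The effect of chaining tryOnNext can be sketched in plain Java. ConditionalConsumer, filter, and emit are hypothetical names for this illustration; the assumption is a source that simply does not count a dropped item against the requested amount, so no request(1) round trip is needed for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class ConditionalSketch {

    /** Hypothetical downstream that reports whether it actually consumed the item. */
    interface ConditionalConsumer {
        /** Returns false if the item was dropped, so the caller may send another one. */
        boolean tryOnNext(int value);
    }

    /** A filter stage that forwards the accept/drop decision of the stage below it. */
    static ConditionalConsumer filter(IntPredicate predicate, ConditionalConsumer downstream) {
        return value -> predicate.test(value) && downstream.tryOnNext(value);
    }

    /**
     * Source loop: emits from 1 upwards until `requested` items were actually accepted.
     * Dropped items are not counted, so nobody has to call request(1) for them.
     */
    static int emit(long requested, int limit, ConditionalConsumer downstream) {
        int attempts = 0;
        long accepted = 0;
        for (int i = 1; i <= limit && accepted < requested; i++) {
            attempts++;
            if (downstream.tryOnNext(i)) {
                accepted++;
            }
        }
        return attempts;
    }

    static String demo() {
        List<Integer> received = new ArrayList<>();
        ConditionalConsumer sink = v -> { received.add(v); return true; };
        ConditionalConsumer chain = filter(v -> v % 2 == 0, filter(v -> v % 3 == 0, sink));
        int attempts = emit(2, 100, chain);
        return received + " after " + attempts + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Two items were requested and two arrive (6 and 12); the ten rejected values cost a boolean return each instead of a replenishing request.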

Queue fuseable

The most complex micro fusion is based on sharing internal queues between operators. The whole optimization is built on the QueueFuseable interface:

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;

    // Request a fusion mode from the upstream
    int requestFusion(int mode);
}

public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {}

Let’s take the Flowable family as an example; the same applies to Observable. The QueueSubscription interface extends both QueueFuseable and Subscription and lets Flowable operators negotiate a fusion mode. requestFusion is usually called during subscription, when the upstream calls Subscriber.onSubscribe(Subscription): the downstream calls Subscription.requestFusion(int mode) before it calls Subscription.request(n). Instead of a plain Subscription used together with the regular onXXX callbacks of a Subscriber, the upstream provides a QueueSubscription, which gives the downstream direct access to the upstream’s internal queue. In fusion mode, the downstream obtains upstream values by calling QueueSubscription.poll() on the upstream. There are three fusion modes:

  • NONE – fusion is not allowed
  • SYNC – synchronous fusion is supported
  • ASYNC – asynchronous fusion is supported
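The negotiation itself is plain bit arithmetic on the mode constants. The sketch below reuses the constant values from the QueueFuseable interface; the two "upstream" methods are hypothetical examples of how an operator might answer a request.

```java
final class FusionModeSketch {
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;

    /** An upstream that can only do SYNC fusion (for example a range-like source). */
    static int syncOnlyUpstream(int requestedMode) {
        return requestedMode & SYNC;
    }

    /** An upstream that can only do ASYNC fusion (for example a queue fed by another thread). */
    static int asyncOnlyUpstream(int requestedMode) {
        return (requestedMode & ASYNC) != 0 ? ASYNC : NONE;
    }

    static String name(int mode) {
        switch (mode) {
            case SYNC: return "SYNC";
            case ASYNC: return "ASYNC";
            default: return "NONE";
        }
    }

    public static void main(String[] args) {
        System.out.println(name(syncOnlyUpstream(ANY)));    // SYNC
        System.out.println(name(syncOnlyUpstream(ASYNC)));  // NONE
        System.out.println(name(asyncOnlyUpstream(ANY)));   // ASYNC
        System.out.println(name(asyncOnlyUpstream(SYNC)));  // NONE
    }
}
```

The downstream states which modes it can handle, and the upstream answers with the one mode it grants, or NONE.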



SYNC fusion

Upstream values are either already available or can be produced synchronously inside poll(). If the upstream and the downstream agree on SYNC fusion, it means:

  • The downstream calls poll() directly whenever it needs a value
  • poll() throwing an exception is equivalent to onError
  • poll() returning null is equivalent to onComplete
  • poll() returning a non-null value is equivalent to onNext
  • The upstream does not invoke any onXXX callbacks
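The SYNC contract listed above can be sketched as a drain loop in plain Java. RangeQueue and drain are hypothetical names; the loop translates each poll() result into the signal it stands for.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncFusionSketch {

    /** Hypothetical fused upstream: values are produced on demand inside poll(). */
    static final class RangeQueue {
        private int index;
        private final int end;
        RangeQueue(int start, int count) { this.index = start; this.end = start + count; }

        /** Returns the next value, or null when the range is exhausted. */
        Integer poll() {
            if (index == end) {
                return null;
            }
            return index++;
        }
    }

    /** Downstream drain loop: interprets poll() results as onNext/onComplete/onError. */
    static List<String> drain(RangeQueue queue) {
        List<String> signals = new ArrayList<>();
        for (;;) {
            Integer v;
            try {
                v = queue.poll();
            } catch (RuntimeException ex) {
                signals.add("onError " + ex);   // an exception stands for onError
                return signals;
            }
            if (v == null) {
                signals.add("onComplete");      // null stands for onComplete
                return signals;
            }
            signals.add("onNext " + v);         // any other value stands for onNext
        }
    }

    public static void main(String[] args) {
        drain(new RangeQueue(1, 3)).forEach(System.out::println);
    }
}
```

The upstream never calls back; the downstream pulls everything it needs.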

Consider Flowable.range together with observeOn:

Flowable.range(1, 9)
	.observeOn(Schedulers.newThread())
	.subscribe { println(it) }

RangeSubscription supports only SYNC fusion:

public final class FlowableRange extends Flowable<Integer> {
    abstract static class BaseRangeSubscription extends BasicQueueSubscription<Integer> {

        @Override
        public final int requestFusion(int mode) {
            return mode & SYNC;
        }

        @Nullable
        @Override
        public final Integer poll() {
            int i = index;
            if (i == end) {
                return null;
            }
            index = i + 1;
            return i;
        }
    }
}

Next, look at the onSubscribe() method of ObserveOnSubscriber in FlowableObserveOn, where:

  • s is the upstream RangeSubscription
  • actual is the downstream subscriber (your Subscriber)

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.s, s)) {
                // s is the upstream subscription
                this.s = s;

                if (s instanceof QueueSubscription) {
                    QueueSubscription<T> f = (QueueSubscription<T>) s;
                    int m = f.requestFusion(ANY | BOUNDARY);
                    if (m == SYNC) {
                        sourceMode = SYNC;
                        queue = f;
                        done = true;
                        // actual is the downstream (your Subscriber),
                        // which calls request(Long.MAX_VALUE)
                        actual.onSubscribe(this);
                        return;
                    }
                }
            }
        }

        @Override
        public final void run() {
            if (sourceMode == SYNC) {
                runSync();
            }
        }

        @Override
        void runSync() {
            ...
            final Subscriber<? super T> a = actual;
            // the upstream RangeSubscription
            final SimpleQueue<T> q = queue;
            for (;;) {
                while (e != r) {
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        // An exception must be reported as onError
                        a.onError(ex);
                        return;
                    }
                    // A null value signals completion
                    if (v == null) {
                        a.onComplete();
                        return;
                    }
                    a.onNext(v);
                    ...
                }
                ...
            }
        }
    }
}

SYNC fusion compared with no fusion:

  • observeOn issues fewer request(x) calls
  • observeOn does not need to maintain its own internal SpscArrayQueue; it uses the upstream QueueSubscription instead
  • There are fewer onNext calls between range and observeOn, which means fewer worker.schedule calls

observeOn checks internally whether the upstream subscription is a QueueSubscription. The subscription of FlowableRange is a RangeSubscription, which is a QueueSubscription and supports only SYNC fusion.

ASYNC fusion

Unlike SYNC fusion, upstream values may not be available immediately or synchronously from poll(). If the upstream and the downstream agree on ASYNC fusion, it means:

  • The upstream still calls onNext, onError, and onComplete
  • onNext is called with null, which signals that the downstream can call poll() to get the actual values
  • The downstream still calls request(x)
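A single-threaded plain-Java sketch of this protocol: the upstream enqueues a value and then signals the downstream with onNext(null), and the downstream drains the shared queue through poll(). Downstream and FusedUpstream are hypothetical names, and real ASYNC fusion involves concurrency and request accounting that are omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class AsyncFusionSketch {

    interface Downstream {
        void onNext(Integer valueOrNull);
        void onComplete();
    }

    /** Hypothetical upstream whose queue is shared with the downstream. */
    static final class FusedUpstream {
        private final Queue<Integer> queue = new ArrayDeque<>();
        private final Downstream downstream;
        FusedUpstream(Downstream downstream) { this.downstream = downstream; }

        /** A value arrives later (e.g. from a timer): enqueue it, then signal with null. */
        void produce(int value) {
            queue.offer(value);
            downstream.onNext(null);
        }

        void finish() { downstream.onComplete(); }

        /** May return null simply because nothing is available yet. */
        Integer poll() { return queue.poll(); }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        FusedUpstream[] ref = new FusedUpstream[1];
        ref[0] = new FusedUpstream(new Downstream() {
            @Override public void onNext(Integer valueOrNull) {
                log.add("onNext " + valueOrNull);
                Integer v;
                // The null signal only means "look into the queue".
                while ((v = ref[0].poll()) != null) {
                    log.add("poll " + v);
                }
            }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        ref[0].produce(1);
        ref[0].produce(2);
        ref[0].finish();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Here a null from poll() does not mean completion, only that the queue is currently empty; completion still arrives through onComplete.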

ObserveOnSubscriber implements QueueSubscription and supports only ASYNC fusion:

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {

        @Override
        public final void run() {
            if (outputFused) {
                runBackfused();
            }
        }

        @Override
        void runBackfused() {
            for (;;) {
                boolean d = done;
                actual.onNext(null);
                ...
            }
        }

        @Override
        public final int requestFusion(int requestedMode) {
            if ((requestedMode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() throws Exception {
            T v = queue.poll();
            if (v != null && sourceMode != SYNC) {
                long p = produced + 1;
                if (p == limit) {
                    produced = 0;
                    s.request(p);
                } else {
                    produced = p;
                }
            }
            return v;
        }
    }
}

Let’s write a custom FlowableSubscriber:

var qs: QueueSubscription<*>? = null
Flowable.rangeLong(1, 5)
	.observeOn(Schedulers.newThread())
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				s.requestFusion(QueueFuseable.ASYNC)
				qs = s
			}
			s.request(Long.MAX_VALUE)
		}

		// onNext receives null; the actual values come from poll()
		override fun onNext(t: Long?) {
			println("onNext:$t ")
			while (true) {
				val value = qs?.poll()
				if (value == null) {
					break
				}
				println("onNext poll:$value")
			}
		}

		override fun onError(t: Throwable?) {
			println("onError:$t")
		}

		override fun onComplete() {
			println("onComplete")
		}
	})

// The output is:
onNext:null 
onNext poll:1
onNext poll:2
onNext poll:3
onNext poll:4
onNext poll:5
onComplete

// After changing Flowable.rangeLong(1, 5)
// to Flowable.intervalRange(1, 5, 0, 100, TimeUnit.MILLISECONDS),
// the output is:
onNext:null 
onNext:null 
onNext poll:1
onNext:null 
onNext poll:2
onNext:null 
onNext poll:3
onNext:null 
onNext poll:4
onNext:null 
onNext poll:5
onComplete

FlowableRangeLong supports synchronous fusion while FlowableIntervalRange does not.

Fused threads

Because the downstream decides where poll() runs, fusion may move the upstream’s computation onto a different thread. When the requestFusion argument contains BOUNDARY, it tells the upstream that poll() will be called across a thread boundary. If an upstream operator does not want its computation to move, it does not pass the fusion request on to its own upstream, and fusion is rejected.
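The BOUNDARY rule reduces to one extra bit check. The sketch below uses the constant values from QueueFuseable; sourceRequestFusion and mapRequestFusion are hypothetical stand-ins for a source and for an intermediate operator that runs a user callback.

```java
final class BoundarySketch {
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int BOUNDARY = 4;

    /** Source at the top of the chain: accepts SYNC fusion only. */
    static int sourceRequestFusion(int mode) {
        return mode & SYNC;
    }

    /**
     * Intermediate operator with a user callback (like map): it refuses fusion when
     * the downstream announces a thread boundary, otherwise it forwards the request.
     */
    static int mapRequestFusion(int mode) {
        if ((mode & BOUNDARY) != 0) {
            return NONE;
        }
        return sourceRequestFusion(mode);
    }

    public static void main(String[] args) {
        // 1: fused, the callback runs wherever poll() is called
        System.out.println(mapRequestFusion(SYNC));
        // 0: not fused, the callback stays on the emitting thread
        System.out.println(mapRequestFusion(SYNC | BOUNDARY));
    }
}
```

A downstream that is about to poll from another thread is expected to set BOUNDARY, which lets operators with thread-sensitive callbacks opt out.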

static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
    @Override
    public int requestFusion(int mode) {
        return transitiveBoundaryFusion(mode);
    }

    protected final int transitiveBoundaryFusion(int mode) {
        QueueSubscription<T> qs = this.qs;
        if (qs != null) {
            if ((mode & BOUNDARY) == 0) {
                int m = qs.requestFusion(mode);
                if (m != NONE) {
                    sourceMode = m;
                }
                return m;
            }
        }
        return NONE;
    }
}

var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}"); it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				var mode = s.requestFusion(QueueFuseable.SYNC)
				println("onSubscribe fusion mode result: $mode")
				qs = s
			}
			s.request(Long.MAX_VALUE)
		}

		override fun onNext(t: Long?) {
			executor.submit { qs?.poll() }
		}

		override fun onError(t: Throwable?) {}

		override fun onComplete() {}
	})

// Output:
// The map computation runs on the new thread instead of the original thread
onSubscribe fusion mode result: 1
int map: value 1,Thread[pool-1-thread-1,5,main]
int map: value 2,Thread[pool-1-thread-1,5,main]
int map: value 3,Thread[pool-1-thread-1,5,main]
int map: value 4,Thread[pool-1-thread-1,5,main]
int map: value 5,Thread[pool-1-thread-1,5,main]

Now remove the executor.submit { qs?.poll() } call and add QueueFuseable.BOUNDARY to the requestFusion argument:

var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}"); it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				var mode = s.requestFusion(QueueFuseable.SYNC or QueueFuseable.BOUNDARY)
				println("onSubscribe fusion mode result: $mode")
				qs = s
			}
			s.request(Long.MAX_VALUE)
		}

		override fun onNext(t: Long?) {}

		override fun onError(t: Throwable?) {}

		override fun onComplete() {}
	})

// Output:
// Fusion is rejected (mode 0) and the map computation stays on the original thread
onSubscribe fusion mode result: 0
int map: value 1,Thread[main,5,main]
int map: value 2,Thread[main,5,main]
int map: value 3,Thread[main,5,main]
int map: value 4,Thread[main,5,main]
int map: value 5,Thread[main,5,main]

Conclusion

Operator fusion is cool, but it is not applied to all operators. Some operator optimizations are harder than they look. You can write long Rx chains, but don’t assume that RxJava will do everything efficiently. Optimize what you can.

References

  • Operator Fusion in RxJava 2
  • Operator fusion (part 1), written by the RxJava 2 author
  • Operator fusion (part 2, final), written by the RxJava 2 author