Operators

Every method on Observable other than subscribe() is an operator. Each operator returns a new Observable, which gives rise to the notions of upstream and downstream. Operators that transform the data sent from upstream (Map, for example) usually extend AbstractObservableWithUpstream to hold the upstream Observable, and wrap the downstream Observer in an Observer of their own that processes the data before passing it downstream.

Map

Map applies a simple transformation (mapping) to each upstream data item and returns a new ObservableMap. The implementation wraps the downstream Observer in a MapObserver and subscribes that MapObserver to the upstream.

Observable
    .range(1, 5)
    .map(v -> v * 10)
    .subscribe(System.out::println);
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    ...
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    ...
}
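
The wrapping described above can be reproduced in a few lines of plain Java. This is only a sketch: Source, MapSource, and the nested Observer and MapObserver types are hypothetical stand-ins for Observable, ObservableMap, Observer, and MapObserver, and everything RxJava adds on top (disposal, error handling, fusion) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MapSketch {
    // Hypothetical miniature of Observer: only onNext() is modeled.
    interface Observer<T> {
        void onNext(T value);
    }

    // Hypothetical miniature of Observable.
    abstract static class Source<T> {
        abstract void subscribeActual(Observer<? super T> observer);

        // Emits start, start + 1, ... for a total of count items.
        static Source<Integer> range(int start, int count) {
            return new Source<Integer>() {
                @Override
                void subscribeActual(Observer<? super Integer> observer) {
                    for (int i = 0; i < count; i++) {
                        observer.onNext(start + i);
                    }
                }
            };
        }

        // The operator returns a new Source that remembers its upstream (this).
        <U> Source<U> map(Function<? super T, ? extends U> function) {
            return new MapSource<T, U>(this, function);
        }
    }

    // Stand-in for ObservableMap: holds the upstream and the mapping function.
    static final class MapSource<T, U> extends Source<U> {
        final Source<T> source;
        final Function<? super T, ? extends U> function;

        MapSource(Source<T> source, Function<? super T, ? extends U> function) {
            this.source = source;
            this.function = function;
        }

        @Override
        void subscribeActual(Observer<? super U> downstream) {
            // Wrap the downstream observer and subscribe the wrapper to the upstream.
            source.subscribeActual(new MapObserver<T, U>(downstream, function));
        }
    }

    // Stand-in for MapObserver: transforms each item, then passes it downstream.
    static final class MapObserver<T, U> implements Observer<T> {
        final Observer<? super U> downstream;
        final Function<? super T, ? extends U> function;

        MapObserver(Observer<? super U> downstream, Function<? super T, ? extends U> function) {
            this.downstream = downstream;
            this.function = function;
        }

        @Override
        public void onNext(T value) {
            downstream.onNext(function.apply(value));
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Source.range(1, 5).map(v -> v * 10).subscribeActual(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [10, 20, 30, 40, 50]
    }
}
```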

FlatMap

As the name implies, flatMap first maps each upstream data item to an Observable and then flattens those Observables. Flattening here means merging, which does not guarantee order.

Observable
    .range(1, 5)
    .flatMap(integer -> Observable.range(1, 5))
    .subscribe(System.out::println);
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    ...
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    ...
    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
        @Override
        public void onNext(T t) {
            ...
            ObservableSource<? extends U> p;
            try {
                p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                ...
            }
            ...
            subscribeInner(p);
        }

        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Supplier) {
                    ...
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }
    }
}

Put simply, each upstream data item is turned into an Observable, and every one of those Observables is subscribed so that its items reach the downstream. The real code is more complicated because it special-cases Observables that emit only one value and also has to handle concurrency.

If you need the merge to preserve order, use the concatMap operator instead.
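
The core of flatMap, stripped of the scalar shortcut and the concurrency handling, can be sketched in plain Java. The Source and Observer interfaces and the flatMap() helper below are hypothetical miniatures, not RxJava types. Because everything here runs synchronously on one thread, the output order is deterministic; with asynchronous inner sources it would not be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class FlatMapSketch {
    // Hypothetical miniatures of Observer and Observable.
    interface Observer<T> {
        void onNext(T value);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Emits start, start + 1, ... for a total of count items.
    static Source<Integer> range(int start, int count) {
        return observer -> {
            for (int i = 0; i < count; i++) {
                observer.onNext(start + i);
            }
        };
    }

    // Map each upstream item to an inner source, then subscribe the
    // shared downstream observer to every inner source.
    static <T, U> Source<U> flatMap(Source<T> upstream, Function<T, Source<U>> mapper) {
        return downstream -> upstream.subscribe(item -> {
            Source<U> inner = Objects.requireNonNull(
                    mapper.apply(item), "The mapper returned a null source");
            inner.subscribe(downstream);
        });
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        flatMap(range(1, 3), i -> range(i * 10, 2)).subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [10, 11, 20, 21, 30, 31]
    }
}
```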


SubscribeOn and ObserveOn

RxJava does not use threads or thread pools directly. Instead, it uses Scheduler together with the subscribeOn and observeOn operators to perform asynchronous work. Commonly used schedulers are Schedulers.computation() for computation-intensive tasks, Schedulers.io() for I/O-intensive tasks, and Schedulers.single() for serial tasks. If no scheduler is specified, an operator's logic runs on the thread on which its observer is notified, as in:

Observable
    .<String>create(emitter -> {
        for (int i = 0; i < 3; i++) {
            String numStr = String.valueOf(i);
            System.out.println(String.format(Locale.US, "emitting \"%s\" in Thread: [%s]", numStr, Thread.currentThread().getName()));
            emitter.onNext(numStr);
        }
        emitter.onComplete();
    })
    .map(s -> {
        Integer number = Integer.decode(s);
        System.out.println(String.format(Locale.US, "map (\"%s\" -> %d) in Thread: [%s]", s, number, Thread.currentThread().getName()));
        return number;
    })
    .filter(v -> {
        System.out.println(String.format(Locale.US, "filter %d in Thread: [%s]", v, Thread.currentThread().getName()));
        return v % 2 == 0;
    })
    .subscribe(r -> {
        System.out.println(String.format(Locale.US, "observe %d in Thread: [%s]", r, Thread.currentThread().getName()));
    });
emitting "0"   in Thread: [main]
map ("0" -> 0) in Thread: [main]
filter 0       in Thread: [main]
observe 0      in Thread: [main]
emitting "1"   in Thread: [main]
map ("1" -> 1) in Thread: [main]
filter 1       in Thread: [main]
emitting "2"   in Thread: [main]
map ("2" -> 2) in Thread: [main]
filter 2       in Thread: [main]
observe 2      in Thread: [main]

Calling subscribe() invokes subscribeActual() of the last operator, ObservableFilter. That method subscribes to the upstream ObservableMap, whose subscribeActual() in turn subscribes to the upstream ObservableCreate, which runs the logic passed to create(). There, emitter.onNext() calls observer.onNext(t). The observer we defined was wrapped in a FilterObserver in ObservableFilter#subscribeActual(), and that FilterObserver was wrapped in a MapObserver in ObservableMap. So the observer that ObservableCreate works with is the MapObserver. MapObserver#onNext() maps the value and passes it to onNext() of the downstream FilterObserver, which filters it and passes it to onNext() of the observer we defined.

Observable [Create, Map, Filter, Subscribe] Observer

  • Our Observer subscribes to ObservableFilter: runs on the main thread
  • FilterObserver subscribes to ObservableMap: runs on the main thread
  • MapObserver subscribes to ObservableCreate: runs on the main thread
  • ObservableCreate#subscribeActual() calls emitter.onNext(): runs on the main thread
  • MapObserver#onNext(): runs on the main thread
  • FilterObserver#onNext(): runs on the main thread
  • The onNext() we defined: runs on the main thread
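
The two directions of this walk-through (subscription travels upstream, data travels downstream) can be traced with a small plain-Java model. The Source and Observer types and the log strings are hypothetical, chosen for illustration; only the order of the calls is meant to mirror what the article describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class ChainTraceSketch {
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical miniature of Observer.
    interface Observer<T> {
        void onNext(T value);
    }

    // Hypothetical miniature of Observable with three operators.
    abstract static class Source<T> {
        abstract void subscribeActual(Observer<? super T> observer);

        static Source<String> create(String... items) {
            return new Source<String>() {
                @Override
                void subscribeActual(Observer<? super String> observer) {
                    LOG.add("subscribe create");
                    for (String item : items) {
                        LOG.add("emit " + item);
                        observer.onNext(item);
                    }
                }
            };
        }

        <U> Source<U> map(Function<? super T, ? extends U> function) {
            Source<T> upstream = this;
            return new Source<U>() {
                @Override
                void subscribeActual(Observer<? super U> downstream) {
                    LOG.add("subscribe map");
                    // The lambda plays the role of MapObserver.
                    upstream.subscribeActual(v -> {
                        LOG.add("map " + v);
                        downstream.onNext(function.apply(v));
                    });
                }
            };
        }

        Source<T> filter(Predicate<? super T> predicate) {
            Source<T> upstream = this;
            return new Source<T>() {
                @Override
                void subscribeActual(Observer<? super T> downstream) {
                    LOG.add("subscribe filter");
                    // The lambda plays the role of FilterObserver.
                    upstream.subscribeActual(v -> {
                        LOG.add("filter " + v);
                        if (predicate.test(v)) {
                            downstream.onNext(v);
                        }
                    });
                }
            };
        }
    }

    static List<String> run() {
        LOG.clear();
        Source.create("0", "1")
                .map(Integer::decode)
                .filter(v -> v % 2 == 0)
                .subscribeActual(v -> LOG.add("observe " + v));
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        // Subscriptions are logged last operator first; items flow first operator first.
        run().forEach(System.out::println);
    }
}
```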

observeOn() takes a scheduler and returns a new Observable (ObservableObserveOn) that makes the downstream onNext() run on the given scheduler (thread), e.g.

Observable
    .create()
    .observeOn(Schedulers.computation())
    .map()
    .filter()
    .subscribe();
emitting "0"   in Thread: [main]
emitting "1"   in Thread: [main]
emitting "2"   in Thread: [main]
map ("0" -> 0) in Thread: [RxComputationThreadPool-1]
filter 0       in Thread: [RxComputationThreadPool-1]
observe 0      in Thread: [RxComputationThreadPool-1]
map ("1" -> 1) in Thread: [RxComputationThreadPool-1]
filter 1       in Thread: [RxComputationThreadPool-1]
map ("2" -> 2) in Thread: [RxComputationThreadPool-1]
filter 2       in Thread: [RxComputationThreadPool-1]
observe 2      in Thread: [RxComputationThreadPool-1]

That is, observeOn(Schedulers.computation()) makes the downstream MapObserver#onNext() run on a computation thread. observeOn() can also be called several times to change the thread on which the downstream onNext() runs, as in

Observable
    .create()
    .observeOn(Schedulers.computation())
    .map()
    .observeOn(AndroidSchedulers.mainThread())
    .filter()
    .subscribe();
emitting "0"   in Thread: [main]
emitting "1"   in Thread: [main]
emitting "2"   in Thread: [main]
map ("0" -> 0) in Thread: [RxComputationThreadPool-1]
map ("1" -> 1) in Thread: [RxComputationThreadPool-1]
map ("2" -> 2) in Thread: [RxComputationThreadPool-1]
filter 0       in Thread: [main]
observe 0      in Thread: [main]
filter 1       in Thread: [main]
filter 2       in Thread: [main]
observe 2      in Thread: [main]

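Here onNext() of the first ObservableObserveOn's observer (ObserveOnObserver) is called on the main thread, MapObserver#onNext() runs on the computation thread, and after the second observeOn() FilterObserver#onNext() runs on the main thread again.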

Simply put, observeOn() determines the thread on which the downstream runs.
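
The idea behind observeOn() can be modeled in plain Java by wrapping the downstream observer so that each onNext() is handed to an executor. This is a sketch under assumptions: the Source and Observer interfaces, the observeOn() helper, and the thread name "worker" are hypothetical, and a plain ExecutorService stands in for an RxJava Scheduler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ObserveOnSketch {
    // Hypothetical miniatures of Observer and Observable.
    interface Observer<T> {
        void onNext(T value);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Subscription still happens on the caller's thread; only the
    // delivery of each item to the downstream moves to the executor.
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> upstream.subscribe(
                value -> executor.execute(() -> downstream.onNext(value)));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        Source<Integer> source = observer -> {
            for (int i = 0; i < 3; i++) {
                log.add("emit " + i + " on " + Thread.currentThread().getName());
                observer.onNext(i);
            }
        };
        observeOn(source, worker).subscribe(
                v -> log.add("observe " + v + " on " + Thread.currentThread().getName()));
        worker.shutdown(); // already submitted deliveries still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // "emit" lines carry the caller's thread name, "observe" lines always "worker".
        run().forEach(System.out::println);
    }
}
```

The "emit" and "observe" lines may interleave differently from run to run, but the "observe" lines keep their relative order because the executor has a single thread.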

But how do you control the thread on which the most upstream Observable, the ObservableCreate that emits the data, runs? Data is emitted on the thread where the subscription happens, so changing the subscription thread changes the emitting thread. subscribeOn() takes a scheduler and returns a new Observable (ObservableSubscribeOn) that makes the subscription between its upstream and downstream happen on the given scheduler (thread), as in

Observable
    .create()
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .map()
    .observeOn(AndroidSchedulers.mainThread())
    .filter()
    .subscribe();
emitting "0"   in Thread: [RxCachedThreadScheduler-1]
emitting "1"   in Thread: [RxCachedThreadScheduler-1]
emitting "2"   in Thread: [RxCachedThreadScheduler-1]
map ("0" -> 0) in Thread: [RxComputationThreadPool-1]
map ("1" -> 1) in Thread: [RxComputationThreadPool-1]
map ("2" -> 2) in Thread: [RxComputationThreadPool-1]
filter 0       in Thread: [main]
observe 0      in Thread: [main]
filter 1       in Thread: [main]
filter 2       in Thread: [main]
observe 2      in Thread: [main]

As you can see, subscribeOn(Schedulers.io()) makes the subscription between the upstream ObservableCreate and its downstream happen on an I/O thread, which is therefore also the thread on which the data is emitted.

Put simply, subscribeOn() determines the thread on which data is first emitted.
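
A plain-Java model makes the mechanism visible: subscribeOn() only moves the act of subscribing to the upstream onto another thread, and the emission follows because it happens inside that subscription. The Source and Observer interfaces, the subscribeOn() helper, and the thread name "io" are hypothetical, with an ExecutorService standing in for a Scheduler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SubscribeOnSketch {
    // Hypothetical miniatures of Observer and Observable.
    interface Observer<T> {
        void onNext(T value);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Run the upstream subscription on the executor instead of the caller's thread.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        CountDownLatch done = new CountDownLatch(1);
        Source<String> source = observer -> {
            log.add("emit on " + Thread.currentThread().getName());
            observer.onNext("1");
        };
        subscribeOn(source, io).subscribe(v -> {
            log.add("observe on " + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [emit on io, observe on io]
    }
}
```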

Don’t call subscribeOn() more than once: it serves no purpose and obscures the logic. If it is called several times, the later subscribeOn() calls affect neither the thread on which data is emitted nor the thread on which the downstream operators run. They only change the thread on which the intermediate subscriptions happen, which makes no difference to the user.

// Wrong use case
Observable
    .create()
    .subscribeOn(Schedulers.io())
    .map()
    .subscribeOn(Schedulers.computation())
    .filter()
    .subscribe();
emitting "0"   in Thread: [RxCachedThreadScheduler-1]
map ("0" -> 0) in Thread: [RxCachedThreadScheduler-1]
filter 0       in Thread: [RxCachedThreadScheduler-1]
observe 0      in Thread: [RxCachedThreadScheduler-1]
emitting "1"   in Thread: [RxCachedThreadScheduler-1]
map ("1" -> 1) in Thread: [RxCachedThreadScheduler-1]
filter 1       in Thread: [RxCachedThreadScheduler-1]
emitting "2"   in Thread: [RxCachedThreadScheduler-1]
map ("2" -> 2) in Thread: [RxCachedThreadScheduler-1]
filter 2       in Thread: [RxCachedThreadScheduler-1]
observe 2      in Thread: [RxCachedThreadScheduler-1]

Note that only the first subscribeOn(), the one closest to the source, determines the thread on which the data is emitted.

Observable [Create, SubscribeOn, Map, SubscribeOn, Filter, Subscribe] Observer

  • Our Observer subscribes to ObservableFilter: runs on the main thread
  • FilterObserver subscribes to the second ObservableSubscribeOn: runs on the main thread
  • The upstream subscription scheduled by the second ObservableSubscribeOn#subscribeActual(): runs on the computation thread
  • MapObserver subscribes to the first ObservableSubscribeOn: runs on the computation thread
  • SubscribeOnObserver subscribes to ObservableCreate: runs on the I/O thread
  • ObservableCreate#subscribeActual() calls emitter.onNext(): runs on the I/O thread
  • MapObserver#onNext(): runs on the I/O thread
  • SubscribeOnObserver#onNext(): runs on the I/O thread
  • FilterObserver#onNext(): runs on the I/O thread
  • The onNext() we defined: runs on the I/O thread
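
The rule that only the subscribeOn() closest to the source decides the emitting thread can be reproduced with a plain-Java model. As a sketch under assumptions, the Source and Observer interfaces, the subscribeOn() helper, its label parameter, and the thread names "io" and "computation" are all hypothetical, and executors stand in for Schedulers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NestedSubscribeOnSketch {
    // Hypothetical miniatures of Observer and Observable.
    interface Observer<T> {
        void onNext(T value);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Subscribes to the upstream on the executor and logs where that happened.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor,
                                     String label, List<String> log) {
        return downstream -> executor.execute(() -> {
            log.add(label + " subscribes upstream on " + Thread.currentThread().getName());
            upstream.subscribe(downstream);
        });
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation"));
        CountDownLatch done = new CountDownLatch(1);
        Source<String> source = observer -> {
            log.add("emit on " + Thread.currentThread().getName());
            observer.onNext("1");
        };
        // "first" is the call closest to the source, "second" the one further downstream.
        Source<String> first = subscribeOn(source, io, "first", log);
        Source<String> second = subscribeOn(first, computation, "second", log);
        second.subscribe(v -> {
            log.add("observe on " + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // The second call only hosts an intermediate subscription;
        // emission and observation both end up on "io".
        run().forEach(System.out::println);
    }
}
```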


Merge

merge() combines multiple Observables into one. Internally it puts them into a collection and calls flatMap(), so merge() does not guarantee order.

To merge multiple Observables in order, use concat()

If you don’t want an error in one of the Observables to interrupt the stream, use mergeDelayError().

An Observable instance can merge another one with mergeWith().

Zip

zip() combines the i-th data items of all Observables, in order. Zipping ends as soon as the shortest Observable completes. The remaining, longer Observables are then disposed immediately and may never receive the complete callback (doOnComplete()).

An Observable instance can use zipWith() to zip other instances
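
The pairing rule can be illustrated on plain lists. This is only a model of which items get combined, not of RxJava's asynchronous implementation; the zip() helper below is hypothetical and works on List instead of Observable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {
    // Combine the i-th items of both inputs; stop as soon as either runs out.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> zipper) {
        List<R> result = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            result.add(zipper.apply(a.next(), b.next()));
        }
        return result;
    }

    static List<String> run() {
        // The third number has no partner, so it is never combined.
        return zip(List.of(1, 2, 3), List.of("a", "b"), (n, s) -> n + s);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1a, 2b]
    }
}
```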


StartWith

Emits one data item (startWithItem()) or several (startWithArray(), startWithIterable()) before the Observable's own items, or emits another Observable first (startWith()).

Concat

concat() joins multiple Observables in order: the next one starts emitting only after the previous one has completed.

An Observable instance can use concatWith() to concatenate other instances


Timer

timer() emits a single 0L after the given delay. It runs on the computation scheduler by default, and a different scheduler can be specified.

Interval

interval() emits increasing numbers starting from 0L at the given interval. It runs on the computation scheduler by default, and a different scheduler can be specified. You can also set the initial delay: interval(2, 1, TimeUnit.SECONDS) emits the first number after 2s and the rest every 1s. To choose the starting number and the total count, use intervalRange(5, 10, 2, 1, TimeUnit.SECONDS), which emits 10 numbers starting at 5L, the first after 2s and the rest every 1s.

Delay

delay() returns a new Observable that delays each data item by the given time. Errors are not delayed.

delaySubscription() delays subscribing to the current Observable.


SkipUntil and SkipWhile

skipUntil() discards the data emitted by the source Observable until the given Observable emits data.

skipWhile() discards the data emitted by the source Observable until the given condition becomes false.

TakeUntil and TakeWhile

takeUntil() mirrors the data emitted by the source Observable and completes immediately once the given Observable emits data.

takeWhile() mirrors the data emitted by the source Observable and completes immediately once the given condition becomes false.


Catch

The Catch operators intercept onError notifications and replace them with a data item or a sequence of data items.

  • onErrorReturn(): when the Observable encounters an error, the Observer's onError() is not called. Instead, a given data item is emitted and the stream completes
  • onErrorResumeNext(): when the Observable encounters an error, the Observer's onError() is not called. Instead, control is handed to the given Observable
  • onErrorComplete(): when the Observable encounters an error, the Observer's onError() is not called. The error notification is discarded and the stream completes directly

Retry

The Retry operators resubscribe to the source (that is, retry) on onError so that it emits its data again.

  • retry(): retries on error, optionally with a maximum number of retries
  • retryUntil(): stops retrying once the given function returns true
  • retryWhen(): when the source Observable encounters an error, the error is passed to the given function. If the Observable returned by that function emits onComplete or onError, that signal (not the original error signal) is passed downstream. Otherwise the source Observable is resubscribed

Observable
    .<String>create(emitter -> {
        System.out.println(String.format(Locale.US, "emitting \"1\" in Thread: [%s]", Thread.currentThread().getName()));
        emitter.onNext("1");
        emitter.onComplete();
    })
    .flatMap(s -> Observable.error(new RuntimeException("always fails")))
    .retryWhen(attempts -> {
        AtomicInteger counter = new AtomicInteger();
        return attempts.takeWhile(e -> counter.getAndIncrement() != 3)
                .flatMap(e -> {
                    System.out.println(String.format(Locale.US, "retry delay %ds in Thread: [%s]", counter.get(), Thread.currentThread().getName()));
                    return Observable.timer(counter.get(), TimeUnit.SECONDS);
                });
    })
    .subscribe(s -> {
        System.out.println(String.format(Locale.US, "observe \"%s\" in Thread: [%s]", s, Thread.currentThread().getName()));
    }, throwable -> {
        System.out.println(String.format(Locale.US, "observe error in Thread: [%s]", Thread.currentThread().getName()));
    }, () -> {
        System.out.println(String.format(Locale.US, "observe complete in Thread: [%s]", Thread.currentThread().getName()));
    });
18:33 emitting "1"     in Thread: [main]
18:33 retry delay 1s   in Thread: [main]
18:34 emitting "1"     in Thread: [RxComputationThreadPool-4]
18:34 retry delay 2s   in Thread: [RxComputationThreadPool-4]
18:36 emitting "1"     in Thread: [RxComputationThreadPool-1]
18:36 retry delay 3s   in Thread: [RxComputationThreadPool-1]
18:39 emitting "1"     in Thread: [RxComputationThreadPool-2]
18:39 observe complete in Thread: [RxComputationThreadPool-2]

As you can see, after three retries the observer finally receives onComplete, because takeWhile() emits onComplete and that signal is passed straight down to the observer we defined. If you want to receive onError in the end, don't use takeWhile(). Instead, return an Observable such as Observable.error(throwable) once the retry condition is no longer met.

More operators

To write a custom operator, or to combine several operators into one, you can implement an ObservableOperator and apply it with lift(), or use compose() to apply a given ObservableTransformer to the upstream Observable.

More observables

There are a few special Observables:

  • Flowable: supports backpressure
  • Single: emits 1 success value or an error
  • Completable: emits complete or an error
  • Maybe: emits 1 success value, or complete, or an error

Of course, all of these provide methods for converting to one another.

If you want an object that is both an Observable and an Observer, want to turn an asynchronous operation into an Observable, or want to create a hot Observable, you need the more flexible Subject.

Subject extends Observable and implements Observer. It has several implementation classes:

  • AsyncSubject: emits the last cached value and then completes, but only when onComplete() is called. On onError(), the most recent value is not emitted
  • BehaviorSubject: as soon as an Observer subscribes, it starts emitting from the most recent value. On onError(), the most recent value is not emitted
  • PublishSubject: can emit data immediately after creation and does not cache any data
  • ReplaySubject: no matter when an Observer subscribes, it receives all the data emitted before
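
The caching difference between PublishSubject and ReplaySubject can be modeled in plain Java. MiniPublishSubject and MiniReplaySubject are hypothetical single-threaded miniatures written for this illustration. They model only onNext() and leave out completion, errors, disposal, and thread safety.

```java
import java.util.ArrayList;
import java.util.List;

class SubjectSketch {
    // Hypothetical miniature of Observer.
    interface Observer<T> {
        void onNext(T value);
    }

    // Both a source (subscribe) and an observer (onNext); caches nothing.
    static class MiniPublishSubject<T> implements Observer<T> {
        private final List<Observer<T>> observers = new ArrayList<>();

        void subscribe(Observer<T> observer) {
            observers.add(observer);
        }

        @Override
        public void onNext(T value) {
            for (Observer<T> observer : observers) {
                observer.onNext(value);
            }
        }
    }

    // Remembers every item and replays the history to late subscribers.
    static class MiniReplaySubject<T> extends MiniPublishSubject<T> {
        private final List<T> history = new ArrayList<>();

        @Override
        void subscribe(Observer<T> observer) {
            for (T value : history) {
                observer.onNext(value);
            }
            super.subscribe(observer);
        }

        @Override
        public void onNext(T value) {
            history.add(value);
            super.onNext(value);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniPublishSubject<String> publish = new MiniPublishSubject<>();
        MiniReplaySubject<String> replay = new MiniReplaySubject<>();
        publish.onNext("a"); // nobody is subscribed yet, so "a" is lost
        replay.onNext("a");  // cached for later subscribers
        publish.subscribe(v -> log.add("publish " + v));
        replay.subscribe(v -> log.add("replay " + v));
        publish.onNext("b");
        replay.onNext("b");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [replay a, publish b, replay b]
    }
}
```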