Table of contents

Common operators

Creation operators

create()

private void rxCreate() { Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); }}); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}; observable.subscribe(observer); }Copy the code

just()

/** * @author: Yangtianfu * @date: {2019/4/10 15:38} * @description Creates an observer and sends an event. The number of events sent cannot exceed 10. */ private void initJust() { Observable.just(1, 2, 3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

fromArray()

/** * @author: yangtianfu * @date: {2019/4/10 15:45} * @description This method is similar to just(), except fromArray can pass in more than 10 variables, and it can pass in an array. */ private void rxFromArray() { Integer array[] = {1, 2, 3, 4}; Observable.fromArray(array) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

fromCallable()

/** * @author: Yangtianfu * @date: {2019/4/10 15:49} * public static <T> Observable<T> fromCallable <? Extends T> supplier) * @description Java.util.concurrent extends T> supplier) * @description * Except that it returns a result value, which is sent to the observer. */ private void rxFromCallable() { Observable.fromCallable(new Callable<Integer>() { @Override public Integer call() { return 1; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); }Copy the code

fromFuture()

/** * @author: Yangtianfu * @date: {2019/4/10 15:54} * public static <T> Observable<T> fromFuture(Future<? Extends T> Future) * @description is a future from java.util.concurrent. The future extends T> Future) * @description is a future from java.util.concurrent. * It can get the value returned by Callable via get() */ private void rxfromFuture() {final FutureTask<String> FutureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Log.i(TAG, "futureTask is running "); Return "futureTask returns result "; }}); Observable.fromfuture (futureTask).doonSubscribe (new Consumer<Disposable>() {// doOnSubscribe() sends events only when it subscribes @Override public void accept(Disposable disposable) throws Exception { futureTask.run(); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i(TAG, "accept: " + s); }}); }Copy the code

fromIterable()

/** * @author: yangtianfu * @date: {2019/4/10 16:03} * @Description public static <T> Observable<T> fromIterable(Iterable<? Extends T> source) */ Private void rxfromIterable() {List<Integer> List = new ArrayList<>(); list.add(0); list.add(1); list.add(2); list.add(3); Observable.fromIterable(list) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

defer()

/** * @author: Yangtianfu * @date: {2019/4/10 16:06} * public static <T> Observable<T> defer(Callable<? extends ObservableSource<? Extends T>> supplier * @description extends T>> supplier */ private void rxdefer() {Observable<Integer> Observable =  Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); }}); i = 200; Observer observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observable.subscribe(observer); i = 300; observable.subscribe(observer); // Only 200 and 300 will be sent out. Defer () only creates a new observer when the observer subscribes, so it prints each time it subscribes and prints the most recent value of I. }Copy the code

timer()

/** * @author: yangtianfu * @date: {2019/4/10 16:14} * public static Observable<Long> timer(long delay, TimeUnit unit) * @description A value of 0L is sent to the observer after the specified time. */ private void rxtimer() { Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public  void onNext(Long aLong) { Log.i(TAG, "onNext: " + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

interval()

/** * @author: yangtianfu * @date: {2019/4/10 16:17} * public static Observable<Long> interval(long period, TimeUnit unit) * public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) * @description sends an event at regular intervals, starting at 0, */ private void rxinterval() {observable. interval(2, 2); TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Long aLong) { Log.i(TAG, "onNext: " + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

intervalRange()

/** * @author: yangtianfu * @date: {2019/4/10 16:23} * public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) * public static Observable<Long> intervalRange(long start, long count, long initialDelay, * @description Specifies the start value and number of events to send, as in interval(). */ private void rxintervalRange() { Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Long aLong) { Log.i(TAG, "onNext: " + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // Receive 5 onNext events, starting from 2, delay 2s, send every 1s}Copy the code

range()

/** * @author: yangtianfu * @date: {2019/4/10 16:26} * public static Observable<Integer> range(final int start, Final int count */ private void rxrange() {Observable. 5) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

rangeLong()

/** * @author: yangtianfu * @date: {2019/4/10 16:29} * public Static Observable<Long> rangeLong(Long start, Long count) Long */ private void rxrangeLong() {Observable. RangeLong (2, 5) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Long aLong) { Log.i(TAG, "onNext: " + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

empty() & never() & error()

/** * @author: yangtianfu * @date: {2019/4/10 16:31} * empty() &never () &error () * @description onSubscribe is the message that must be sent * empty() : Send onComplete() event directly * never() : send no event * error() : Send onError() */ private void rxException() {observable.empty ().subscribe(new Observer<Object>() {@override public void onNext(Object o) { Log.i(TAG, "onNext: "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

Transformation operators

map()

/** * @author: Yangtianfu * @date: {2019/4/10 16:35} * public final <R> Observable<R> map(Function<? super T, ? Extends R> mapper) * @description Map Extends R> mapper) * @description Map extends R> mapper) * @description Map Extends R> mapper) * @description Map Extends R> mapper) */ private void rxMap () {// Convert data of type Integer to String Observable.just(1, 2, 3) .map(new Function<Integer, String>() {@override public String apply(Integer Integer) throws Exception {return "String:" + Integer; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

flatMap()

/** * @author: Yangtianfu * @date: {2019/4/10 16:39} * public Final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? Extends R>> mapper) * @description extends R>> mapper) * @description Return a new Observable (flatMap() returns an Observerable) */ private void rxflatMap() {Observable. FromIterable (personList)// Send person .flatMap(new Function<Person, ObservableSource<Plan>>() { @Override public ObservableSource<Plan> apply(Person person) throws Exception { return Observable.fromIterable(person.getPlanList()); }}). FlatMap (new Function< plan, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Plan plan) throws Exception { return Observable.fromIterable(plan.getActionList()); // Convert plan to action, Subscribe(new Observer<String>() {@override public void onSubscribe(Disposable d) {} @override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

concatMap()

/** * @author: Yangtianfu * @date: {2019/4/10 17:04} * public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) * public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? Extends R>> mapper, int prefetch) * @description concatMap() and flatMap() are basically the same, except that concatMap() forwards events in order. FlatMap () is unordered. */ private void rxconcatMap() { Observable.fromIterable(personList) .concatMap(new Function<Person, ObservableSource<Plan>>() { @Override public ObservableSource<Plan> apply(Person person) throws Exception { if ("chan".equals(person.getName())) { return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.SECONDS); } return Observable.fromIterable(person.getPlanList()); } }).subscribe(new Observer<Plan>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Plan plan) { Log.i(TAG, "onNext: " + plan.getContent()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

buffer()

/** * @author: yangtianfu * @date: {2019/4/10 17:13} * public Final Observable<List<T>> Buffer (int count, int skip) * @description gets a number of events from events to send. These events are emitted together in a buffer. * Buffer takes two arguments, count and skip. Count the number of buffer elements, skip means that when the buffer is full, Private void rxbuffer() {Observable. Just (1, 2, 3, 4, 5). 1) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (List<Integer> integers) {log. I (TAG, "buffer size:" + integers.size()); for (Integer i : integers) { Log.i(TAG, "onNext: " + i); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); //I/MainActivity: buffer size: 2 //I/MainActivity: onNext: 1 //I/MainActivity: onNext: 2 //I/MainActivity: buffer size: 2 //I/MainActivity: onNext: 2 //I/MainActivity: onNext: 3 //I/MainActivity: buffer size: 2 //I/MainActivity: onNext: 3 //I/MainActivity: onNext: 4 //I/MainActivity: buffer size: 2 //I/MainActivity: onNext: 4 //I/MainActivity: onNext: 5 //I/MainActivity: buffer size: 1 //I/MainActivity: onNext: 5Copy the code

groupBy()

/** * @author: yangtianfu * @date: {2019/4/10 17:23} * public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? Extends K> keySelector) * @description groups the sent data, and each group returns an observed. */ private void rxgroupBy() { Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10) .groupBy(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return integer % 3; } }).subscribe(new Observer<GroupedObservable<Integer, Integer>>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) { Log.i(TAG, "onNext: "); integerIntegerGroupedObservable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "GroupedObservable onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + "value:" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "GroupedObservable onComplete: "); }}); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: GroupedObservable onNext groupName: 1value:1 // I/MainActivity: GroupedObservable onNext groupName: 0value:6 // I/MainActivity: GroupedObservable onNext groupName: 2value:8 // I/MainActivity: GroupedObservable onNext groupName: 0value:9 // I/MainActivity: GroupedObservable onNext groupName: 1value:7 // I/MainActivity: GroupedObservable onNext groupName: 1value:10 }Copy the code

scan()

/** * @author: yangtianfu * @date: {2019/4/10 17:32} * Public Final Observable<T> Scan (BiFunction<T, T, T> Accumulator) * @description aggregates data with some logic. */ private void rxscan() {Observable. Just (1, 2, 3, 4, 5). Scan (new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.i(TAG, "====================apply "); Log.i(TAG, "====================integer " + integer); Log.i(TAG, "====================integer2 " + integer2); return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: accept: 1 // I/MainActivity: ====================apply // I/MainActivity: ====================integer 1 // I/MainActivity: ====================integer2 2 // I/MainActivity: accept: 3 // I/MainActivity: ====================apply // I/MainActivity: ====================integer 3 // I/MainActivity: ====================integer2 3 // I/MainActivity: accept: 6 // I/MainActivity: ====================apply // I/MainActivity: ====================integer 6 // I/MainActivity: ====================integer2 4 // I/MainActivity: accept: 10 // I/MainActivity: ====================apply // I/MainActivity: ====================integer 10 // I/MainActivity: ====================integer2 5 // I/MainActivity: accept: 15 }Copy the code

window()

/** * @author: yangtianfu * @date: {2019/4/10 17:45} * public Final Observable<Observable<T> window(long count) * @description sends a specified number of events into a group. The count argument in the window represents the specified amount. * For example, if count is set to 2, each two pieces of data will be grouped into a group. */ private void rxwindow() { Observable.just(1, 2, 3, 4, 5) .window(2) .subscribe(new Observer<Observable<Integer>>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Observable<Integer> integerObservable) { integerObservable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 4 // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 5 }Copy the code

Combining operators

concat()

/** * @author: Yangtianfu * @date: {2019/4/10 18:00} * public static <T> ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? Extends T> source4) * @description Allows you to group multiple observers together and then send events in the order they were sent before. It's important to note that, */ private void rxconcat() {observable. concat(1, 2), observable. just(3, 4), Observable.just(5, 6), Observable.just(7, 8)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

concatArray()

/** * @author: Yangtianfu * @date: {2019/4/11 9:17} * public static <T> Observable<T> concatArray(ObservableSource<? extends T>... Sources * @description works the same as concat(), but concatArray() can send more than four observers. */ private void rxconcatArray() { Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8), Observable.just(9, 10)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

merge & mergeArray()

/** * @author: Yangtianfu * @date: {2019/4/11 9:21} * public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? Extends T> source4) * @description This extends T> source4) * @description This extends T> source4) * @description This extends T> source4) * @description This extends T> source4) * @description This extends T> source4) * @description This extends T> source4) * @description This extends T> source4) And MergeArray does the same thing as merge, Private void rxmerge() {observable. merge(observable. interval(1, 1) TimeUnit.SECONDS).map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { return "A" + aLong; } }), Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { return "B" + aLong; } }) ).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onNext: A3 // I/MainActivity: onNext: B3 // I/MainActivity: onNext: A4 // I/MainActivity: onNext: A3 // I/MainActivity: onNext: B3 // I/MainActivity: onNext: A4 // I/MainActivity: onNext: B4 // I/MainActivity: onNext: A5 // I/MainActivity: onNext: B5 // I/MainActivity: onNext: A6 // I/MainActivity: OnNext: B6 // I/MainActivity: onNext: A7 // I/MainActivity: onNext: B7 // I/MainActivity: onNext: A8Copy the code

concatArrayDelayError() & mergeArrayDelayError()

/** * @author: yangtianfu * @date: {2019/4/11 9:33} * public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) * public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... Sources) * @description In both concatArray() and mergeArray() methods, if either of the observer sends an Error event, then it will stop sending the event. * If you want the onError() event to be delayed until all observed events have been sent, You can use concatArrayDelayError() and mergeArrayDelayError() */ private void rxconcatArrayDelayError() { Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onError(new NullPointerException()); } }), Observable.just(2, 3, 4) ).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e.toString()); } @Override public void onComplete() { } }); // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 4 // E/MainActivity: onError: java.lang.NullPointerException }Copy the code

combineLatest()

/** * @author: yangtianfu * @date: {2019/4/11 11:53} * @description combineLatest() works like ZIP (), but the sequence of events that combineLatest() sends is related to the timeline that it sends, * When all observables in combineLatest() send events, as long as one of them sends an event, (zip sends both.) * This event is sent in combination with the latest event sent by another Observable */ private void combineLatestDelayError() is added to the onError() function rxcombineLatest() { Observable.combineLatest( Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS) .map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { String s1 = "A" + aLong; Log. I (TAG, "A sent event: "+ s1); return s1; } }), Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS) .map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { String s2 = "B" + aLong; Log. I (TAG, "event sent by B:" + s2); return s2; } }), new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) throws Exception { String res = s + s2; return res; } } ) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @override public void onNext(String s) {log. I (TAG, "last received event onNext:" + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

reduce()

/** * @author: yangtianfu * @date: {2019/4/11 13:27} * public final Maybe<T> reduce(BiFunction<T, T, T> reducer) * @description and scan() also aggregate sent data in a logical way. The difference between these two operators is that each time scan() processes data, the event is sent to the observer. Reduce () aggregates all the data before sending the event to the observer. */ private void rxreduce() { Observable.just(0, 1, 2, 3) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { int res = integer + integer2; Log.i(TAG, "apply: " + integer); Log.i(TAG, "apply2: " + integer2); Log.i(TAG, "res: " + res); return res; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: apply: 0 // I/MainActivity: apply2: 1 // I/MainActivity: res: 1 // I/MainActivity: apply: 1 // I/MainActivity: apply2: 2 // I/MainActivity: res: 3 // I/MainActivity: apply: 3 // I/MainActivity: apply2: 3 // I/MainActivity: res: 6 // I/MainActivity: accept: 6 }Copy the code

collect()

/** * @author: Yangtianfu * @date: {2019/4/11 13:33} * public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? Super T> collector) * @description collects data into the data structure. */ private void rxcollect() { Observable.just(1, 2, 3, 4) .collect(new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> integers, Integer integer) throws Exception { integers.add(integer); Subscribe (new Consumer<ArrayList<Integer>>() {@override public void accept(ArrayList<Integer>) integers) throws Exception { Log.i(TAG, "accept: " + integers); // I/MainActivity: accept: [1, 2, 3, 4] } }); }Copy the code

zip()

/** * @author: Yangtianfu * @date: {2019/4/11 10:06} * public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? Extends R> zipper) * @description combines multiple objects based on the order in which they send events. The number of events sent is the same as the minimum number of events in the source Observable. The following code executes 5 times instead of 6. Because when the fifth at the end of the first Observeable will send onComplate events * / private void rxzip () {observables. Zip (observables. IntervalRange (1, 5, 1, 1, TimeUnit.SECONDS) .map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { String s1 = "A" + aLong; Log. I (TAG, "apply: A send "); return s1; } }), Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS) .map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { String s2 = "B" + aLong; Log. I (TAG, "apply: B send "); return s2; } }), new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) throws Exception { String res = s + s2; return res; } } ) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

startWith() & startWithArray()

/** * @author: yangtianfu * @date: {2019/4/11 13:41} * public final Observable<T> startWith(T item) * public final Observable<T> startWithArray(T... Items) * @description appends events before sending them, startWith() appends one event, and startWithArray() can append multiple events. Additional events are emitted first. */ private void rxstartWithorArray() { Observable.just(5, 6, 7) .startWith(1) .startWithArray(2, 3, 4) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: accept: 2 // I/MainActivity: accept: 3 // I/MainActivity: accept: 4 // I/MainActivity: accept: 1 // I/MainActivity: accept: 5 // I/MainActivity: accept: 6 // I/MainActivity: accept: 7 }Copy the code

count()

/** * @author: yangtianfu * @date: {2019/4/11 13:46} * public final Single<Long> count() * @description returns only the number of events sent by the observer */ private void rxcount() { Observable.just(1, 2, 3) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "accept: " + aLong); }}); // I/MainActivity: accept: 3 }Copy the code

Utility operators

delay()

/** * @author: yangtianfu * @date: {2019/4/11 13:53} * public Final Observable<T> Delay (long delay, TimeUnit unit) * @description Delays sending an event. */ private void rxdelay() { Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { // onSubscribe Log. I (TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }Copy the code

doOnEach()

/** * @author: Yangtianfu * @date: {2019/4/11 13:57} * public Final Observable<T> doOnEach(final Consumer<? Super Notification<T>> onNotification) * @Description Observable calls back to this method before sending an event. Private void rxdoOnEach() {observable. create(new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }) .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.i(TAG, " doOnEach accept: " + integerNotification.getValue()); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // The doOnEach method is called before each event and the value sent by onNext() can be retrieved. // I/MainActivity: onSubscribe: // I/MainActivity: doOnEach accept: 1 // I/MainActivity: onNext: 1 // I/MainActivity: doOnEach accept: 2 // I/MainActivity: onNext: 2 // I/MainActivity: doOnEach accept: 3 // I/MainActivity: onNext: 3 // I/MainActivity: doOnEach accept: null // I/MainActivity: onComplete: }Copy the code

doOnNext()

/** * @author: Yangtianfu * @date: {2019/4/11 14:05} * public Final Observable<T> doOnNext Super T> onNext) * @Description Observable calls this method before sending onNext(), only on onNext events. Private void rxdoOnNext() {ObservableOnSubscribe (new ObservableOnSubscribe<Integer>() {@override public  void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "doOnNext accept: " + integer); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); Log.d(TAG, "onError: "); Log.v(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: doOnNext accept: 1 // I/MainActivity: onNext: 1 // I/MainActivity: doOnNext accept: 2 // I/MainActivity: onNext: 2 // I/MainActivity: doOnNext accept: 3 // I/MainActivity: onNext: 3 // I/MainActivity: onComplete: }Copy the code

doAfterNext()

/** * @author: Yangtianfu * @date: {2019/4/11 14:20} * public Final Observable<T> doAfterNext(Consumer<? Super T> onAfterNext) * @Description Observable calls back to this method after sending onNext(). */ private void rxdoAfterNext() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "doAfterNext accept: " + integer); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: ", e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: doAfterNext accept: 1 // I/MainActivity: onNext: 2 // I/MainActivity: doAfterNext accept: 2 // I/MainActivity: onNext: 3 // I/MainActivity: doAfterNext accept: 3 // I/MainActivity: onComplete: }Copy the code

doOnComplete()

/** * @author: yangtianfu * @date: {2019/4/11 14:29} * Public Final Observable<T> doOnComplete(Action onComplete) * @description Sends */ before sending onComplete () private void rxdoOnComplete() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doOnComplete(new Action() { @Override public void run() throws Exception { Log.i(TAG, "doOnComplete run: "); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: ", e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: doOnComplete run: // I/MainActivity: onComplete: }Copy the code

doOnError()

/** * @author: Yangtianfu * @date: {2019/4/11 14:38} * public Final Observable<T> doOnError(Consumer<? Super Throwable> onError) * @Description Observable calls this method before sending onError(). */ private void rxdoOnError() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); } }).doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.i(TAG, "doOnError accept: " + throwable); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: doOnError accept: java.lang.NullPointerException // I/MainActivity: onError: }); }Copy the code

doOnSubscribe()

/** * @author: Yangtianfu * @date: {2019/4/11 15:37} * public Final Observable<T> doOnSubscribe(Consumer<? Super Disposable> onSubscribe) * @Description Observable calls back to this method before sending onSubscribe(). */ private void rxdoOnSubscribe() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.i(TAG, " doOnSubscribe accept: "); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: doOnSubscribe accept: // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onComplete: }); }Copy the code

doOnDispose

/** * @author: yangtianfu * @date: {2019/4/11 15:54} * Public Final Observable<T> doOnDispose(Action onDispose) * @description The method is then called back. */ private void rxdoOnDispose() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doOnDispose(new Action() { @Override public void run() throws Exception { Log.i(TAG, "run: doOnDispose"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); disposable = d; } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); disposable.dispose(); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: run: doOnDispose }Copy the code

doOnLifecycle()

/** * @author: Yangtianfu * Date: {2019/4/11 16:00} * public Final Observable<T> doOnLifecycle(final Consumer<? Super Disposable> onSubscribe, final Action onDispose) * @description The callback method that calls the first argument of the method before calling onSubscribe, This callback method can be used to decide whether to unsubscribe * doOnLifecycle() the callback method of the second argument is the same as doOnDispose() */ private void rxdoOnLifecycle() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }}). DoOnLifecycle ( New Consumer<Disposable>() {@override public void Accept (Disposable) throws Exception {// If unsubscribed here, neither doOnDispose Action nor doOnLifecycle Action will be called log. I (TAG, "Accept: doOnLifecycle"); New Action() {@override public void run() throws Exception {log. I (TAG, "run: doOnLifecycle"); } } ).doOnDispose(new Action() { @Override public void run() throws Exception { Log.i(TAG, "run: doOnDispose"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); disposable = d; } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); disposable.dispose(); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // Both doOnDispose() and doOnLifecycle() will be called back after the onNext() method unsubscribe. // I/MainActivity: accept: doOnLifecycle // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: run: doOnDispose // I/MainActivity: run: DoOnLifecycle // If doOnLifecycle is used to unsubscribe, neither doOnDispose Action nor doOnLifecycle Action is called back}); }Copy the code

doOnTerminate() & doAfterTerminate()

/** * @author: yangtianfu * @date: {2019/4/11 16:13} * public final Observable<T> doOnTerminate(final Action onTerminate) * public final Observable<T> DoAfterTerminate (Action onFinally) * @description doOnTerminate is a callback before onError or onComplete is sent, * doAfterTerminate is a callback after onError or onComplete is sent. */ private void rxdoOnTerminate() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onError(new NullPointerException()); // emitter.onComplete(); } }).doOnTerminate(new Action() { @Override public void run() throws Exception { Log.i(TAG, "run: doOnTerminate"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: run: doOnTerminate // I/MainActivity: onError: }); }Copy the code

doFinally()

/** * @author: yangtianfu * @date: {2019/4/11 16:19} * Public Final Observable<T> doFinally(Action onFinally) * @description Calls back after all events are sent. * doAfterTerminate() is not called back after unsubscription, whereas doFinally() is called back anyway, at the end of the sequence of events. */ private void rxdoFinally() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doFinally(new Action() { @Override public void run() throws Exception { Log.i(TAG, "run: doFinally"); } }).doOnDispose(new Action() { @Override public void run() throws Exception { Log.i(TAG, "run: doOnDispose"); }}).doAfterTerminate(new Action() {@override public void run() throws Exception {// Dispose () method called DoAfterTerminate () is not called back. Log.i(TAG, "run: doAfterTerminate"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); disposable = d; } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); // doAfterTerminate() is not called if dispose() method is called. // disposable.dispose(); } @override public void onError(Throwable e) {log. I (TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: run: doOnDispose // I/MainActivity: run: doFinally }); }Copy the code

onErrorReturn()

/** * @author: Yangtianfu * @date: {2019/4/11 16:44} * public Final Observable<T> onErrorReturn(Function<? super Throwable, ? Extends T> valueSupplier) * @description When a callback is received after an onError() event, the returned value is called back to the onNext() method. Private void rxonErrorReturn() {observable. create(new ObservableOnSubscribe<Integer>() {@override public  void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); } }).onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) throws Exception { Log.i(TAG, "apply: onErrorReturn"); return 404; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: apply: onErrorReturn // I/MainActivity: onNext: 404 // I/MainActivity: onComplete: }); }Copy the code

onErrorResumeNext()

/** * @author: Yangtianfu * @date: {2019/4/11 16:49} * public Final Observable<T> onErrorCheckext (Function<? super Throwable, ? extends ObservableSource<? Extends T>> resumeFunction) * @description Returns a new Observable when an onError() event is received and terminates the event sequence normally. */ private void rxOnErrorResumeNext() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { Log.i(TAG, "apply: onErrorResumeNext" + throwable); return Observable.just(4, 5, 6); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: apply: onErrorResumeNextjava.lang.NullPointerException // I/MainActivity: onNext: 4 // I/MainActivity: onNext: 5 // I/MainActivity: onNext: 6 // I/MainActivity: onComplete: }); }Copy the code

onExceptionResumeNext()

/** * @author: yangtianfu * @date: {2019/4/11 16:54} * public final Observable<T> onExceptionResumeNext(final ObservableSource<? Extends T> Next) * @description is basically the same as onErrorResumeNext(), but this method can only catch exceptions. */ private void rxOnExceptionResumeNext() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new Exception("404")); } }).onExceptionResumeNext(new Observable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> observer) { observer.onNext(333); observer.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

retry()

/** * @author: yangtianfu * @date: {2019/4/11 16:59} * public Final Observable<T> Retry (long times) * @description Resends the event sequence if an error event occurs. Times is the number of retransmissions. */ private void rxretry() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new Exception("404")); } }).retry(2) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onError: }); }Copy the code

retryUntil()

/** * @author: yangtianfu * @date: {2019/4/11 17:03} * public Final Observable<T> retryUntil(final BooleanSupplier stop) * @description You can use this method to determine whether to continue sending events. */ private void rxretryUntil() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new Exception("404")); }}). RetryUntil (new BooleanSupplier() {@override public Boolean getAsBoolean() throws Exception {// Here to check whether to continue sending events. If (I == 100) {return true; if (I = 100) {return true; } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); }Copy the code

retryWhen()

/** * @author: Yangtianfu * @date: {2019/4/11 17:12} * public final void safeSubscribe(Observer<? Super T> s) * @description This method is called when the observed receives an exception or error event and returns a new observed. * If the returned observer sends an Error event, the previous observer does not continue to send the event. Private void rxretryWhen() {Observable. Create (new ObservableOnSubscribe<String>() {ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("chan"); emitter.onNext("ze"); emitter.onNext("de"); // emitter.onError(new Exception("404")); // Emitters. OnError (new Exception("303")); // Terminating sending emitters. OnNext ("haha"); } }).retryWhen(new Function<Observable<Throwable>, ObservableSource<? >>() { @Override public ObservableSource<? > apply(final Observable<Throwable> throwableObservable) throws Exception { return throwableObservable.flatMap(new Function<Throwable, ObservableSource<? >>() { @Override public ObservableSource<? > apply(Throwable throwable) throws Exception { Log.i(TAG, "apply: throwableObservable:" + throwable); If (throwable.tostring ().equals(" java.lang.exception: 404")) {return Observable. Just (" ignorable Exception "); } else {return Observable. Error (new Throwable(" terminated ")); }}}); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: " + e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onNext: chan // I/MainActivity: onNext: ze // I/MainActivity: onNext: de // I/MainActivity: apply: throwableObservable:java.lang.Exception: 303 // I/MainActivity: onError: Java.lang.Throwable: terminated}); }Copy the code

repeat()

/** * @author: yangtianfu * @date: {2019/4/11 17:35} * public Final Observable<T> repeat(long times) * Sends the observed event repeatedly. * @Description */ private void rxrepeat() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).repeat(2) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: " + e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onComplete: }Copy the code

repeatWhen()

/** * @author: Yangtianfu * @date: {2019/4/11 17:40} * public Final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<? >> Handler) * @description This method can return a new set of observed logic to determine whether to send the event again. * If the new observer returns an onComplete or onError event, the old observer will not continue to send the event. * If the observed returns another event, the event is repeated. */ private void rxrepeatWhen() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).retryWhen(new Function<Observable<Throwable>, ObservableSource<? >>() { @Override public ObservableSource<? > apply(Observable<Throwable> throwableObservable) throws Exception { return Observable.empty(); // return Observable.error(new Exception("404")); // return Observable.just(4); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: " + e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } // I/MainActivity: onSubscribe: // I/MainActivity: onComplete: }); }Copy the code

subscribeOn()

/** * @author: yangtianfu * @date: {2019/4/11 17:47} * public Final Observable<T> subscribeOn(Scheduler Scheduler) * @description specifies the thread to be observed. If this method is called multiple times, Private void rxsubscribeOn() {Observable. Create (new ObservableOnSubscribe<Integer>() {@override public void  subscribe(ObservableEmitter<Integer> e) throws Exception { Log.i(TAG, "subscribe: =currentThread name: " + Thread.currentThread().getName()); E.onnext (1); e.onNext(2); e.onNext(3); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: " + e); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: subscribe: =currentThread name: RxNewThreadScheduler-1 // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onComplete }Copy the code

observeOn()

/** * @author: yangtianfu * @date: {2019/4/11 17:54} * public Final Observable<T> observeOn(Scheduler) * @description Specifies the thread on which the observer is based. */ private void rxobserveOn() { Observable.just(1, 2, 3) .observeOn(Schedulers.newThread()) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { Log.i(TAG, "======================flatMap Thread name " + Thread.currentThread().getName()); return Observable.just("chan" + integer); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: Thread name " + Thread.currentThread().getName()); Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: ======================flatMap Thread name RxNewThreadScheduler-1 // I/MainActivity: ======================flatMap Thread name RxNewThreadScheduler-1 // I/MainActivity: onNext: Thread name main // I/MainActivity: onNext: chan1 // I/MainActivity: onNext: Thread name main // I/MainActivity: onNext: chan2 // I/MainActivity: onNext: Thread name main // I/MainActivity: onNext: chan3 // I/MainActivity: onComplete: // Schedulers.computation( ) is used to use computing tasks, For example, event loops and callback processing // schedulers.immediate ( ) The current thread // schedulers.io ( ) is used for IO-intensive tasks if the IO operation is blocked asynchronously. / / Schedulers. NewThread ( ) to create a new thread / / AndroidSchedulers mainThread () of the Android UI thread, used to manipulate the UI. }Copy the code

Filter operator

filter()

/** * @author: Yangtianfu * @date: {2019/4/11 18:04} * public Final Observable<T> filter(Predicate<? Super T> Predicate) * @description Uses certain logic to filter events sent by the observer. Events will be sent if true, but not otherwise. */ private void rxfilter() { Observable.just(1, 2, 3) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onComplete: }Copy the code

ofType()

/** * @author: yangtianfu * @date: {2019/4/11 18:08} * public final <U> Observable<U> ofType(Final Class<U> clazz) * @description filters events that do not comply with this type */ private void rxofType() { Observable.just(1, 2, 3, "chan", "zhide") .ofType(Integer.class) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); }}); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onComplete: }Copy the code

skip()

/** * @author: yangtianfu * @date: {2019/4/11 18:11} public Final Observable<T> skip(long count) * @description Count represents the number of skipped events * skipLast() also skips events, Private void rxskip() {Observable. Just (1, 2, 1); 3).subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { // i += integer; Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onSubscribe: // I/MainActivity: onNext: 3 }Copy the code

distinct()

/** * @author: Yangtianfu * @date: {2019/4/11 18:17} * Public Final Observable<T> Distinct () * @description Filters distinct events ina sequence of events. */ private void rxdistinct() { Observable.just(1, 2, 3, 3, 2, 1) .distinct() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public  void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 }Copy the code

distinctUntilChanged()

/** * @author: yangtianfu * @date: {2019/4/11 18:20} * public Final Observable<T> distinctUntilChanged() * @description filters out repeated events */ private void rxdistinctUntilChanged() { Observable.just(1, 2, 3, 3, 2, 1) .distinctUntilChanged() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 1 }Copy the code

take()

/** * @author: yangtianfu * @date: {2019/4/11 18:23} * public Final Observable<T> take(long count) * @description Controls the number of events that an observer receives. */ private void rxtake() { Observable.just(1, 2, 3, 4, 5) .take(3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onNext: 1 // I/MainActivity: onNext: 2 // I/MainActivity: onNext: 3 }Copy the code

debounce()

/** * @author: yangtianfu * @date: {2019/4/11 18:26} * public final Observable<T> debounce(long timeout, TimeUnit unit) * @description If the interval between two events is less than the specified interval, the previous event will not be sent to the observer. * If multiple events are less than the set interval, only the last data will be sent, Private void rxdebounce() {ObservableOnSubscribe (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); Thread.sleep(900); emitter.onNext(2); } }).debounce(1, TimeUnit.SECONDS) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); // I/MainActivity: onNext: 2 }Copy the code

firstElement() & lastElement()

/** * @author: yangtianfu * @date: {2019/4/11 18:31} * public final Maybe<T> firstElement() * public final Maybe<T> lastElement() * @Description FirstElement () takes the firstElement of the event sequence, and lastElement() takes the lastElement of the event sequence. */ private void rxfirstElement() { Observable.just(1, 2, 3, 4) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception {  Log.i(TAG, "accept: firstElement" + integer); }}); Observable.just(1, 2, 3, 4) .lastElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: lastElement" + integer); }}); // I/MainActivity: accept: firstElement1 // I/MainActivity: accept: lastElement4 }Copy the code

elementAt() & elementAtOrError()

/** * @author: yangtianfu * @date: {2019/4/11 18:35} * public final Maybe<T> elementAt(long index) * public final Single<T> elementAtOrError(long index) * @description elementAt() can specify events that are retrieved from the sequence of events, but nothing happens if the input index exceeds the total number of events in the sequence. * In this case, you can use elementAtOrError() if you want to send an exception message. */ private void rxelementAt() { Observable.just(1, 2, 3, 4) .elementAt(0) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // Observable.just(1, 2, 3, 4) // .elementAtOrError(5) // .subscribe(new Consumer<Integer>() { // @Override // public void accept(Integer integer) Throws NoSuchElementException // log. I (TAG, "Accept: "+ INTEGER); / / / /}}); }Copy the code

Conditional operator

all()

/** * @author: Yangtianfu * @date: {2019/4/11 18:39} * public Final Observable<T> ambWith(ObservableSource<? Extends T> other) * @description Determines whether a sequence of events all satisfy an event, and returns true if so, or false if not. */ private void rxall() { Observable.just(1, 2, 3, 4) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 5;  } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i(TAG, "accept: " + aBoolean); }}); }Copy the code

takeWhile()

/** * @author: Yangtianfu * @date: {2019/4/11 18:42} * public Final Observable<T> takeWhile(Predicate<? Super T> predicate) * @description can set conditions that will send data when the conditions are met and not when the conditions are met. */ private void rxtakeWhile() { Observable.just(1, 2, 3, 4) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: accept: 1 // I/MainActivity: accept: 2 }Copy the code

takeUntil()

/** * @author: Yangtianfu * @date: {2019/4/11 18:45} * public Final Observable<T> takeUntil(Predicate<? Super T> stopPredicate * @description conditions can be set. When these conditions are met, the next event will not be sent. */ private void rxtakeUtil() { Observable.just(1, 2, 3, 4, 5, 6) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: accept: 1 // I/MainActivity: accept: 2 // I/MainActivity: accept: 3 // I/MainActivity: accept: 4 }Copy the code

skipWhile()

/** * @author: Yangtianfu * @date: {2019/4/12 9:40} * public Final Observable<T> skipWhile(Predicate<? Super T> predicate * @description As opposed to takeWhile, the data is not sent when it meets the condition and is sent otherwise. */ private void rxskipWhile() { Observable.just(1, 2, 3, 4) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); // I/MainActivity: accept: 3 // I/MainActivity: accept: 4 }Copy the code

SkipUntil ()

/** * @author: yangtianfu * @date: {2019/4/12 9:46} * @description When an Observable in skipUntil() sends an event, the original Observable sends an event to the observer. * The Observable in skipUntil() does not send events to observers. */ private void rxskipUtil() { Observable.intervalRange(1, 5, 0, 1, Timeunit.seconds). SkipUntil (Observable.intervalRange(6, 5, 3, 1, timeUnit.seconds)) Subscribe (new Consumer<Long>() {@override public void Accept (Long aLong) throws Exception {log. I (TAG, "accept: "); }}); }Copy the code

SequenceEqual ()

/** * @author: Yangtianfu * @date: {2019/4/12 10:02} * public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? Extends T> source2) * @description determines whether two Observables send the same event. */ private void rxsequenceEqual() { Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i(TAG, "accept: " + aBoolean); } // I/MainActivity: accept: true }); }Copy the code

The contains ()

/** * @author: yangtianfu * @date: {2019/4/12 10:05} * public final Single<Boolean> Contains (final Object element) Returns true if there is, false if there is not. */ private void rxcontains() { Observable.just(1, 2, 3) .contains(3) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i(TAG, "accept: " + aBoolean); }}); }Copy the code

isEmpty()

/** * @author: yangtianfu * @date: {2019/4/12 10:07} * public final Single<Boolean> isEmpty() * @description specifies whether the event sequence isEmpty. */ private void rxisEmpty() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onComplete(); } }).isEmpty() .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i(TAG, "accept: " + aBoolean); // I/MainActivity: accept: true } }); }Copy the code

/** * @author: Yangtianfu * @date: {2019/4/12 10:11} * public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? Extends T>> sources) * @description amb() extends T>> sources) * @description amb() extends T>> sources) * @description amb() extends T>> sources) * @description amb() extends T>> sources) * @description amb() extends T>> sources) * @description amb() extends T>> sources) * @description amb() No event is sent. */ private void rxamb() { ArrayList<Observable<Long>> list = new ArrayList<>(); list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS)); list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS)); Observable.amb(list) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "accept: " + aLong); }}); // I/MainActivity: accept: 6 // I/MainActivity: accept: 7 // I/MainActivity: accept: 8 // I/MainActivity: accept: 9 // I/MainActivity: accept: 10 }Copy the code

DefaultIfEmpty ()

/** * @author: yangtianfu * @date: {2019/4/12 10:15} * Public Final Observable<T> defaultIfEmpty(T defaultItem) * @description Private void rxdefaultEmpty() {Observable. Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onComplete(); } }).defaultIfEmpty(666) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); }}); }Copy the code

Examples of Rxjava2 operators:

Create

The Observable is called an emitter (upstream event), and the Observer is called a receiver (downstream event).

// create() walkthrough: the emitter sends 1, 2, 3, completes, and then still tries to send 4.
// The observer disposes its subscription when it receives the second item.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        mRxOperatorsText.append("Observable emit 1" + "\n");
        Log.e(TAG, "Observable emit 1" + "\n");
        e.onNext(1);
        mRxOperatorsText.append("Observable emit 2" + "\n");
        Log.e(TAG, "Observable emit 2" + "\n");
        e.onNext(2);
        mRxOperatorsText.append("Observable emit 3" + "\n");
        Log.e(TAG, "Observable emit 3" + "\n");
        e.onNext(3);
        e.onComplete();
        // Deliberately emitted after onComplete() to show the emitter keeps running.
        mRxOperatorsText.append("Observable emit 4" + "\n");
        Log.e(TAG, "Observable emit 4" + "\n");
        e.onNext(4);
    }
}).subscribe(new Observer<Integer>() {
    // Number of items received so far.
    private int i;
    // Handle used to cut the subscription from inside onNext().
    private Disposable mDisposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");
        Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
        mDisposable = d;
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("onNext : value : " + integer + "\n");
        Log.e(TAG, "onNext : value : " + integer + "\n");
        i++;
        if (i == 2) {
            // The Disposable introduced in RxJava 2.x cuts the stream so that this
            // observer no longer receives upstream events.
            mDisposable.dispose();
            mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
            Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");
        Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("onComplete" + "\n");
        Log.e(TAG, "onComplete" + "\n");
    }
});

just

just is a simple emitter that calls onNext() once for each of its arguments, in order. RxJava 2.x also adds Consumer, a lightweight observer whose accept() is the equivalent of onNext().

// just() emits its arguments in order; the work is subscribed on the IO scheduler and the
// results are delivered on the Android main thread.
Observable.just("1", "2", "3")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : onNext : " + s + "\n");
                Log.e(TAG, "accept : onNext : " + s + "\n");
            }
        });

doOnNext

Operator that tells the subscriber to do something before receiving the data, such as we want to save the data before we get it

// doOnNext() runs a side effect for each item before the subscriber receives it
// (here: pretend to save the item).
Observable.just(1, 2, 3, 4)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("doOnNext save " + integer + " success " + "\n");
                Log.e(TAG, "doOnNext save " + integer + " success " + "\n");
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("doOnNext :" + integer + "\n");
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });

Other do class operators:

// Tour of the do* lifecycle hooks. The source emits two items and then an error, so the
// error-related callbacks fire and doOnComplete does not.
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Throwable(" error "));
            }
        })
        // 1. Called once for every notification the Observable sends.
        .doOnEach(new Consumer<Notification<Integer>>() {
            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {
                Log.d(TAG, "doOnEach: " + integerNotification.getValue());
            }
        })
        // 2. Called before each Next event is delivered.
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "doOnNext: " + integer);
            }
        })
        // 3. Called after each Next event has been delivered.
        .doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "doAfterNext: " + integer);
            }
        })
        // 4. Called when the Observable completes normally.
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doOnComplete: ");
            }
        })
        // 6. Called when the observer subscribes.
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe: ");
            }
        })
        // 7. Called after the Observable terminates, whether normally or with an error.
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doAfterTerminate: ");
            }
        })
        // 8. Called last.
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doFinally: ");
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "event received " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "response to Error ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "response to Complete event ");
            }
        });

filter

Filter operator, take the correct value, filter out the value that does not meet our condition, accept only the value that meets the filter condition to send downstream events

 // filter() keeps only the items for which the predicate returns true (here: values >= 10).
 Predicate<Integer> atLeastTen = new Predicate<Integer>() {
     @Override
     public boolean test(@NonNull Integer candidate) throws Exception {
         return candidate >= 10;
     }
 };
 // Appends each surviving value to the on-screen text and to the log.
 Consumer<Integer> showFiltered = new Consumer<Integer>() {
     @Override
     public void accept(@NonNull Integer kept) throws Exception {
         mRxOperatorsText.append("filter : " + kept + "\n");
         Log.e(TAG, "filter : " + kept + "\n");
     }
 };
 Observable.just(1, 20, 65, -5, 7, 19)
         .filter(atLeastTen)
         .subscribe(showFiltered);
Copy the code

skip

Accepts a long parameter, count, and skips the first count items before it starts delivering.

// skip(2) drops the first two items and delivers the rest.
Observable.just(1, 2, 3, 4, 5)
        .skip(2)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("skip : " + integer + "\n");
                Log.e(TAG, "skip : " + integer + "\n");
            }
        });

Flowable

Flowable supports backpressure: a strategy used in asynchronous scenarios, where the Observable emits events much faster than the Observer can process them, to tell the upstream Observable to slow down its emission rate.

// Flowable example: reduce() folds the items into one value, starting from the seed 100.
Flowable.just(1, 2, 3, 4)
        .reduce(100, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("Flowable :" + integer + "\n");
                Log.e(TAG, "Flowable :" + integer + "\n");
            }
        });

take

Used to specify how much data a subscriber can receive at most

// take(2) delivers at most the first two items.
Flowable.fromArray(1, 2, 3, 4, 5)
        .take(2)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("take : " + integer + "\n");
                Log.e(TAG, "accept: take : " + integer + "\n");
            }
        });

Example: a countdown on the "send verification code" button, built with interval and take

// Countdown for the "send verification code" button: ticks once per second, maps the tick
// index to the remaining value, disables the button while counting and restores it at the end.
final int count = 11;
Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(count)
        .map(new Function<Long, Long>() {
            @Override
            public Long apply(Long aLong) throws Exception {
                // Tick index counts up from 0, so this counts down from count.
                return count - aLong;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                mButtonSend.setEnabled(false);
                mButtonSend.setTextColor(Color.BLACK);
            }
        })
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long aLong) {
                mButtonSend.setText(" rest " + aLong + " second ");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                mButtonSend.setEnabled(true);
                mButtonSend.setTextColor(Color.WHITE);
                mButtonSend.setText(" Verification code ");
            }
        });

distinct

The de-duplication operator: it simply removes duplicate items.

// distinct() lets each value through only the first time it appears.
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("distinct : " + integer + "\n");
                Log.e(TAG, "distinct : " + integer + "\n");
            }
        });

single

As the name implies, Single only accepts one argument, while SingleObserver only calls onError or onSuccess

// Single emits exactly one value; its observer is notified through onSuccess or onError only.
Single.just(new Random().nextInt())
        .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onSuccess(@NonNull Integer integer) {
                mRxOperatorsText.append("single : onSuccess : " + integer + "\n");
                Log.e(TAG, "single : onSuccess : " + integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("single : onError : " + e.getMessage() + "\n");
                Log.e(TAG, "single : onError : " + e.getMessage() + "\n");
            }
        });

concat

The connection operator, which accepts the variable parameters of an Observable, or the collection of Observables, takes advantage of concat’s feature that it must call onComplete before subscribing to the next Observable. If the cached data is not what we want, we call onComplete() to execute the Observable that gets network data. If the cached data meets our needs, we call onNext() directly to prevent excessive network requests and waste of user traffic.

// concat() subscribes to the second Observable only after the first has completed,
// so the items arrive in order 1..6.
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("concat : " + integer + "\n");
                Log.e(TAG, "concat : " + integer + "\n");
            }
        });

Example: Get cache data from disk/memory cache

Public class UsageDemo1 extends appactivity {// this 2 variable is used to simulate memoryCache & data in disk cache String memoryCache = null; String diskCache = "get data from diskCache "; private String TAG = "RxJava"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_combine); /* * Set first Observable: Observable<String> memory = ObservableOnSubscribe (new ObservableOnSubscribe<String>() {@override public Void SUBSCRIBE (ObservableEmitter<String> Emitter) throws Exception { = null) {emitters. OnNext (memoryCache); } else {emitters. OnComplete (); emitters. }}}); /* * Set the second Observable: Observable<String> disk = Observable. Create (new ObservableOnSubscribe<String>() {@override public Void subscribe(ObservableEmitter<String> Emitter) throws Exception { = null) {emitters. OnNext (diskCache); } else {emitters. OnComplete (); emitters. }}}); Observable<String> network = observable. just(" get data from network "); /* * Caching via concat () and firstElement () operators **/ / 1. Concat () merges memory, disk, and network events // and queues them sequentially. network) // 2. With firstElement(), the first valid event (the Next event) is fetched from the concatenated queue, which is checked by memory, disk, and network.firstElement (). // a.firstElement () = memory; MemoryCache = null; // firstElement(); // firstElement(); Because diskCache ≠ null, that is, there is data in the diskCache, so send Next event (valid event) // c. That is, firstElement() has emitted its first valid event (disk event), so it stops judging. Subscribe (new Consumer<String>() {@override public void accept(String s) throws Exception { Log.d(TAG," final data source = "+ s); }}); }}Copy the code

The merge/mergeArray ()

Combines multiple Observables; it accepts variable arguments and also supports Iterable collections. Note that it differs from concat in that it does not wait for emitter A to send all of its events before emitting from emitter B. merge() vs. mergeArray(): they differ in the number of Observables combined, that is, merge() combines at most 4 Observables, while mergeArray() can combine more than 4. Difference from concat(): concat() also combines multiple Observables, but it runs them serially and sends their data in order, whereas merge() runs them in parallel and sends data as it arrives.

// merge() combines the two sources into one stream without waiting for the first to finish.
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("merge :" + integer + "\n");
                Log.e(TAG, "accept: merge :" + integer + "\n");
            }
        });

Merge data source

public class UsageDemo2 extends AppCompatActivity { private String TAG = "RxJava"; Private String result = "Data source from ="; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_combine); Observable<String> network = observable. just(" network "); Observable<String> file = observable. just(" local file "); /* * Merge events by merge() **/ Observable. Merge (network, file) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (String value) {log. d(TAG, "data source has: "+ value); result += value + "+"; } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "data completed "); Log.d(TAG, result ); }}); }}Copy the code

Merge local and network data

public void click(View view) { Observable.merge(getDatasFromLocal(),getDatasFromNetWork()) .subscribe(new Consumer<List<Course>>() {@override public void accept(List<Course> courses) throws Exception {// Merge data courses}}); Private Observable<List<Course>> getDatasFromLocal(){List<Course> List = new ArrayList<>();  List.add (new Course(" cainiao mall ")); List.add (new Course(" Rookie news ")); return Observable.just(list); } / / private Observable<List<Course>> getDatasFromNetWork(){return api.getCourse().subscribeOn(Schedulers.io()); // List<Course> list = new ArrayList<>(); // list.add(new Course(" rookie live ")); // list.add(new Course(" rookie mobile Assistant ")); // return Observable.just(list); }Copy the code

buffer

buffer(count, skip): its behaviour can almost be read off the definition. It divides the data of an Observable into buffers no longer than count, starting a new buffer every skip items (the step), and emits each buffer as a List through a new Observable.

// buffer(3, 2): buffers of at most 3 items, a new buffer starting every 2 items.
Observable.just(1, 2, 3, 4, 5)
        .buffer(3, 2)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(@NonNull List<Integer> integers) throws Exception {
                mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
                mRxOperatorsText.append("buffer value : ");
                for (Integer i : integers) {
                    mRxOperatorsText.append(i + "");
                }
                mRxOperatorsText.append("\n");
                Log.e(TAG, "\n");
            }
        });

We emit 1, 2, 3, 4, and 5 successively through the buffer operator, where skip is 2, count is 3, and our output is 123, 345, and 5. Obviously, the first parameter of our buffer is count, which represents the maximum value. When there are enough events, we usually take count and skip skip events at a time.

map

Basically, the simplest RxJava operator is to apply a function to each event sent upstream, making each event change according to the specified function for network data parsing

// map() applies a function to every upstream item; here each Integer becomes a String.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, String>() {
    // Function takes two type parameters: Integer is the type of the incoming event,
    // String is the type returned downstream.
    @Override
    public String apply(@NonNull Integer integer) throws Exception {
        return "This is result " + integer;
    }
}).subscribe(new Consumer<String>() {
    // Consumer is new in RxJava 2.x; accept() plays the role of onNext().
    @Override
    public void accept(@NonNull String s) throws Exception {
        mRxOperatorsText.append("accept : " + s + "\n");
        Log.e(TAG, "accept : " + s + "\n");
    }
});

The basic function of map is to convert an Observable into another Observable through some functional relationship. ###FlatMap Converts each event sent by an upstream Observable into its own Observable, then merges their emissions into a single Observable. This makes it possible to chain multiple network requests that depend on one another.

 // flatMap demo: each upstream integer is expanded into three identical strings that are
 // emitted after a random 1-10 ms delay, then all inner streams are merged.
 Function<Integer, ObservableSource<String>> expandWithFlatMap =
         new Function<Integer, ObservableSource<String>>() {
             @Override
             public ObservableSource<String> apply(@NonNull Integer source) throws Exception {
                 List<String> copies = new ArrayList<>();
                 int remaining = 3;
                 while (remaining-- > 0) {
                     copies.add("I am value " + source);
                 }
                 int pauseMillis = (int) (1 + Math.random() * 10);
                 return Observable.fromIterable(copies).delay(pauseMillis, TimeUnit.MILLISECONDS);
             }
         };
 // Logs each merged item and mirrors it to the on-screen text.
 Consumer<String> showFlatMapped = new Consumer<String>() {
     @Override
     public void accept(@NonNull String item) throws Exception {
         Log.e(TAG, "flatMap : accept : " + item + "\n");
         mRxOperatorsText.append("flatMap : accept : " + item + "\n");
     }
 };
 Observable.create(new ObservableOnSubscribe<Integer>() {
             @Override
             public void subscribe(@NonNull ObservableEmitter<Integer> source) throws Exception {
                 for (int value = 1; value <= 3; value++) {
                     source.onNext(value);
                 }
             }
         })
         .flatMap(expandWithFlatMap)
         .subscribeOn(Schedulers.newThread())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(showFlatMapped);
Copy the code

It converts one Observable into multiple Observables and then merges those scattered Observables back into a single Observable. Note that flatMap does not guarantee the order of events; if order matters, use concatMap. ### ConcatMap The only difference from flatMap is that concatMap guarantees the order of events. To verify this, simply replace flatMap with concatMap:

        // concatMap demo: same expansion as the flatMap example, but the inner streams are
        // concatenated instead of merged.
        Function<Integer, ObservableSource<String>> expandWithConcatMap =
                new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer source) throws Exception {
                        List<String> copies = new ArrayList<>();
                        int remaining = 3;
                        while (remaining-- > 0) {
                            copies.add("I am value " + source);
                        }
                        int pauseMillis = (int) (1 + Math.random() * 10);
                        return Observable.fromIterable(copies).delay(pauseMillis, TimeUnit.MILLISECONDS);
                    }
                };
        // Logs each item and mirrors it to the on-screen text.
        Consumer<String> showConcatMapped = new Consumer<String>() {
            @Override
            public void accept(@NonNull String item) throws Exception {
                Log.e(TAG, "concatMap : accept : " + item + "\n");
                mRxOperatorsText.append("concatMap : accept : " + item + "\n");
            }
        };
        Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> source) throws Exception {
                        for (int value = 1; value <= 3; value++) {
                            source.onNext(value);
                        }
                    }
                })
                .concatMap(expandWithConcatMap)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(showConcatMapped);
Copy the code

// concatMap() without delays: every upstream event is split into three child events.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
    // use concatMap() operator
}).concatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add(" I am an event " + integer + " split child event " + i);
        }
        // Wrap the child events in a new Observable to be sent on to the observer.
        return Observable.fromIterable(list);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

ConcatDelayError ()/mergeDelayError ()

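Usage scenario: with concat() or merge(), if one of the Observables emits an onError event, the other Observables are cut off and send nothing further. If we want the onError to be delivered only after the other Observables have finished sending their events, use concatDelayError() / mergeDelayError(). Usage: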

/** * @describe how you use concatDelayError () */ private void initErrorDelay() { Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); // Send an Error. Since concatDelayError is used, the second Observable sends an event, and when it's done, it sends the Error emitters. OnComplete (). }})); Observable. Just (4,5,6). Subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable d) {} @override public void onNext(Integer Integer) {log. d(TAG, "received event "+ Integer); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); }Copy the code

switchMap

Only the latest data is sent. The data that has not been returned before is discarded for the search function, avoiding the problem of old data being returned and new data being overwritten

// Search-as-you-type: debounce the text changes, ignore blank input, and let switchMap()
// keep only the results of the most recent query.
RxTextView.textChanges(this.mEditText)
        .debounce(200, TimeUnit.MILLISECONDS)
        .subscribeOn(AndroidSchedulers.mainThread())
        // Uses the public Predicate interface rather than RxJava's internal
        // AppendOnlyLinkedArrayList.NonThrowingPredicate.
        .filter(new Predicate<CharSequence>() {
            @Override
            public boolean test(CharSequence charSequence) throws Exception {
                // Filter data: drop input that is empty after trimming.
                return charSequence.toString().trim().length() > 0;
            }
        })
        .switchMap(new Function<CharSequence, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(CharSequence charSequence) throws Exception {
                // Search: only the latest input within 200 milliseconds is searched for.
                List<String> list = new ArrayList<String>();
                list.add("abc");
                list.add("ada");
                return Observable.just(list);
            }
        })
        // flatMap alternative, kept for comparison:
        // .flatMap(new Function<CharSequence, ObservableSource<?>>() {
        //     @Override
        //     public ObservableSource<?> apply(CharSequence charSequence) throws Exception {
        //         // search
        //         List<String> list = new ArrayList<String>();
        //         list.add("abc");
        //         list.add("ada");
        //         return Observable.just(list);
        //     }
        // })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                // Display search results
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // NOTE(review): errors are currently ignored here.
            }
        });

delay

/*
 * @author: yangtianfu
 * @date: {2018/12/14 20:01}
 * @description Delays by 800 ms; used to wait for an asynchronous operation to complete.
 */
Observable.just(" delay ")
        .delay(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                // Set the listView to slide to the bottom.
                // NOTE(review): getBottom() looks like a pixel coordinate while
                // setSelection() takes an item position - confirm this is intended.
                mListViewMsgItems.setSelection(mListViewMsgItems.getBottom());
            }
        });

ThrottleFirst Click to prevent jitter

/*
 * @author: yangtianfu
 * @date: {2018/12/14 20:01}
 * @description Click debouncing for a view: only one click is let through per
 * windowDuration. The click handler can be re-initialised to give the view a different
 * window under different conditions, e.g. one click per 1 s while the discussion area is
 * open and one click per 5 s while it is closed.
 */
// NOTE(review): the project identifiers below (PreLiveToast, initSingLePreliveToast) were
// restored from garbled text - verify their exact spelling against the project.
RxView.clicks(bar_btn_text_live)
        .throttleFirst(windowDuration, TimeUnit.SECONDS)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                if (SPUtils.ReadInt(LiveActivity.this, "Message ", "message") == 100) {
                    PreLiveToast.getInstance().initSingLePreliveToast(LiveActivity.this,
                            " Administrator closed the discussion section ");
                } else {
                    if (fl_emotionview_main.getVisibility() == View.GONE) {
                        fl_emotionview_main.setVisibility(View.VISIBLE);
                    }
                    inputMsgDialog();
                }
            }
        });

The timer (obsolete)

In RxJava 1.x, the timer operator could either delay a piece of logic or run it at intervals. In RxJava 2.x, timer only delays; repeated execution is now done with the interval operator. By default, both timer and interval run off the calling thread, on a separate thread.

// timer() emits a single value after a 2 second delay; the start and arrival times are
// shown so the delay is visible.
mRxOperatorsText.append("timer start : " + TimeUtil.getNowStrTime() + "\n");
Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
Observable.timer(2, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
            }
        });

interval

Interval execution operation, default in the new thread, to achieve the heartbeat interval task, which accepts three parameters, respectively is the first send delay, interval time, time unit.

// interval(3, 2, SECONDS): first event after 3 s, then one event every 2 s. The returned
// Disposable is kept so the stream can be stopped later.
mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        // interval does not run on the main thread by default, so switch back before
        // touching the UI.
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
            }
        });
Override protected void onDestroy() {super.ondestroy (); Override protected void onDestroy() {super.ondestroy (); mDisposable.dispose(); }Copy the code

/*
 * @author: yangtianfu
 * @date: {2018/12/14 20:01}
 * @description No initial delay, then one request every 10 s.
 */
// NOTE(review): `disposable` is assumed to be an existing field; the original text was
// garbled here - confirm the target variable.
disposable = Observable.interval(0, 10, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                presenter.getTecherInRoom(token, roomId);
            }
        });

defer

A new Observable is created each time a subscription is made

// defer() postpones building the Observable: the Callable runs, and a fresh Observable is
// created, each time an observer subscribes.
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
        return Observable.just(1, 2, 3);
    }
});

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("defer : " + integer + "\n");
        Log.e(TAG, "defer : " + integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
        Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("defer : onComplete\n");
        Log.e(TAG, "defer : onComplete\n");
    }
});

reduce

reduce combines the items one step at a time with the given function, and can optionally take a seed as the initial value.

Observable. Just (1, 2, 3).reduce(new BiFunction<Integer, Integer, Integer>() {// The first two integers are parameters, @override public Integer apply(@nonnull Integer Integer, @NonNull Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { mRxOperatorsText.append("reduce : " + integer + "\n"); Log.e(TAG, "accept: reduce : " + integer + "\n"); }});Copy the code

reduce is used here with a function that adds two values, so the final value is: 1 + 2 = 3, then 3 + 3 = 6.

scan

Similar to reduce, reduce() outputs only results, while Scan () outputs every result in the process

// scan() works like reduce() but emits every intermediate result, not just the final one.
Observable.just(1, 2, 3)
        .scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("scan " + integer + "\n");
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });

debounce

Filter out data items that emit too fast

// debounce(500 ms): an item is delivered only if no newer item follows it within 500 ms.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        // send events with simulated time wait
        emitter.onNext(1); // skip
        Thread.sleep(400);
        emitter.onNext(2); // deliver
        Thread.sleep(505);
        emitter.onNext(3); // skip
        Thread.sleep(100);
        emitter.onNext(4); // deliver
        Thread.sleep(605);
        emitter.onNext(5); // deliver
        Thread.sleep(510);
        emitter.onComplete();
    }
}).debounce(500, TimeUnit.MILLISECONDS) // filter out items followed by another within 500 ms
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("debounce :" + integer + "\n");
                Log.e(TAG, "debounce :" + integer + "\n");
            }
        });

debounce drops every event that is followed by another event within 500 ms, so 1 and 3 are removed and 2, 4 and 5 are delivered.

  // last(4): emits only the final item of the source, or the default value 4 if the source is empty.
  Observable.just(1, 2, 3)
          .last(4)
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(@NonNull Integer lastItem) throws Exception {
                  // Build the message once and reuse it for the on-screen text and the log.
                  final String line = "last : " + lastItem + "\n";
                  mRxOperatorsText.append(line);
                  Log.e(TAG, line);
              }
          });
Copy the code

window

Splits the stream into windows by time and emits the items of each window through a separate Observable.

Observable.interval(1, timeunit.seconds) // send once every second. Take (15) // Accept up to 15. TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(@NonNull Observable<Long> longObservable) throws Exception {  mRxOperatorsText.append("Sub Divide begin... \n"); Log.e(TAG, "Sub Divide begin... \n"); longObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() {  @Override public void accept(@NonNull Long aLong) throws Exception { mRxOperatorsText.append("Next:" + aLong + "\n"); Log.e(TAG, "Next:" + aLong + "\n"); }}); }});Copy the code

publishSubject

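onNext() notifies every observer that is subscribed at that moment, and nothing more: items are not cached or replayed to observers that subscribe later.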

// PublishSubject demo: an observer only receives the items emitted after it subscribed.
mRxOperatorsText.append("PublishSubject\n");
Log.e(TAG, "PublishSubject\n");

PublishSubject<Integer> publishSubject = PublishSubject.create();

// First observer: subscribed before any item is emitted.
publishSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("First onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("First onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("First onError:"+e.getMessage()+"\n");
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("First onComplete!\n");
    }
});

// Emitted while only the first observer is subscribed.
publishSubject.onNext(1);
publishSubject.onNext(2);
publishSubject.onNext(3);

// Second observer: subscribed after 1, 2 and 3 have already been emitted.
publishSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("Second onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("Second onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("Second onError:"+e.getMessage()+"\n");
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("Second onComplete!\n");
    }
});

// Emitted while both observers are subscribed.
publishSubject.onNext(4);
publishSubject.onNext(5);
publishSubject.onComplete();
Copy the code

asyncSubject

Until onComplete() is called, everything except subscribe() is cached; once onComplete() is called, only the last onNext() value is delivered to the observers, followed by onComplete().

// AsyncSubject demo: observers receive only the last item, and only once onComplete() is called.
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

asyncSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("First onSubscribe :"+d.isDisposed()+"\n");
        Log.e(TAG, "First onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("First onNext value :"+integer + "\n");
        Log.e(TAG, "First onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("First onError:"+e.getMessage()+"\n");
        Log.e(TAG, "First onError:"+e.getMessage()+"\n" );
    }

    @Override
    public void onComplete() {
        // The single cached onNext is delivered just before this callback.
        mRxOperatorsText.append("First onComplete! \n");
        Log.e(TAG, "First onComplete! \n");
    }
});

asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onNext(3);

asyncSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("Second onSubscribe :"+d.isDisposed()+"\n");
        Log.e(TAG, "Second onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("Second onNext value :"+integer + "\n");
        Log.e(TAG, "Second onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("Second onError:"+e.getMessage()+"\n");
        Log.e(TAG, "Second onError:"+e.getMessage()+"\n" );
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("Second onComplete! \n");
        Log.e(TAG, "Second onComplete! \n");
    }
});

asyncSubject.onNext(4);
asyncSubject.onNext(5);
asyncSubject.onComplete();

BehaviorSubject

A BehaviorSubject caches its most recent onNext() value and pushes it to each newly registered Observer immediately after subscribe(), before any later items.

// BehaviorSubject demo: a new observer first receives the most recent item, then all later ones.
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();

behaviorSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("First onSubscribe :"+d.isDisposed()+"\n");
        Log.e(TAG, "First onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("First onNext value :"+integer + "\n");
        Log.e(TAG, "First onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("First onError:"+e.getMessage()+"\n");
        Log.e(TAG, "First onError:"+e.getMessage()+"\n" );
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("First onComplete! \n");
        Log.e(TAG, "First onComplete! \n");
    }
});

behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.onNext(3);

// Subscribed after 3 was emitted, so this observer starts with the cached value 3.
behaviorSubject.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mRxOperatorsText.append("Second onSubscribe :"+d.isDisposed()+"\n");
        Log.e(TAG, "Second onSubscribe :"+d.isDisposed()+"\n");
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        mRxOperatorsText.append("Second onNext value :"+integer + "\n");
        Log.e(TAG, "Second onNext value :"+integer + "\n");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        mRxOperatorsText.append("Second onError:"+e.getMessage()+"\n");
        Log.e(TAG, "Second onError:"+e.getMessage()+"\n" );
    }

    @Override
    public void onComplete() {
        mRxOperatorsText.append("Second onComplete! \n");
        Log.e(TAG, "Second onComplete! \n");
    }
});

behaviorSubject.onNext(4);
behaviorSubject.onNext(5);
behaviorSubject.onComplete();

Completable

A Completable only cares about the outcome: it has no onNext, it either completes successfully or fails, it ignores the intermediate process, and it reports the result at some point after subscribe().

// Completable.timer(): signals onComplete after the 1-second delay; there is no onNext.
Completable.timer(1, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mRxOperatorsText.append("onSubscribe : d :" + d.isDisposed() + "\n");
                Log.e(TAG, "onSubscribe : d :" + d.isDisposed() + "\n");
            }

            @Override
            public void onComplete() {
                mRxOperatorsText.append("onComplete\n");
                Log.e(TAG, "onComplete\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("onError :" + e.getMessage() + "\n");
                Log.e(TAG, "onError :" + e.getMessage() + "\n");
            }
        });

zip

zip is a special kind of merge: it takes one event from each of the two upstreams and combines them into a pair. Each event can be used only once, and pairing strictly follows the order in which the events were sent. The downstream therefore receives as many events as the upstream that sent the fewest; events must come in pairs, and any surplus events are discarded. A practical use is updating the UI from several API responses: the data displayed on one page very often comes from multiple interfaces, and in that case the zip operator can combine the data from multiple Observables into a single data source and then emit it.

@Override protected void doSomething() { Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() { @Override public String apply(@NonNull String s, @NonNull Integer integer) throws Exception { return s + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { mRxOperatorsText.append("zip : accept : " + s + "\n"); Log.e(TAG, "zip : accept : " + s + "\n"); }}); } private Observable<String> getStringObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { if (! e.isDisposed()) { e.onNext("A"); mRxOperatorsText.append("String emit : A \n"); Log.e(TAG, "String emit : A \n"); e.onNext("B"); mRxOperatorsText.append("String emit : B \n"); Log.e(TAG, "String emit : B \n"); e.onNext("C"); mRxOperatorsText.append("String emit : C \n"); Log.e(TAG, "String emit : C \n"); }}}); } private Observable<Integer> getIntegerObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { if (! e.isDisposed()) { e.onNext(1); mRxOperatorsText.append("Integer emit : 1 \n"); Log.e(TAG, "Integer emit : 1 \n"); e.onNext(2); mRxOperatorsText.append("Integer emit : 2 \n"); Log.e(TAG, "Integer emit : 2 \n"); e.onNext(3); mRxOperatorsText.append("Integer emit : 3 \n"); Log.e(TAG, "Integer emit : 3 \n"); e.onNext(4); mRxOperatorsText.append("Integer emit : 4 \n"); Log.e(TAG, "Integer emit : 4 \n"); e.onNext(5); mRxOperatorsText.append("Integer emit : 5 \n"); Log.e(TAG, "Integer emit : 5 \n"); }}});Copy the code

zip combines events by taking one event from emitter A and one from emitter B and pairing them; each event can be used only once, and the pairing strictly follows the order in which the events were sent. Therefore, as the screenshot above shows, 1 is always combined with A and 2 with B. In the end the receiver gets as many events as the emitter that sent the fewest. That is why, in the screenshot, 4 and 5 are left alone: no partner exists for them, so they are never delivered.

repeat

Unconditionally re-sends the Observable's events over and over; overloaded variants let you set the number of repetitions.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onError(new Exception(" Error "));
    }
})
        // .retry()
        // repeat() with no argument repeats an unlimited number of times;
        // repeat(n) limits the number of repetitions.
        // 1. Re-subscribing & re-sending is triggered upon receiving onComplete().
        // NOTE(review): this source terminates with onError(), which repeat() does not react to;
        // as written, repeat(3) never re-runs it (retry() is the operator for errors) - confirm intent.
        .repeat(3)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            // ... the remaining Observer callbacks are elided in the original snippet
        });

Combined with RxBinding: merging events & joint validation through combineLatest()

Scenario: this is useful when several input fields have to be validated together, for example account and password, or name, phone number and job. Besides RxJava, the example relies on RxBinding's view-binding mechanism. The version of RxBinding used in this article is as follows:

implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'

Note:

RxBinding already pulls in the RxJava2 dependency by default, so once you depend on RxBinding it can stand in for the explicit RxJava dependency declarations. As shown below:

/** * @author: yangtianfu * @time: 2018/9/5 16:10 * @more: https://blog.csdn.net/ytfunnysite * @ the Describe for each EditText observed, used to send to monitor events * 1. Here we use RxBinding: rxTextView.textChanges (name) = to listen for changes to control data (function like TextWatcher) The compile 'com. Jakewharton. Rxbinding2: rxbinding: 2.0.0' * 2. Pass in the EditText control, and when any EditText is clicked to write, it sends the return value of the data event = Function3 (more on that below) * 3. Reasons for using Skip (1) : */ private void initCombineLatest() {Observable<CharSequence> nameObservable = RxTextView.textChanges(name).skip(1); Observable<CharSequence> ageObservable = RxTextView.textChanges(age).skip(1); Observable<CharSequence> jobObservable = RxTextView.textChanges(job).skip(1); Observable.binelatest (nameObservable, ageObservable, jobObservable, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() { @Override public Boolean apply(CharSequence charSequence, CharSequence charSequence2, CharSequence charSequence3) {// Specify that the form information input cannot be empty // 1. Name information Boolean isUserNameValid =! TextUtils.isEmpty(name.getText()); // Boolean isUserNameValid =! TextUtils.isEmpty(name.getText()) && (name.getText().toString().length() > 2 && name.getText().toString().length() < 9);  Age information Boolean isUserAgeValid =! TextUtils.isEmpty(age.getText()); Boolean isUserJobValid =! TextUtils.isEmpty(job.getText()); Return isUserNameValid && isUserAgeValid && isUserJobValid; return isUserNameValid && isUserAgeValid && isUserJobValid; return isUserNameValid && isUserAgeValid && isUserJobValid; }}). Subscribe (new Consumer<Boolean>() {@override public void accept(Boolean aBoolean) throws Exception {/* * Step 6: subscribe(new Consumer<Boolean>() {@override public void accept(Boolean aBoolean) throws Exception {/* * Step 6: Return result & Set button clickable style **/ log. 
e(TAG, "Submit button clickable: "+aBoolean); ShowToast (" Submit button is clickable: "+aBoolean); list.setEnabled(aBoolean); }}); }Copy the code