Before you start:

  • Learn lambdas before RxJava; without them you end up writing many generic anonymous classes and a lot of redundant code.
  • RxJava has three parts: the Observable (the observed), the Observer, and the operators.
  • Reactive Extensions (Rx) originated at Microsoft; RxJava is the JVM implementation from Netflix.
  • RxJava targets the Java language, so it can be used on both Android and Java EE, but at present it is mainly used on Android.
  • Most operators (that is, methods) have many variants (method overloads).

Advantages:

  • Asynchronous, multi-threaded work becomes powerful and convenient to express
  • RxJava already provides functionality that would be complex to implement yourself

Frameworks I rely on in project development:

  1. Net: the strongest network request library on Android
  2. BRV: the strongest list library on Android (includes StateLayout)
  3. StateLayout: the strongest default-page library on Android
  4. LogCat: a log printing tool for JSON and long text
  5. Tooltip: a toast tool that supports asynchronous calls and global customization
  6. DebugKit: a debugging window tool for development
  7. StatusBar: creates a transparent status bar in one line of code

Official websites:

  • ReactiveX

    ReactiveX supports many languages; RxJava is just one implementation (others include RxPHP, RxSwift, and RxKotlin).

  • RxJava

  • RxKotlin

Extensions for Android:

  • RxAndroid

    Adds the Android main-thread scheduler.

  • RxBinding

    This framework is rather cumbersome and bloated; I recommend implementing the bindings you need yourself.

  • AutoDispose

    My open source project. It automatically disposes the observer's subscription according to the lifecycle, and the lifecycle event can be configured. It is simpler than Uber's open source project AutoDispose.

  • RxBus

    Supports three kinds of events: event, tag, and tag + event. See the README.

  • Net

    My open source project: a fully automatic network request framework that repackages Kalle (NoHttp 2.0) by Yan Zhenjie and adds RxJava, Kotlin, and other new features. See the README for details.

Documentation

RxJavaDoc

RxJava Chinese translation document

Basic usage

Schedulers

public final Observable<T> subscribeOn(Scheduler scheduler)

public final Observable<T> observeOn(Scheduler scheduler)

AndroidSchedulers is the scheduler class added by RxAndroid; its main addition is the Android main thread.

  1. By default, the Observable emits, and the operators and the observer run, on the thread that subscribes.
  2. subscribeOn determines the thread on which the Observable emits (only the first subscribeOn in the chain takes effect). It cannot move sources that have a default scheduler (for example, interval defaults to the computation scheduler).
  3. observeOn determines the thread of everything below it, operators and observer alike, and can be applied several times. This includes the callbacks of operators below observeOn that have no default scheduler.

Operators

  • Operators that take no Scheduler parameter follow the thread on which the Observable emits; use subscribeOn to control that thread.
  • Operators that have a default scheduler are not affected by subscribeOn (but they usually have an overload that accepts a Scheduler).

Scheduler types

Schedulers.computation()  For computational work such as event loops and callback processing.
Schedulers.io()           For IO-bound work such as blocking I/O performed asynchronously. Its thread pool grows as needed; for ordinary computational work use Schedulers.computation().
Schedulers.newThread()    Creates a new thread for each task.
Schedulers.trampoline()   Runs tasks on the current thread, queued one after another.
Schedulers.single()       Everything scheduled here shares one thread; tasks run in first-in, first-out order.

Unsubscribing in batches

CompositeDisposable

Example:

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
        // Or: emitter.onError(new Throwable("O__O"));
    }
}).subscribe(new Observer<Integer>() {
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        // Called when the subscription starts
        mDisposable = d;
        // Add to the container
        compositeDisposable.add(d);
    }

    @Override
    public void onNext(Integer value) {
        // Check mDisposable.isDisposed(); once disposed there is nothing left to handle
    }

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onComplete() {}
});

// Unsubscribe all subscribers
compositeDisposable.clear();

Note that an Observable sends events only once it has a subscriber, and after the subscription is disposed the observer receives no further events. Disposing your subscriptions is therefore enough to avoid memory leaks.

Lifecycle callbacks

The doOn*() family of methods registers callbacks that run before the corresponding observer callback

public final Flowable<T> doOnCancel(Action onCancel)
// Called when the subscription is cancelled (the Observable counterpart is doOnDispose)

public final Flowable<T> doOnComplete(Action onComplete)

public final Flowable<T> doOnEach(Consumer<? super Notification<T>> onNotification)
// Called for every event (including onError and onComplete)

public final Flowable<T> doOnEach(org.reactivestreams.Subscriber<? super T> subscriber)

public final Flowable<T> doOnError(Consumer<? super java.lang.Throwable> onError)

public final Flowable<T> doOnLifecycle(Consumer<? super org.reactivestreams.Subscription> onSubscribe,
                                       LongConsumer onRequest,
                                       Action onCancel)

public final Flowable<T> doOnNext(Consumer<? super T> onNext)

public final Flowable<T> doOnRequest(LongConsumer onRequest)
// Tracks backpressure requests and is mostly used for debugging, so Observable does not have this method

public final Flowable<T> doOnSubscribe(Consumer<? super org.reactivestreams.Subscription> onSubscribe)
// Called when the source is subscribed to, before any event

public final Flowable<T> doOnTerminate(Action onTerminate)
// Called before onComplete or onError, whether the stream ends normally or with an exception

Notification

Because the doOnEach callback fires for every event, the Notification it receives describes whichever event occurred

java.lang.Throwable	getError()
// Returns the exception if the event is onError, otherwise null

T	getValue()
// Returns the value if the event is onNext, otherwise null

boolean	isOnComplete()

boolean	isOnError()

boolean	isOnNext()

// Notification also provides static factory methods that create an instance for each of the three event kinds
static <T> Notification<T>	createOnComplete()

static <T> Notification<T>	createOnError(java.lang.Throwable error)

static <T> Notification<T>	createOnNext(T value)

Lifecycle interception

Lift

Observable.just(1).lift(ObservableOperator<String, Int> { downstream ->
  // Observer that intercepts the upstream events and forwards them downstream
  object : DisposableObserver<Int>() {
    override fun onStart() = downstream.onSubscribe(this)
    override fun onComplete() = downstream.onComplete()
    override fun onNext(t: Int) {
      Log.d("Log", "(MainActivity.kt:43): onNext intercept t = $t")
      downstream.onNext(t.toString())
    }
    override fun onError(e: Throwable) = downstream.onError(e)
  }
}).subscribe(object : Observer<String> {
  override fun onComplete() {}
  override fun onSubscribe(d: Disposable) {}
  override fun onNext(t: String) {
    Log.d("Log", "(MainActivity.kt:59): onNext final t = $t")
  }
  override fun onError(e: Throwable) {}
})

Compose

public final <R> Observable<R> compose(ObservableTransformer<? super T,? extends R> composer)

compose transforms the stream as a whole, whereas flatMap transforms each event into a new stream.

The flatMap function runs once per event; the compose transformer runs only once, when the chain is assembled.

Observable.just(1).compose(object : ObservableTransformer<Int, String> {

    override fun apply(upstream: Observable<Int>): ObservableSource<String> {
        // Transform the whole stream, for example convert every Int to a String
        return upstream.map { it.toString() }
    }
})

As

Observable.just(1).`as`(object : ObservableConverter<Int, ObservableSource<String>> {
    override fun apply(upstream: Observable<Int>): ObservableSource<String> {
        // A converter may return any type; here it returns another stream
        return upstream.map { it.toString() }
    }
})

  • compose always returns another stream, whereas as can convert the stream into any type.

Creating Observables

Create

Creates a custom Observable

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

ObservableOnSubscribe has only one method

void subscribe(ObservableEmitter<T> e)
               throws java.lang.Exception

ObservableEmitter has three methods for sending events

void	onComplete()

void	onError(java.lang.Throwable error)

void	onNext(T value)

Tip:

  • onError() and onComplete() are mutually exclusive; call only one of them
  • onError() must not be called more than once; calling onComplete() more than once is harmless
  • Check whether the observer has been disposed before emitting an event or an error; otherwise an exception may be thrown

Other methods:

void setCancellable(Cancellable c)
// Sets a listener that is called on cancellation

void setDisposable(Disposable d)

boolean	isDisposed()

Just

Sends a batch of events by passing them directly as parameters (up to ten parameters)

static <T> Observable<T>	just(T item)

The onComplete method is called back after all events are sent

FromArray

Sends events by passing in an array (or a variable number of arguments); for a collection use FromIterable

static <T> Observable<T>	fromArray(T... items)

FromIterable

Iterable is the root interface of the collection types, so any collection can be passed in and its elements are sent one by one;

List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");

Flowable.fromIterable(list).subscribe(
        s -> Log.i("tag", s)
);

More ways

public static <T> Observable<T> fromArray(T... items)

public static <T> Observable<T> fromCallable(java.util.concurrent.Callable<? extends T> supplier)

public static <T> Observable<T> fromFuture(java.util.concurrent.Future<? extends T> future)

public static <T> Observable<T> fromFuture(java.util.concurrent.Future<? extends T> future,
                                                                  long timeout,
                                                                  java.util.concurrent.TimeUnit unit)

Defer

The Observable is not created until subscription: the callback runs for each subscriber and returns the Observable to subscribe to

public static <T> Observable<T> defer(java.util.concurrent.Callable<? extends ObservableSource<? extends T>> supplier)

StartWith

Prepends events to the ones the Observable already sends

public final Observable<T> startWith(T item)

public final Observable<T> startWithArray(T... items)

End stream immediately

Empty

Calls onComplete() immediately without sending any events to the observer

Flowable.empty().subscribe(
        obj -> Log.i("tag", "next" + obj.toString()),
        e -> Log.i("tag", "error"),
        () -> Log.i("tag", "complete"));

Never

Sends no events to the observer and calls none of its methods (not even onComplete)

Error

Sends no events and calls onError() directly

Pollers

Interval

Sends an event at a fixed interval. It runs on the computation scheduler by default; a different Scheduler can be specified.

Only the interval can be controlled

public static Observable<java.lang.Long> interval(long period,
                                                  java.util.concurrent.TimeUnit unit)

public static Observable<java.lang.Long> interval(long initialDelay, // Delay before the first event
                                                  long period, // Interval between events
                                                  java.util.concurrent.TimeUnit unit) // Time unit

IntervalRange

A more precise poller: sends a fixed range of values at a fixed interval. It runs on the computation scheduler by default; a different Scheduler can be specified.

The number of events can be controlled

public static Observable<java.lang.Long> intervalRange(long start, // First value
                                                       long count, // Total number of events
                                                       long initialDelay, // Delay before the first event
                                                       long period, // Interval between events
                                                       java.util.concurrent.TimeUnit unit) // Time unit

Timer

Sends a single event after a delay; equivalent to an interval that fires only once

  • Delay
  • Time unit
  • Scheduler (the computation scheduler by default)

public static Observable<java.lang.Long> timer(long delay, // Delay before the event
                                               java.util.concurrent.TimeUnit unit) // Time unit

TimeInterval

Records the time elapsed between consecutive events. Each event is wrapped in a Timed object that holds:

  • The time interval
  • The time unit
  • The value

A Scheduler can also be specified.

public final Observable<Timed<T>> timeInterval()

// Report the interval in the given unit
public final Observable<Timed<T>> timeInterval(TimeUnit unit)

Example output:

Timed[time=1003, unit=MILLISECONDS, value=12]
Timed[time=1, unit=SECONDS, value=40]

Range

The observer receives a sequence of consecutive numbers. By default Range does not run on any particular scheduler.

  • Start value
  • Count

// Sends int events
public static Observable<java.lang.Integer> range(int start, // First value
                                                  int count) // Number of values to send

// Sends long events
public static Observable<java.lang.Long> rangeLong(long start,
                                                   long count)

Repeat

Resends the source's events

  • Infinite loop
  • Number of cycles
  • Time at which the cycle ends
  • Condition that ends the cycle

public final Observable<T> repeat()
// Repeat indefinitely

public final Observable<T> repeat(long times)
// Repeat the given number of times

public final Observable<T> repeatUntil(BooleanSupplier stop)
// Stop repeating once the supplier returns true

public final Observable<T> repeatWhen(Function<? super Observable<java.lang.Object>, ? extends ObservableSource<?>> handler)
// Use another Observable as the condition for resending the events

Example

        io.reactivex.Observable.just(1)
                .repeatUntil(
                        new BooleanSupplier() {
                            /**
                             * @return true to stop repeating
                             * @throws Exception
                             */
                            @Override
                            public boolean getAsBoolean() throws Exception {
                                return true;
                            }
                        })
                .subscribe(System.out::println);

RepeatWhen

If the Observable returned from the callback sends onComplete or onError, the source is not repeated

If it sends onNext, the source's events are sent again

Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS, Schedulers.trampoline())
    .repeatWhen(
    new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
            // After the source completes (onComplete), wait five seconds and resend
            return Observable.interval(5, TimeUnit.SECONDS);
        }
    })
    .subscribe(
    new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            // do something
        }
    });

Combining multiple Observables

Concat

Concatenates multiple Observables and sends their events in order

ConcatArray works the same as Concat, except that it accepts an array

Merge

Merges multiple Observables, sending events in the order in which they occur (not in the order of the parameters)

public static <T> Observable<T> mergeArray(int maxConcurrency,
                                           int bufferSize,
                                           ObservableSource<? extends T>... sources)
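
The difference in ordering between Concat and Merge can be sketched without RxJava. The class below is a plain-Java model, not RxJava's implementation: `ConcatMergeModel`, `Event`, and the timestamps are hypothetical names and values chosen for illustration, and each source is represented as a list of events tagged with the time at which it would be sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ConcatMergeModel {
    /** Hypothetical timestamped event: the time (ms) at which a source sends a value. */
    static final class Event {
        final long timeMs;
        final String value;

        Event(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Concat model: every event of the first source, then every event of the next one. */
    @SafeVarargs
    static List<String> concat(List<Event>... sources) {
        List<String> out = new ArrayList<>();
        for (List<Event> source : sources) {
            for (Event e : source) {
                out.add(e.value);
            }
        }
        return out;
    }

    /** Merge model: the events of all sources interleaved by the time they occur. */
    @SafeVarargs
    static List<String> merge(List<Event>... sources) {
        List<Event> all = new ArrayList<>();
        for (List<Event> source : sources) {
            all.addAll(source);
        }
        // List.sort is stable, so events with equal timestamps keep parameter order.
        all.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        List<String> out = new ArrayList<>();
        for (Event e : all) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> a = Arrays.asList(new Event(0, "a1"), new Event(200, "a2"));
        List<Event> b = Arrays.asList(new Event(100, "b1"), new Event(300, "b2"));
        System.out.println(concat(a, b)); // [a1, a2, b1, b2]
        System.out.println(merge(a, b));  // [a1, b1, a2, b2]
    }
}
```

The model ignores subscription timing: the real Concat subscribes to the second source only after the first completes, which shifts when the later events arrive but not their order.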

Zip

Combines the events sent by several Observables pairwise, in emission order, into single events that the observer then receives.

If the sources are asynchronous, Zip waits until every source has sent its event for the next pair before sending the combined event to the observer.

  • Follows the subscription order

public static <T,R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources,
                                      Function<? super java.lang.Object[],? extends R> zipper)

Example:

Observable.zip(getStringObservable(), getIntegerObservable(),
               new BiFunction<String, Integer, String>() {
                 @Override public String apply(@NonNull String s, @NonNull Integer integer)
                   throws Exception {
                   // Merge the events of the two emitters here and send them as one
                   return s + integer;
                 }
               }).subscribe(new Consumer<String>() {
  @Override public void accept(@NonNull String s) throws Exception {
    // Only the value returned by apply is received here
  }
});
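
The pairwise rule described above can be sketched in plain Java. This is a model of the pairing only, not RxJava's implementation: `ZipModel` is a hypothetical name, the sources are plain lists, and `java.util.function.BiFunction` stands in for RxJava's `BiFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipModel {
    /** Pairs the i-th event of each source; leftover events of the longer source are dropped. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> zipper) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(zipper.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList("A", "B", "C"),
                Arrays.asList(1, 2),
                (s, i) -> s + i);
        System.out.println(zipped); // [A1, B2]
    }
}
```

"C" never reaches the observer because the second source has no third event to pair it with.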

ZipWith

Zips the Observable passed in with the source Observable. This method is non-static

    Observable.just(1).zipWith(Observable.just(2), new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            System.out.println("integer = [" + integer + "], integer2 = [" + integer2 + "]");
            return integer + "" + integer2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // Result: s = [12]
            System.out.println("s = [" + s + "]");
        }
    });

CombineLatest

Each time a source sends an event, that event is combined with the latest event of every other source, and the observer receives the combined events in turn. Nothing is sent until every source has sent at least one event.

  • Follows the order in which the Observables are subscribed

public static <T1,T2,T3,R> Observable<R> combineLatest(ObservableSource<? extends T1> source1,
                                                       ObservableSource<? extends T2> source2,
                                                       ObservableSource<? extends T3> source3,
                                                       Function3<? super T1,? super T2,? super T3,? extends R> combiner)

Example results:

Each event of the last Observable is received together with the last event of every earlier Observable

s = [round robin 1 10 round robin 2 10 round robin 3 1]
s = [round robin 1 10 round robin 2 10 round robin 3 2]
s = [round robin 1 10 round robin 2 10 round robin 3 3]
s = [round robin 1 10 round robin 2 10 round robin 3 3]
s = [round robin 1 10 round robin 2 10 round robin 3 4]
s = [round robin 1 10 round robin 2 10 round robin 3 5]
s = [round robin 1 10 round robin 2 10 round robin 3 6]
s = [round robin 1 10 round robin 2 10 round robin 3 7]
s = [round robin 1 10 round robin 2 10 round robin 3 8]
s = [round robin 1 10 round robin 2 10 round robin 3 9]
s = [round robin 1 10 round robin 2 10 round robin 3 10]
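
The combining rule of CombineLatest can be sketched as a plain-Java model. This is not RxJava's implementation: `CombineLatestModel`, `Event`, and the sample timeline are hypothetical, and the model assumes the events of all sources have already been placed on one timeline in the order they occur.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombineLatestModel {
    /** Hypothetical event: which source sent it (0-based index) and its value. */
    static final class Event {
        final int source;
        final String value;

        Event(int source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    /** Walks the timeline and combines each new event with the latest event of every other source. */
    static List<String> combineLatest(List<Event> timeline, int sourceCount) {
        String[] latest = new String[sourceCount];
        int sourcesSeen = 0;
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (latest[e.source] == null) {
                sourcesSeen++; // first event of this source
            }
            latest[e.source] = e.value;
            if (sourcesSeen == sourceCount) {
                // Every source has a latest value, so a combination can be sent.
                out.add(String.join(" ", latest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> timeline = Arrays.asList(
                new Event(0, "A"), new Event(1, "1"), new Event(0, "B"), new Event(1, "2"));
        System.out.println(combineLatest(timeline, 2)); // [A 1, B 1, B 2]
    }
}
```

The first event "A" produces no output on its own, because the second source has not yet sent anything to combine it with.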

WithLatestFrom

Combines each event of the source Observable with the latest events of all the other Observables and sends the result to the observer

public final <R> Observable<R> withLatestFrom(ObservableSource<?>[] others, Function<? super java.lang.Object[],R> combiner)

Transformation operators

FlatMap

Intercepts each event and converts it into a new Observable, whose events are then sent on to the observer

The results may be interleaved: inner Observables that send events asynchronously or with a delay are not guaranteed to deliver them in source order.

public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)

// Separate callbacks for onNext/onError/onComplete
public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> onNextMapper,
                                       Function<? super java.lang.Throwable,? extends ObservableSource<? extends R>> onErrorMapper,
                                       java.util.concurrent.Callable<? extends ObservableSource<? extends R>> onCompleteSupplier)

public final <U,R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends U>> mapper,
                                         BiFunction<? super T,? super U,? extends R> resultSelector)

FlatMapIterable

public final <U> Observable<U> flatMapIterable(Function<? super T,? extends java.lang.Iterable<? extends U>> mapper)
// Maps each event to an Iterable; the observer receives the elements of each Iterable

This overload additionally combines the source event with each element of the Iterable into a single object

public final <U,V> Observable<V> flatMapIterable(
    Function<? super T,? extends java.lang.Iterable<? extends U>> mapper,
    BiFunction<? super T,? super U,? extends V> resultSelector)

Example

    Observable.just(1, 2, 3, 4, 5).flatMapIterable(new Function<Integer, Iterable<String>>() {
      @Override public Iterable<String> apply(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:32) ___ " + "apply() called with: integer = [" + integer + "]");

        // Iterable is the root interface of all collections
        ArrayList<String> strings = new ArrayList<>();
        strings.add(integer.toString() + "In the set");
        return strings;
      }
    }, new BiFunction<Integer, Object, String>() {

      /**
       * Produces the event that the observer finally receives.
       *
       * @param integer the event sent by the emitter
       * @param o the element that was added to the collection
       * @return the event that the observer finally receives
       * @throws Exception if null is returned
       */
      @Override public String apply(Integer integer, Object o) throws Exception {
        Log.d("Log", "(MainActivity.java:39) ___ "
            + "apply() called with: integer = ["
            + integer
            + "], o = ["
            + o
            + "]");
        // If null is returned, an exception is delivered to onError
        return "Daniel Wu";
      }
    }).subscribe(new Observer<String>() {
      @Override public void onSubscribe(Disposable d) {
        Log.i("Log", "(MainActivity.java:49) ___ onSubscribe");
      }

      @Override public void onNext(String s) {
        Log.d("Log", "(MainActivity.java:53) ___ " + "onNext() called with: s = [" + s + "]");
      }

      @Override public void onError(Throwable e) {
        Log.i("Log", "(MainActivity.java:57) ___ onError");
      }

      @Override public void onComplete() {
        Log.i("Log", "(MainActivity.java:61) ___ onComplete");
      }
    });

FlatMapCompletable

Maps each event to a Completable; the observer receives only the onComplete (or onError) callback

public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper)

public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper,
                                            boolean delayErrors)

FlatMapMaybe

public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper)


public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper,
                                            boolean delayErrors)

FlatMapSingle

Maps each event to a Single and sends the result of each Single to the observer

public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper)


public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper,
                                             boolean delayErrors)

ConcatMap

Unlike FlatMap, ConcatMap guarantees that events are sent in order (no interleaving); it uses Concat internally.

For example, two asynchronous Observables still send their events in the order in which they were added as parameters

public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)

public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
                                       int prefetch)

ConcatMapDelayError

Delays any exception until all events have been sent

public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)

public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
                                                 int prefetch,
                                                 boolean tillTheEnd)

ConcatMapEager

Subscribes to all the inner Observables at once, buffers their events, and sends them on in source order;

public final <R> Observable<R> concatMapEager(Function<? super T,? extends ObservableSource<? extends R>> mapper)

Event transformation

Map

Converts each event to another value or type

public final <R> Flowable<R> map(Function<? super T,? extends R> mapper)

SwitchMap

Each time the source sends an event, SwitchMap creates a new inner Observable for it and switches to that one, dropping the previous one. If the inner Observables send their events with a delay, only the inner Observable created for the latest source event reaches the observer.

In some other Rx implementations this operator is called FlatMapLatest, a name that may be easier to understand

public final <R> Observable<R> switchMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)

Example

    // interval1 is assumed to be an Observable<Long> defined elsewhere
    Observable.just(1, 2, 3)
            .switchMap(new Function<Integer, ObservableSource<Long>>() {
              @Override
              public ObservableSource<Long> apply(Integer integer) throws Exception {
                return interval1;
              }
            })
            .subscribe(ele -> Log.d("Log", "(MainActivity.java:39) ___ Result = " + ele));

Intercept events

Buffer

Collects events into a List and sends the List to the observer

  • Count
  • Skip (span)
  • Time

public final Observable<java.util.List<T>> buffer(int count) // Number of events per List

public final Observable<java.util.List<T>> buffer(int count,
                                                  int skip) // Number of events between the starts of consecutive Lists

For example, with the events {1,2,3,4,5}, count = 3 and skip = 2,

the Lists received are {1,2,3} {3,4,5} {5}
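
The count/skip example above can be reproduced with a small plain-Java model. This is a sketch of the windowing arithmetic only, not RxJava's implementation; `BufferModel` is a hypothetical name and the source is a plain list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferModel {
    /** Starts a new List every `skip` events and puts up to `count` events in each List. */
    static <T> List<List<T>> buffer(List<T> events, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < events.size(); start += skip) {
            int end = Math.min(start + count, events.size());
            out.add(new ArrayList<>(events.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(events, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5]]
        System.out.println(buffer(events, 2, 3)); // [[1, 2], [4, 5]]
    }
}
```

When skip is smaller than count the Lists overlap, as in the example above; when skip is larger than count, the events between two Lists are dropped.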

public final <B> Observable<java.util.List<T>> buffer(java.util.concurrent.Callable<? extends ObservableSource<B>> boundarySupplier)
// Each time the Observable returned by boundarySupplier sends an event, the current List is sent and a new one is started


// Collect the events of a time span into each List
public final Observable<java.util.List<T>> buffer(long timespan,
                                                  long timeskip,
                                                  java.util.concurrent.TimeUnit unit)

Window

Like Buffer, but instead of a List the observer receives an Observable that contains the events within each span.

By time

public final Observable<Observable<T>> window(long timespan,
                                              java.util.concurrent.TimeUnit unit)
// Group by time span

public final Observable<Observable<T>> window(long timespan,
                                              java.util.concurrent.TimeUnit unit,
                                              long count)
// Group by time span, with at most count events in each window

public final Observable<Observable<T>> window(long timespan,
                                              long timeskip,
                                              java.util.concurrent.TimeUnit unit)
// Group by time span, starting a new window every timeskip

public final Observable<Observable<T>> window(long timespan,
                                              java.util.concurrent.TimeUnit unit,
                                              long count,
                                              boolean restart)

By count

public final Observable<Observable<T>> window(long count)
// Group by count

public final Observable<Observable<T>> window(long count,
                                              long skip)
// Group by count, starting a new window every skip events

Example

    Observable.just(1, 2, 3, 4).window(3).subscribe(new Consumer<Observable<Integer>>() {
      @Override public void accept(Observable<Integer> integerObservable) throws Exception {
        Log.i("Log", "(MainActivity.java:19) ___ observer");

        integerObservable.subscribe(new Consumer<Integer>() {
          @Override public void accept(Integer integer) throws Exception {
            Log.i("Log", "(MainActivity.java:23) ___ received an event");
          }
        });
      }
    });

public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
// Each event sent by the boundary Observable closes the current window and opens a new one

public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
                                                    Function<? super U,? extends ObservableSource<V>> closingIndicator)

public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
                                                    Function<? super U,? extends ObservableSource<V>> closingIndicator,
                                                    int bufferSize)
// An event from openingIndicator opens a window; an event from closingIndicator closes it and sends the segment to the observer

GroupBy

Splits the events of an Observable into groups by key and sends each group as a GroupedObservable. This class extends Observable and adds one method:

getKey() returns the key that the callback returned for the group;

// The callback returns a key for each event; events with equal keys go into the same group
public final <K> Observable<GroupedObservable<K,T>> groupBy(Function<? super T,? extends K> keySelector)



// Group the events and also convert each event with valueSelector
public final <K,V> Observable<GroupedObservable<K,V>> groupBy(Function<? super T,? extends K> keySelector,
                                                              Function<? super T,? extends V> valueSelector)
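
The key-based grouping can be sketched with a plain-Java model. It is not RxJava's implementation: `GroupByModel` is a hypothetical name, and the model collects each group eagerly into a List, whereas a GroupedObservable delivers the events of its group as they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupByModel {
    /** Puts each event into the group named by its key; groups keep first-seen order. */
    static <T, K> Map<K, List<T>> groupBy(List<T> events, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T event : events) {
            K key = keySelector.apply(event);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups =
                groupBy(Arrays.asList(1, 2, 3, 4, 5, 6), n -> n % 2 == 0 ? "even" : "odd");
        System.out.println(groups); // {odd=[1, 3, 5], even=[2, 4, 6]}
    }
}
```

Each map key plays the role of getKey(), and each map value plays the role of the events that one GroupedObservable would send.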

ToList

Collects all events into a single List that is delivered once, when the source completes. All events must share the element type of the List

public final Single<java.util.List<T>> toList()

public final Single<java.util.List<T>> toList(int capacityHint)
// Initial capacity of the List

public final <U extends java.util.Collection<? super T>> Single<U> toList(java.util.concurrent.Callable<U> collectionSupplier)

Remove duplicate events

Distinct

Removes duplicate events

public final Flowable<T> distinct()
// Removes all duplicate events

public final <K> Observable<T> distinct(Function<? super T,K> keySelector)
// The callback returns a key for each event; events whose key has already been seen are duplicates

public final <K> Observable<T> distinct(Function<? super T,K> keySelector,
                                        java.util.concurrent.Callable<? extends java.util.Collection<? super K>> collectionSupplier)

Example:

    /* Only two events get through: the first odd number and the first even number */
    Observable.just(1, 2, 3, 4, 5, 6).distinct(new Function<Integer, String>() {
      /**
       * Called once for every event sent
       * @throws Exception
       */
      @Override public String apply(Integer integer) throws Exception {
        return integer % 2 == 0 ? "Even" : "Odd";
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
      }
    });

DistinctUntilChanged

Removes adjacent duplicate events.

public final Flowable<T> distinctUntilChanged()
// Removes only adjacent duplicate events

public final Observable<T> distinctUntilChanged(BiPredicate<? super T,? super T> comparer)
// The callback receives two adjacent events; compare them and return true if they count as duplicates

public final <K> Observable<T> distinctUntilChanged(Function<? super T,K> keySelector)
// The callback returns a key for each event; adjacent events with equal keys count as duplicates

Example:

    Observable.just(1, 2, 2, 4, 5, 6).distinctUntilChanged(new BiPredicate<Integer, Integer>() {
      @Override public boolean test(Integer integer, Integer integer2) throws Exception {

        return integer.equals(integer2);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
      }
    });
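
The difference between distinct and distinctUntilChanged can be sketched in plain Java over a list of events. This is only a model of the behavior described above, with no RxJava dependency; the class DistinctSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper: a list-based model, not RxJava's implementation.
final class DistinctSketch {

    // Models distinct(keySelector): keep an event only if its key was never seen before.
    static <T, K> List<T> distinct(List<T> events, Function<? super T, K> keySelector) {
        Set<K> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T event : events) {
            if (seen.add(keySelector.apply(event))) {
                out.add(event);
            }
        }
        return out;
    }

    // Models distinctUntilChanged(): drop an event only if it equals the event right before it.
    static <T> List<T> distinctUntilChanged(List<T> events) {
        List<T> out = new ArrayList<>();
        T previous = null;
        boolean first = true;
        for (T event : events) {
            if (first || !Objects.equals(previous, event)) {
                out.add(event);
            }
            previous = event;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 2, 1, 3);
        System.out.println(distinct(input, x -> x));     // [1, 2, 3]
        System.out.println(distinctUntilChanged(input)); // [1, 2, 1, 3]
    }
}
```

Note that the second 1 survives distinctUntilChanged because its neighbor is 2, while distinct removes it because 1 was already seen.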

Only specified data is sent

Element

Emits only the event at the specified position (index).

public final Maybe<T> elementAt(long index) // The index of the event to emit

public final Single<T> elementAt(long index,
                                 T defaultItem) // Emits defaultItem if the index is out of bounds

public final Maybe<T> firstElement()
// Emits only the first event

public final Maybe<T> lastElement()
// Emits only the last event

public final Completable ignoreElements()
// Ignores all events

public final Single<T> elementAtOrError(long index)
// Signals an error if there is no event at the given index

Debounce

An event is delivered only if no newer event arrives within the given time after it. If a newer event does arrive, the pending event is discarded and the timer restarts.

A common use is automatic suggestions in a search box: every keystroke emits an event and resets the timer, so nothing is delivered while you keep typing. The latest event is delivered only once you stop typing.

public final <U> Observable<T> debounce(Function<? super T,? extends ObservableSource<U>> debounceSelector)

public final Observable<T> debounce(long timeout,
                                    java.util.concurrent.TimeUnit unit)

throttleWithTimeout behaves the same way:

public final Observable<T> throttleWithTimeout(long timeout,
                                               java.util.concurrent.TimeUnit unit)

public final Observable<T> throttleWithTimeout(long timeout,
                                               java.util.concurrent.TimeUnit unit,
                                               Scheduler scheduler)
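
To make the timer-reset rule concrete, here is a rough plain-Java model of debounce applied to a recorded timeline of keystrokes. It is a sketch under assumptions, not RxJava code: DebounceSketch is a hypothetical name, arrival times are given in ascending order, a gap exactly equal to the timeout counts as quiet, and the last pending event is delivered when the source ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of debounce over a recorded timeline, not RxJava's implementation.
final class DebounceSketch {

    // timesMs[i] is the arrival time of values[i]; times must be ascending.
    static List<String> debounce(long[] timesMs, String[] values, long timeoutMs) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = i == values.length - 1;
            // An event survives only if the next event arrives after the quiet period has elapsed.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                delivered.add(values[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Typing "r", "rx" quickly, pausing, then typing "rxj", "rxja" quickly.
        long[] times = {0, 100, 700, 800};
        String[] typed = {"r", "rx", "rxj", "rxja"};
        System.out.println(debounce(times, typed, 300)); // [rx, rxja]
    }
}
```

Only the text present at each pause reaches the observer, which is why a search box wired this way issues one request per pause rather than one per keystroke.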

Filter

Filters events with a predicate, for example dropping events whose string is empty.

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        // Return true to let the event through, false to drop it
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
            }
        });

All

The predicate is called for each event to check whether all events satisfy the condition, but the observer receives only a single Boolean value.

public final Single<java.lang.Boolean> all(Predicate<? super T> predicate)

Flowable.just(1, 2, 3, 4).all(new Predicate<Integer>() {
    @Override public boolean test(Integer integer) throws Exception {
        return integer < 4;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override public void accept(Boolean aBoolean) throws Exception {
    }
});

First

Emits only the first event. If the source calls onComplete() without emitting anything, the given default value is emitted instead.

The return type is Single, whose observer has no onComplete callback.

public final Single<T> first(T defaultItem)

public final Single<T> firstOrError()
// Signals onError if the source only calls onComplete without emitting an event

Last

Emits only the last event. If the source calls onComplete() without emitting anything, the given default value is emitted instead.

public final Single<T> last(T defaultItem)

Amb

Only the Observable that emits an event first is kept; all other Observables are discarded.

All the Observables passed in must share a common event type.

public static <T> Observable<T> amb(java.lang.Iterable<? extends ObservableSource<? extends T>> sources)
// Collection (Iterable) parameter

ambArray

The only difference is that the sources are passed as varargs.

public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
// Varargs parameter

AmbWith

The instance-method form. It races the source Observable against the specified Observable, keeps whichever emits first, and ends the subscription to the other, similar to switching Observables.

public final Observable<T> ambWith(ObservableSource<? extends T> other)

Blocking

These operators break out of RxJava's call chain and return events directly, blocking the calling thread until the result is available.

Return the first, the last, or the only event

public final T blockingFirst()

public final T blockingFirst(T defaultItem)

public final T blockingLast()

public final T blockingLast(T defaultItem)

public final T blockingSingle()

  • If the Observable emits no events and no default value is given, an exception is thrown
  • blockingSingle throws an exception if the Observable emits more than one event

Example:

Long aLong = Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
.blockingFirst();

Iteration

public final void blockingForEach(Consumer<? super T> onNext)

public final java.lang.Iterable<T> blockingIterable()

public final java.lang.Iterable<T> blockingIterable(int bufferSize)

public final java.lang.Iterable<T> blockingMostRecent(T initialValue)

public final void blockingSubscribe()

Rate-limiting events

  • Only the first event within the specified time
  • Only the last event within the specified time

ThrottleFirst

Emits only the first event within each time window. It runs on the computation Scheduler by default, but a Scheduler can be specified.

public final Observable<T> throttleFirst(long windowDuration,
                                         java.util.concurrent.TimeUnit unit)

public final Observable<T> throttleFirst(long skipDuration,
                                         java.util.concurrent.TimeUnit unit,
                                         Scheduler scheduler)

ThrottleLast

Emits only the last event within each time window.

public final Observable<T> throttleLast(long intervalDuration,
                                        java.util.concurrent.TimeUnit unit)

public final Observable<T> throttleLast(long intervalDuration,
                                        java.util.concurrent.TimeUnit unit,
                                        Scheduler scheduler)

ThrottleWithTimeout

Emitting event A starts a timer. If a new event B arrives within the specified time, event A is discarded and the timer restarts. This is the same behavior as debounce, except that debounce also has an overload that uses an Observable as the timer.

In other words, only the last event of a burst is delivered, once no newer event has arrived for the specified time.

public final Observable<T> throttleWithTimeout(long timeout,
                                               java.util.concurrent.TimeUnit unit)

public final Observable<T> throttleWithTimeout(long timeout,
                                               java.util.concurrent.TimeUnit unit,
                                               Scheduler scheduler)

Sample

Samples at a fixed interval: within each period only the last emitted event is taken. A Scheduler can be specified. Equivalent to throttleLast.

public final Observable<T> sample(long period,
                                  java.util.concurrent.TimeUnit unit)

public final Observable<T> sample(long period,
                                  java.util.concurrent.TimeUnit unit,
                                  boolean emitLast)

public final <U> Observable<T> sample(ObservableSource<U> sampler)

public final <U> Observable<T> sample(ObservableSource<U> sampler,
                                      boolean emitLast)

TimeOut

  public final Observable<T> timeout(long timeout,
                                     java.util.concurrent.TimeUnit timeUnit)

  public final Observable<T> timeout(long timeout,
                                     java.util.concurrent.TimeUnit timeUnit,
                                     ObservableSource<? extends T> other)

  1. Sets a timeout: if the wait for the next event exceeds it, a TimeoutException is signaled through onError.
  2. Use this overload if, on timeout, you want to switch to another Observable instead of signaling an exception.

Unlike the overloads above, the following ones control the timeout through the Observable returned by the callback. If the returned Observable emits an event before the source emits its next one, a timeout occurs and a TimeoutException is signaled through onError.

public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator)

public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator,
                                         ObservableSource<? extends T> other)

The following overloads take a separate timeout indicator (an Observable) for the first event emitted by the source.

public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
                                         Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator)

public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
                                         Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator,
                                         ObservableSource<? extends T> other)

Skipping events

  • By count
  • By time
  • From the end
  • Until a specified Observable has emitted
  • While a callback keeps returning true

Skip

Skips events from the start of the sequence.

public final Observable<T> skip(long count)
// Skips the specified number of events

public final Observable<T> skip(long time,
                                java.util.concurrent.TimeUnit unit)
// Skips the events emitted within the specified time

SkipLast

Skips events at the end of the sequence.

public final Observable<T> skipLast(int count)

public final Observable<T> skipLast(long time,
                                    java.util.concurrent.TimeUnit unit)

public final Flowable<T> skipLast(long time,
                                  java.util.concurrent.TimeUnit unit,
                                  boolean delayError)

SkipUntil

All events from the source Observable are skipped until the Observable passed as the parameter emits an event.

public final <U> Observable<T> skipUntil(ObservableSource<U> other)

Sample code:

Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
    // Source events are skipped until the Observable below has emitted an event (onComplete does not count)
    .skipUntil(Observable.just(1)
        .delay(2, TimeUnit.SECONDS))
    .subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long aLong) throws Exception {
        Log.d("Log", "(MainActivity.java:80) ___ " + "accept() called with: aLong = [" + aLong + "]");
      }
    });

SkipWhile

A callback decides whether to discard each event.

Unlike filter, skipWhile can only skip events from the start of the sequence: as soon as the callback returns false for an event, it is never called again and every later event passes through. If the first event is not skipped, nothing is skipped at all.

public final Observable<T> skipWhile(Predicate<? super T> predicate)
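
The contrast with filter is easiest to see side by side. The following is a plain-Java model over a list, not RxJava code; SkipWhileSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: a list-based model, not RxJava's implementation.
final class SkipWhileSketch {

    // Models skipWhile: the predicate is consulted only until it first returns false.
    static <T> List<T> skipWhile(List<T> events, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T event : events) {
            if (skipping && predicate.test(event)) {
                continue; // still inside the leading run of skipped events
            }
            skipping = false; // from here on every event passes
            out.add(event);
        }
        return out;
    }

    // Models filter: the predicate is consulted for every event.
    static <T> List<T> filter(List<T> events, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T event : events) {
            if (predicate.test(event)) {
                out.add(event);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 5, 1, 6);
        System.out.println(skipWhile(input, x -> x < 3)); // [5, 1, 6]
        System.out.println(filter(input, x -> x >= 3));   // [5, 6]
    }
}
```

The second 1 passes through skipWhile because skipping stopped at 5, whereas filter rejects it.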

Stopping

  • After a maximum number of events
  • After a given time
  • When the parameter Observable emits an event, which ends the subscription to the source Observable
  • From the end
  • When a callback decides to stop

Take

Similar to skip, except that once the limit is reached the emitter is stopped and the observer goes straight to the onComplete callback.

public final Observable<T> take(long count)
// Limits the maximum number of events received

public final Observable<T> take(long time,
                                java.util.concurrent.TimeUnit unit)
// Only events emitted within the specified time are received

TakeUntil

public final <U> Observable<T> takeUntil(ObservableSource<U> other)
// When the parameter Observable emits an event, the source sequence is ended

public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
// The callback decides whether to stop; returning true ends the sequence

TakeLast

public final Observable<T> takeLast(int count)

public final Observable<T> takeLast(long count,
                                      long time,
                                      java.util.concurrent.TimeUnit unit)

public final Observable<T> takeLast(long time,
                                      java.util.concurrent.TimeUnit unit)

public final Observable<T> takeLast(long time,
                                      java.util.concurrent.TimeUnit unit,
                                      boolean delayError)

TakeWhile

The callback decides whether to end the sequence (again through onComplete). Unlike takeUntil, returning false is what ends it.

public final Observable<T> takeWhile(Predicate<? super T> predicate)
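
The two predicate-based operators differ in which return value stops the sequence and in whether the event that triggers the stop is still delivered. A plain-Java model over a list, with no RxJava dependency, may help; TakeSketch and its methods are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: a list-based model, not RxJava's implementation.
final class TakeSketch {

    // Models takeWhile: the first event for which the predicate is false ends the sequence
    // and is itself not delivered.
    static <T> List<T> takeWhile(List<T> events, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T event : events) {
            if (!predicate.test(event)) {
                break;
            }
            out.add(event);
        }
        return out;
    }

    // Models takeUntil(stopPredicate): each event is delivered first, then tested;
    // true ends the sequence after that event.
    static <T> List<T> takeUntil(List<T> events, Predicate<? super T> stopPredicate) {
        List<T> out = new ArrayList<>();
        for (T event : events) {
            out.add(event);
            if (stopPredicate.test(event)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(takeWhile(input, x -> x < 3));  // [1, 2]
        System.out.println(takeUntil(input, x -> x >= 3)); // [1, 2, 3]
    }
}
```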

Join

Joins another Observable (the target Observable) to the source. Each event emitted by one side is combined in turn with the events of the other side whose windows are still open, and each pair is passed to the same callback function.

public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> join(ObservableSource<? extends TRight> other,
                                                              Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd,
                                                              Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd,
                                                              BiFunction<? super T,? super TRight,? extends R> resultSelector)

Sample code:

    Observable.just(1L, 2L, 3L, 4L)
        .join(Observable.just(5L, 6L, 7L, 8L), new Function<Long, ObservableSource<Long>>() {

          /**
           * Receives each event from the source Observable
           * @param aLong
           * @return an Observable; when it emits an event, the window of this source event ends
           * @throws Exception
           */
          @Override
          public ObservableSource<Long> apply(Long aLong) throws Exception {
            Log.d("Log", "(MainActivity.java:65) ___ " + "Source Observable aLong = [" + aLong + "]");
            return Observable.interval(3, TimeUnit.SECONDS);
          }
        }, new Function<Long, ObservableSource<Long>>() {

          /**
           * Receives each event from the added Observable (the join Observable)
           * @param aLong
           * @return an Observable; when it emits an event, the window of this added event ends
           * @throws Exception
           */
          @Override
          public ObservableSource<Long> apply(Long aLong) throws Exception {
            Log.d("Log", "(MainActivity.java:75) ___ " + "Added Observable aLong = [" + aLong + "]");
            return Observable.interval(3, TimeUnit.SECONDS);
          }
        }, new BiFunction<Long, Long, String>() {
          /**
           * Receives one event from the source Observable and one from the added Observable
           * @param aLong the event emitted by the source Observable
           * @param aLong2 the event emitted by the added Observable
           * @return the value finally received by the observer
           * @throws Exception
           */
          @Override
          public String apply(Long aLong, Long aLong2) throws Exception {
            Log.d("Log", "(MainActivity.java:89) ___ " + "apply() called with: aLong = [" + aLong
                + "], aLong2 = [" + aLong2 + "]");
            return aLong + "" + aLong2;
          }
        })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d("Log", "(MainActivity.java:83) ___ " + "accept() called with: s = [" + s + "]");
          }
        });

GroupJoin

Similar to join, except that the result selector receives each source event together with an Observable of the events from the other side, rather than one pair at a time.

public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> groupJoin(ObservableSource<? extends TRight> other,
                                                                   Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd,
                                                                   Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd,
                                                                   BiFunction<? super T,? super Observable<TRight>,? extends R> resultSelector)

Example:

    Observable.just(1L, 2L, 3L, 4L)
        .groupJoin(Observable.just(5L, 6L, 7L, 8L),
            new Function<Long, ObservableSource<Long>>() {
              @Override
              public ObservableSource<Long> apply(Long aLong) throws Exception {
                // Placeholder: RxJava 2 callbacks must not return null; this window never closes
                return Observable.never();
              }
            }, new Function<Long, ObservableSource<Long>>() {
              @Override
              public ObservableSource<Long> apply(Long aLong) throws Exception {
                // Placeholder: this window never closes either
                return Observable.never();
              }
            }, new BiFunction<Long, Observable<Long>, String>() {
              @Override
              public String apply(Long aLong, Observable<Long> longObservable) throws Exception {
                // Placeholder result: only the source event is used here
                return String.valueOf(aLong);
              }
            })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d("Log", "(MainActivity.java:78) ___ " + "accept() called with: s = [" + s + "]");
          }
        });

ToMap

public final <K> Single<java.util.Map<K,T>> toMap(Function<? super T,? extends K> keySelector)

public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
                                                    Function<? super T,? extends V> valueSelector)

public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
                                                    Function<? super T,? extends V> valueSelector,
                                                    java.util.concurrent.Callable<? extends java.util.Map<K,V>> mapSupplier)

Error handling

onErrorResumeNext

If an error occurs, the sequence switches to another Observable, so the observer keeps receiving onNext instead of onError.

public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> next)

onErrorReturn

If an error occurs, the callback returns a fallback event, which is delivered through onNext instead of onError.

public final Observable<T> onErrorReturn(Function<? super java.lang.Throwable,? extends T> valueSupplier)

With both operators the sequence ends with onComplete.

Retry

Retries (resubscribes to the source) after the source signals an error event (onError).

public final Observable<T> retry()
// Resubscribes and emits the events again whenever an error occurs

public final Observable<T> retry(long times)

public final Observable<T> retry(Predicate<? super java.lang.Throwable> predicate)

public final Observable<T> retry(BiPredicate<? super java.lang.Integer,? super java.lang.Throwable> predicate)
// The callback decides whether to retry

public final Observable<T> retry(long times, Predicate<? super java.lang.Throwable> predicate)
// Predicate function + retry count

RetryUntil

This operator is actually implemented on top of retry(predicate).

The difference is that returning true stops the retrying.

public final Observable<T> retryUntil(BooleanSupplier stop)
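
As a rough analogy, resubscribing to a failing source can be modeled in plain Java as re-running a task that may throw. The sketch below is not RxJava code: RetrySketch is a hypothetical name, a Supplier stands in for the source, and an unchecked exception stands in for onError. It assumes that retry(times) allows at most `times` additional attempts after the first failure.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical model of retry semantics, not RxJava's implementation.
final class RetrySketch {

    // Models retry(times): on failure, run the task again, at most `times` more times.
    static <T> T retry(Supplier<T> task, long times) {
        long retriesLeft = times;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (retriesLeft-- <= 0) {
                    throw e; // retries exhausted: the error reaches the caller
                }
            }
        }
    }

    // Models retryUntil(stop): on failure, retry unless stop returns true.
    static <T> T retryUntil(Supplier<T> task, BooleanSupplier stop) {
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (stop.getAsBoolean()) {
                    throw e; // true stops the retrying
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        Supplier<String> flaky = () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("attempt " + attempts[0] + " failed");
            }
            return "succeeded on attempt " + attempts[0];
        };
        System.out.println(retry(flaky, 5)); // succeeded on attempt 3
    }
}
```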

RetryWhen

The callback receives an Observable of the source's errors and returns an Observable that controls retrying. Each time the returned Observable emits an event, the source is resubscribed; if it completes or signals an error instead, that signal is passed on to the observer and no retry takes place.

public final Observable<T> retryWhen(Function<? super Observable<java.lang.Throwable>,? extends ObservableSource<?>> handler)

Example:

This example does not trigger a retry:

Observable.create(
        new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Throwable("error")); // RxJava 2 does not accept a null Throwable
            emitter.onNext(3); // Never delivered: the sequence has already terminated
          }
        })
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
          @Override
          public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            return Observable.just(23);
          }
        })
        .subscribe(
            new DefaultObserver<Integer>() {
              @Override
              public void onNext(Integer integer) {
                System.out.println("integer = [" + integer + "]");
              }

              @Override
              public void onError(Throwable e) {
                System.out.println("Main.onError");
              }

              @Override
              public void onComplete() {
                System.out.println("Main.onComplete");
              }
            });

Conditional operators

Contains

Checks whether the emitted events include the specified element; the observer receives a Boolean value.

public final Single<java.lang.Boolean> contains(java.lang.Object element)

Any

public final Single<java.lang.Boolean> any(Predicate<? super T> predicate)

Tests each event in turn; as soon as the predicate returns true, the emitter is terminated and the observer receives true.

    Observable.just(1, 2, 3, 4).any(new Predicate<Integer>() {
      @Override public boolean test(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:27) ___ " + "test() called with: integer = [" + integer + "]");
        if (integer == 3) {
          return true;
        } else {
          return false;
        }
      }
    }).subscribe(new Consumer<Boolean>() {
      @Override public void accept(Boolean aBoolean) throws Exception {
        Log.d("Log", "(MainActivity.java:33) ___ " + "accept() called with: aBoolean = [" + aBoolean + "]");
      }
    });

IsEmpty

public final Single<java.lang.Boolean> isEmpty()

Checks whether the source completes without emitting any event; the observer receives a Boolean value.

DefaultIfEmpty

public final Observable<T> defaultIfEmpty(T defaultItem)

If the emitter emits no event, the specified default event is emitted instead, for example when every event was intercepted upstream.

SwitchIfEmpty

public final Observable<T> switchIfEmpty(ObservableSource<? extends T> other)

If the source emits no event, it is replaced by another Observable.

Flowable.empty()
        .switchIfEmpty(Flowable.just(3, 4, 5))
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

SequenceEqual

public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
                                                          ObservableSource<? extends T> source2,
                                                          BiPredicate<? super T,? super T> isEqual)
// The last parameter compares two events for equality

public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
                                                          ObservableSource<? extends T> source2)

Compares whether two Observables emit the same sequence of events. The observer receives a Boolean value, which is false if the emitted events differ. The comparison ends as soon as a difference is detected.

    Observable.sequenceEqual(Observable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS),
        Observable.just(0L, 1L, 2L), new BiPredicate<Long, Long>() {
          @Override public boolean test(Long aLong, Long aLong2) throws Exception {
            // Decide here whether the two events are equal
            return aLong.equals(aLong2);
          }
        }).subscribe(new Consumer<Boolean>() {
      @Override public void accept(Boolean aBoolean) throws Exception {
        // Final result
      }
    });

merge

Reduce

An accumulating operator: the callback processes two values at a time, and once all events have been processed the observer receives the single final result.

public final Maybe<T> reduce(BiFunction<T,T,T> reducer)

The first call to apply receives events 1 and 2; the Nth call receives the result of the previous call (the value returned by apply) and event N+1.

Example:

Observable.just(1, 2, 3, 4).reduce(new BiFunction<Integer, Integer, Integer>() {
  /**
   * Called repeatedly until all events have been accumulated (or otherwise combined);
   * only then does the observer receive the final result
   * @throws Exception
   */
  @Override public Integer apply(Integer integer, Integer integer2) throws Exception {
    // The three type parameters are: the previous result, the current event, and the return value
    return integer + integer2;
  }
}).subscribe(new Consumer<Integer>() {
  @Override public void accept(Integer integer) throws Exception {
  }
});

Overloads that take an initial value (seed)

public final <R> Single<R> reduce(R seed,
                                  BiFunction<R,? super T,R> reducer)

public final <R> Single<R> reduceWith(java.util.concurrent.Callable<R> seedSupplier,
                                      BiFunction<R,? super T,R> reducer)

Scan

Similar to reduce, but the observer receives every intermediate value returned by the callback as an event, not just the final one.

public final Observable<T> scan(BiFunction<T,T,T> accumulator)

public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)
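
The relationship between reduce and scan can be modeled in plain Java over a list. This is a sketch only, with no RxJava dependency: ReduceScanSketch is a hypothetical name, and Optional stands in for Maybe (empty when the source emitted nothing).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical helper: a list-based model, not RxJava's implementation.
final class ReduceScanSketch {

    // Models reduce(reducer): one final value, or empty if there were no events.
    static <T> Optional<T> reduce(List<T> events, BinaryOperator<T> reducer) {
        T acc = null;
        boolean hasValue = false;
        for (T event : events) {
            acc = hasValue ? reducer.apply(acc, event) : event;
            hasValue = true;
        }
        return hasValue ? Optional.of(acc) : Optional.empty();
    }

    // Models scan(accumulator): the first event passes through unchanged,
    // then every intermediate result is emitted.
    static <T> List<T> scan(List<T> events, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean hasValue = false;
        for (T event : events) {
            acc = hasValue ? accumulator.apply(acc, event) : event;
            hasValue = true;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        System.out.println(reduce(input, Integer::sum)); // Optional[10]
        System.out.println(scan(input, Integer::sum));   // [1, 3, 6, 10]
    }
}
```

The last value produced by scan is the value reduce would deliver.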

ScanWith

public final <R> Observable<R> scanWith(java.util.concurrent.Callable<R> seedSupplier,
                                        BiFunction<R,? super T,R> accumulator)

Collect

Creates a container and feeds each event into it in turn; the observer receives only one event, the container itself.

    Flowable.just(1, 2, 3)
        .collect(
            new Callable<ArrayList<Integer>>() { // Creates the collection container
              @Override
              public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
              }
            }, new BiConsumer<ArrayList<Integer>, Integer>() { // Adds each event to the container
              @Override
              public void accept(ArrayList<Integer> list, Integer integer)
                  throws Exception { // The container first, then the event
                list.add(integer);
              }
            })
        .subscribe(ele -> Log.d("Log", "(MainActivity.java:33) ___ Result = " + String.valueOf(ele)));

time

Delay

The delay operator delays the delivery of each event (including onComplete, but not onError).

public final Observable<T> delay(long delay, TimeUnit unit)

public final <U> Observable<T> delay(Function<? super T,? extends ObservableSource<U>> itemDelay)
// The delay for an event ends when the Observable returned by the callback emits onNext

TimeStamp

This operator wraps each event together with the time it was emitted in a Timed object.

public final Observable<Timed<T>> timestamp()

public final Observable<Timed<T>> timestamp(Scheduler scheduler)

public final Observable<Timed<T>> timestamp(TimeUnit unit)

public final Observable<Timed<T>> timestamp(TimeUnit unit,
                                            Scheduler scheduler)

Example:

Observable.intervalRange(0, 5, 2, 2, TimeUnit.SECONDS)
        .timestamp()
        .subscribe(new Consumer<Timed<Long>>() {
          @Override
          public void accept(Timed<Long> longTimed) throws Exception {
            Log.d("Log", "accept() called with: longTimed = [" + longTimed + "]");
          }
        });

Result:

longTimed = [Timed[time=1525735346216, unit=MILLISECONDS, value=2]]

Count

The observer will receive the number of events, not the events themselves.

Flowable.just(1, 2, 3, 4, 5).count().subscribe(new BiConsumer<Long, Throwable>() {
    @Override public void accept(Long aLong, Throwable throwable) throws Exception {
        Log.d("Log", "(MainActivity.java:18) ___ Result = " + aLong);
    }
});

BackPressure

Even knowing that Observable and Flowable differ, we still have not said what backpressure is, so let's briefly cover the concept. Backpressure is the problem that arises when the producer (the Observable) produces events faster than the consumer (the observer) can consume them.

A simple example: if the Observable emits messages quickly but the observer processes them slowly, then without specific flow control a large backlog of messages builds up and occupies system resources, making everything very slow.

When both sides run synchronously on the same thread this problem is unlikely, because the Observable cannot emit the next event until the observer has finished processing the current one.

How to mitigate this is discussed later. For now, note that a BackpressureStrategy is set when a Flowable is created, and the Subscriber uses request to control the maximum number of events it accepts.
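
The idea of demand-driven flow control can be sketched in plain Java. The model below is deliberately simplified and is not RxJava code: BackpressureSketch and deliver are hypothetical names, the consumer's request calls are represented as an array of demand added before each event arrives, and events that arrive with no outstanding demand are dropped, which is roughly what a drop-style strategy does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of request-based flow control with a drop strategy.
final class BackpressureSketch {

    // produced[i] arrives after the consumer has added requestedBefore[i] to its demand.
    static List<Integer> deliver(int[] produced, long[] requestedBefore) {
        List<Integer> delivered = new ArrayList<>();
        long demand = 0;
        for (int i = 0; i < produced.length; i++) {
            demand += requestedBefore[i]; // consumer side: the equivalent of request(n)
            if (demand > 0) {
                demand--;
                delivered.add(produced[i]); // there is demand, so the event is delivered
            }
            // otherwise the event is dropped instead of piling up in memory
        }
        return delivered;
    }

    public static void main(String[] args) {
        int[] produced = {1, 2, 3, 4, 5, 6};
        long[] requested = {2, 0, 0, 0, 0, 1}; // request 2 up front, 1 more before the last event
        System.out.println(deliver(produced, requested)); // [1, 2, 6]
    }
}
```

Events 3, 4 and 5 are lost because the consumer had not asked for more; a buffering strategy would instead hold them until demand arrives, at the cost of memory.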

Observable types

Hot Observable

Hot Observable: starts emitting events without waiting for a subscription. A subscriber receives only the events emitted after it subscribes.

Replay

The replay overloads can specify:

  • Scheduler
  • Time
  • Time unit
  • Buffer size

public final ConnectableObservable<T> replay()

ConnectableObservable

A ConnectableObservable does not start emitting events until connect is called.

public final Disposable connect()

public abstract void connect(Consumer<? super Disposable> connection)

autoConnect connects automatically, but does not disconnect from upstream when all observers unsubscribe.

public Observable<T> autoConnect()

public Observable<T> autoConnect(int numberOfSubscribers) // Connects automatically once N observers have subscribed

public Observable<T> autoConnect(int numberOfSubscribers,
                        Consumer<? super Disposable> connection) // The callback receives the Disposable that can be used to cancel upstream

refCount connects when the first observer subscribes and ends upstream event delivery when all observers have unsubscribed.

public Observable<T> refCount()

public final Observable<T> share() // Equivalent to publish().refCount()

Observable

An observable type that does not support backpressure; it has high performance.

ObservableSource

Flowable

An observable type that supports backpressure. Its performance is slightly lower than Observable's, and its set of functional types differs from Observable's.

Publisher

Publisher is the root interface of Flowable, and ObservableSource is the root interface of Observable.

Single

This type can emit only one event; emitting again has no effect. (There is no backpressure, because only one event can be emitted.)

Single.create(object : SingleOnSubscribe<Int> {
    override fun subscribe(emitter: SingleEmitter<Int>) {
        emitter.onSuccess(12)
    }
}).subscribe(object : SingleObserver<Int> {
    override fun onSuccess(t: Int) {}
    override fun onSubscribe(d: Disposable) {}
    override fun onError(e: Throwable) {}
})

DisposableSingleObserver

An observer that can be disconnected manually.

public final void dispose()

public final boolean isDisposed()

ResourceSingleObserver

You can add other Disposables to it, which are then cancelled together with the observer.

public final void add(Disposable resource)

The other observable types basically follow the same pattern, and they have observer classes with similar names.

Completable

If your observer doesn’t even care about the onNext event, you can use Completable, which only has onComplete and onError events:

Completable.create(new CompletableOnSubscribe() { // Observable side
    @Override
    public void subscribe(CompletableEmitter e) throws Exception {
        e.onComplete(); // Call either onComplete or onError, once
    }
}).subscribe(new CompletableObserver() { // Observer
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onComplete() {

    }

    @Override
    public void onError(Throwable e) {

    }
});

It is also possible to use Actions to simplify the Observer:

  • completable.subscribe(onComplete)
  • completable.subscribe(onComplete,onError)

Methods such as toFlowable() and toObservable() convert a Completable into the other observable types.
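A short sketch of the Action-based subscription and of `toObservable()`, assuming RxJava 2.x. The class name `CompletableActionDemo` and the log messages are invented for illustration.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class CompletableActionDemo {
    public static List<String> run() {
        List<String> log = new ArrayList<>();

        // A Completable that runs an action and then completes
        Completable ok = Completable.fromAction(() -> log.add("work done"));
        // subscribe(onComplete, onError) replaces a full CompletableObserver
        ok.subscribe(() -> log.add("onComplete"), e -> log.add("onError"));

        // A Completable that fails immediately
        Completable failing = Completable.error(new IllegalStateException("boom"));
        failing.subscribe(() -> log.add("onComplete"), e -> log.add("onError " + e.getMessage()));

        // The converted Observable emits no items, only the terminal event.
        // Completable is cold, so subscribing again re-runs the action.
        ok.<String>toObservable().subscribe(
                v -> log.add("item " + v),
                e -> log.add("onError"),
                () -> log.add("observable completed"));
        return log;
    }
}
```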

Maybe

If a source may send one item or may send nothing at all, use Maybe, which is a cross between Single and Completable.

Maybe may invoke one of the following:

  • onSuccess or onError
  • onComplete or onError

As you can see, onSuccess and onComplete are mutually exclusive.

// Observed
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> e) throws Exception {
        // Send one item (or call onError). There is no need to call onComplete afterwards,
        // and doing so would not trigger the onComplete callback.
        e.onSuccess("test");
        // e.onComplete(); // Call this (or onError) instead when there is no item to send
    }
});

// Subscribe the observer
maybe.subscribe(new MaybeObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onSuccess(String s) {
        // Receiving an item is equivalent to onNext followed by onComplete,
        // but the onComplete method itself is not called
        Log.i("tag", s);
    }

    @Override
    public void onComplete() {
        // Called when the Maybe finishes without sending an item
        Log.i("tag", "onComplete");
    }

    @Override
    public void onError(Throwable e) {
    }
});

Subject

A Subject extends Observable and implements Observer, so it can both receive events and emit them to its own observers.

AsyncSubject

Observers receive only the last event, and only at the moment onComplete is called.

onComplete() must be called manually. An observer then receives the last event followed by onComplete, whether it subscribed before or after that call.

If an error occurs, no item is delivered; observers receive only the error.
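The following sketch, assuming RxJava 2.x, shows that nothing is delivered until `onComplete()` is called, and that an observer subscribing afterwards still receives the last value. The class name `AsyncSubjectDemo` is hypothetical.

```java
import io.reactivex.subjects.AsyncSubject;
import java.util.ArrayList;
import java.util.List;

public class AsyncSubjectDemo {
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        AsyncSubject<Integer> subject = AsyncSubject.create();

        // Subscribed before any event is sent
        subject.subscribe(
                v -> log.add("early got " + v),
                e -> log.add("early error"),
                () -> log.add("early complete"));

        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        log.add("calling onComplete"); // nothing has been delivered up to this point
        subject.onComplete();

        // Subscribed after completion: still receives the last value
        subject.subscribe(v -> log.add("late got " + v));
        return log;
    }
}
```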

BehaviorSubject

An observer receives the most recent event sent before it subscribed, plus every event sent afterwards.

PublishSubject

An observer receives only the events sent after it subscribed.

ReplaySubject

An observer receives all events, whether they were sent before or after it subscribed.
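The difference between PublishSubject, BehaviorSubject and ReplaySubject is easiest to see with a late subscriber. The sketch below assumes RxJava 2.x; the class `SubjectDemo` and its helper `lateSubscriber` are hypothetical names.

```java
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
import java.util.ArrayList;
import java.util.List;

public class SubjectDemo {
    // Sends 1 and 2, subscribes, then sends 3; returns what the late subscriber saw.
    static List<Integer> lateSubscriber(Subject<Integer> subject) {
        List<Integer> seen = new ArrayList<>();
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(seen::add);
        subject.onNext(3);
        return seen;
    }

    public static List<Integer> publish() {
        return lateSubscriber(PublishSubject.<Integer>create()); // only 3
    }

    public static List<Integer> behavior() {
        return lateSubscriber(BehaviorSubject.<Integer>create()); // 2 and 3
    }

    public static List<Integer> replay() {
        return lateSubscriber(ReplaySubject.<Integer>create()); // 1, 2 and 3
    }
}
```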

UnicastSubject

Allows only a single observer to subscribe; any further subscription fails with an exception.

SerializedSubject

Used when events are emitted from multiple threads. It simply wraps another Subject.

Thread safety

If several threads emit events, use the following method to obtain a Subject that queues concurrent emissions and delivers them one at a time.

public final Subject<T> toSerialized()

SerializedSubject can no longer be instantiated directly; obtain one through toSerialized().
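A minimal sketch of emitting from several threads through `toSerialized()`, assuming RxJava 2.x. The class name `SerializedSubjectDemo` and the counters are illustrative; the in-flight counter only verifies that the observer is never entered by two threads at once.

```java
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedSubjectDemo {
    // Returns {number of events received, maximum number of concurrent onNext calls}.
    public static int[] run() throws InterruptedException {
        Subject<Integer> subject = PublishSubject.<Integer>create().toSerialized();

        AtomicInteger received = new AtomicInteger();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();

        subject.subscribe(v -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            received.incrementAndGet();
            inFlight.decrementAndGet();
        });

        // Four threads call onNext concurrently on the serialized subject
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    subject.onNext(j);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new int[] {received.get(), maxInFlight.get()};
    }
}
```

Calling `onNext` from several threads on a Subject that has not been serialized violates the Observable contract, so the wrapper is required whenever emitters are not confined to one thread.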

The observer

  • Action: no parameters

  • Consumer<T>: one parameter

  • BiConsumer<T1, T2>: two parameters

  • Consumer<Object[]>: multiple parameters
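The callback types listed above appear as the arguments of the `subscribe` overloads. A small sketch, assuming RxJava 2.x (the class name `CallbackTypesDemo` is hypothetical):

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import java.util.ArrayList;
import java.util.List;

public class CallbackTypesDemo {
    public static List<String> run() {
        List<String> log = new ArrayList<>();

        Consumer<Integer> onNext = v -> log.add("next " + v);                    // one parameter
        Consumer<Throwable> onError = e -> log.add("error " + e.getMessage());   // one parameter
        Action onComplete = () -> log.add("complete");                           // no parameters
        Observable.just(1, 2).subscribe(onNext, onError, onComplete);

        // Single accepts one BiConsumer that receives either the value or the error
        BiConsumer<String, Throwable> onResult =
                (value, error) -> log.add("single " + value + " / " + error);
        Single.just("ok").subscribe(onResult);
        return log;
    }
}
```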

Test

Classes used for testing RxJava event streams

TestObserver

TestScheduler

For testing time-based sources, such as polling, that would otherwise take real time to run

public void advanceTimeBy(long delayTime, java.util.concurrent.TimeUnit unit)

// Move the time directly to a given point
public void advanceTimeTo(long delayTime, java.util.concurrent.TimeUnit unit)
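A sketch of how the virtual clock is typically used with a polling source, assuming RxJava 2.x. The class name `TestSchedulerDemo` is hypothetical; `Observable.interval` stands in for any periodic task.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;

public class TestSchedulerDemo {
    public static TestObserver<Long> run() {
        TestScheduler scheduler = new TestScheduler();

        // Emits 0, 1, 2, ... once per second of virtual time
        TestObserver<Long> observer =
                Observable.interval(1, TimeUnit.SECONDS, scheduler).test();

        // No virtual time has passed yet, so nothing has been emitted
        observer.assertNoValues();

        // Advance the clock by 3 seconds: three ticks are delivered immediately
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        observer.assertValues(0L, 1L, 2L);

        // Jump to the absolute time of 5 seconds: two more ticks
        scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
        observer.assertValueCount(5);
        return observer;
    }
}
```

The test finishes instantly because no thread ever sleeps; only the scheduler's internal clock moves.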

TestSubscriber

    @Test public void addition_isCorrect() throws Exception {
        TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        // Emit A, B, C in sequence
        Flowable.just("A", "B", "C").subscribe(testSubscriber);

        // Assert that the value was never emitted
        testSubscriber.assertNever("D");
        // Assert that the emitted values match
        testSubscriber.assertValues("A", "B", "C");
        // Assert the number of emitted values
        testSubscriber.assertValueCount(3);
        // Assert that the stream has terminated
        testSubscriber.assertTerminated();
    }