First of all:
- Get comfortable with lambdas before learning RxJava; otherwise you will have to declare a lot of generics and write a large amount of redundant code.
- RxJava is divided into three parts: the Observable (the observed), the Observer, and the operators.
- Reactive stream programming is an architecture developed by Microsoft and implemented for Java by Netflix.
- RxJava targets the Java language, so both Android and Java EE can use it, but at present it is mainly used on Android.
- Operators (that is, methods) come in many variants (method overloads).
Advantages:
- Asynchronous processing for multi-threaded tasks is very powerful and convenient
- RxJava already provides functionality that would be complex to implement yourself
Frameworks I rely on in everyday project development:
- Net: the most powerful network request library on Android
- BRV: the most powerful list library on Android (includes StateLayout)
- StateLayout: the most powerful default (placeholder) page library on Android
- LogCat: a logging tool for printing JSON and long text
- Tooltip: a toast tool that supports asynchronous calls and global customization
- DebugKit: a tool for building debug windows
- StatusBar: creates a transparent status bar with one line of code
Official website:
- ReactiveX: supports many languages, and RxJava is just one of the implementations (others include RxPHP, RxSwift, and RxKotlin).
- RxJava
- RxKotlin
Extensions for Android:
- RxAndroid: mainly adds the Android main-thread scheduler.
- RxBinding: this framework is rather cumbersome and bloated; implementing the bindings yourself is recommended.
- AutoDispose: my open source project. It automatically unsubscribes observers by following the lifecycle, and the lifecycle event can be configured. It is simpler than Uber's open source project AutoDispose.
- RxBus: supports three kinds of events: event, tag, and tag + event. See the README.
- Net: my open source project, a fully automatic network request framework. It repackages the Kalle library (Nohttp 2.0) by Yan Zhenjie and adds RxJava/Kotlin support and other new features. See the README for details.
Documentation:
- RxJavaDoc
- RxJava Chinese translation document
Basic usage
Schedulers
public final Observable<T> subscribeOn(Scheduler scheduler)
public final Observable<T> observeOn(Scheduler scheduler)
AndroidSchedulers is the scheduler class added by RxAndroid; it mainly provides the Android main thread.
- By default, the observer (and every operator) runs on the same thread as the Observable, which is the thread where the subscription is made.
- subscribeOn determines the thread on which the Observable emits (only the first subscribeOn in the chain takes effect). Observables that have a default scheduler cannot be moved this way (for example, interval defaults to the computation scheduler).
- observeOn determines the thread of everything below it: the observer and the callbacks of any operators after the observeOn call that have no default scheduler. It can be called multiple times to switch threads again.
Operators
- Operators that do not take a Scheduler parameter follow the Observable's thread, so use subscribeOn to control the thread on which the Observable emits.
- Operators that have a default scheduler cannot be controlled by subscribeOn (but they usually have an overload that accepts a Scheduler parameter).
Scheduler types
- Schedulers.computation(): for computational tasks such as event loops or callback processing.
- Schedulers.io(): for I/O-intensive tasks such as asynchronous blocking I/O operations. This scheduler's thread pool grows as needed; for ordinary computational tasks, use Schedulers.computation().
- Schedulers.newThread(): creates a new thread for each task.
- Schedulers.trampoline(): queues the task on the current thread and runs it after the tasks already queued have finished.
- Schedulers.single(): everything that uses this scheduler runs on the same single thread, and tasks are executed in first-in, first-out order.
Unsubscribing in batches
CompositeDisposable
Example:
CompositeDisposable compositeDisposable = new CompositeDisposable();
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete(); // or: emitter.onError(new Throwable("O__O"));
    }
}).subscribe(new Observer<Integer>() {
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        // Called when the subscription is made
        mDisposable = d;
        // Add to the container
        compositeDisposable.add(d);
    }

    @Override
    public void onNext(Integer value) {
        // Check mDisposable.isDisposed(); if already disposed, there is nothing to handle
    }

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onComplete() {}
});
// Unsubscribe all subscribers
compositeDisposable.clear();
It’s important to note that an Observable sends events only when it has a subscriber. If the subscriber is canceled, it doesn’t send any events, so you don’t have to worry about memory leaks.
Lifecycle callbacks
The doOn*() family of methods registers callbacks that run before the corresponding observer callback
public final Flowable<T> doOnCancel(Action onCancel)
// Observable's counterpart is doOnDispose(Action onDispose)
public final Flowable<T> doOnComplete(Action onComplete)
public final Flowable<T> doOnEach(Consumer<? super Notification<T>> onNotification)
// Sending any event triggers a callback (including onError onComplete)
public final Flowable<T> doOnEach(org.reactivestreams.Subscriber<? super T> subscriber)
public final Flowable<T> doOnError(Consumer<? super java.lang.Throwable> onError)
public final Flowable<T> doOnLifecycle(Consumer<? super org.reactivestreams.Subscription> onSubscribe,
LongConsumer onRequest,
Action onCancel)
public final Flowable<T> doOnNext(Consumer<? super T> onNext)
public final Flowable<T> doOnRequest(LongConsumer onRequest)
// This method is used to track back pressure and is often used for debugging, so Observable doesn't have this method
public final Flowable<T> doOnSubscribe(Consumer<? super org.reactivestreams.Subscription> onSubscribe)
// Callback before subscribing to observed
public final Flowable<T> doOnTerminate(Action onTerminate)
// This callback is called before the onComplete and onError methods, either exception or completion
Notification
Because the doOnEach callback fires on all events, the Notification contains information about all events
java.lang.Throwable getError()
// Returns the exception if the event is onError, otherwise null
T getValue()
// Returns the value if the event is onNext, otherwise null
boolean isOnComplete()
boolean isOnError()
boolean isOnNext()
// Notification provides static factory methods that build instances for the three kinds of events
static <T> Notification<T> createOnComplete()
static <T> Notification<T> createOnError(java.lang.Throwable error)
static <T> Notification<T> createOnNext(T value)
Lifecycle interception
Lift
Observable.just(1).lift(ObservableOperator<String, Int> {
    object : DisposableObserver<Int>() {
        override fun onComplete() {}
        override fun onNext(t: Int) {
            Log.d("Log", "(MainActivity.kt:43): onNext intercept t = $t")
        }
        override fun onError(e: Throwable) {}
    }
}).subscribe(object : Observer<String> {
    override fun onComplete() {}
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: String) {
        Log.d("Log", "(MainActivity.kt:59): onNext final t = $t")
    }
    override fun onError(e: Throwable) {}
})
Compose
public final <R> Observable<R> compose(ObservableTransformer<? super T,? extends R> composer)
Compose is a stream transformation operator: unlike FlatMap, which transforms each individual event, Compose operates on the entire stream.
The FlatMap function runs once for every event sent, whereas the Compose transformer runs only once.
Observable.just(1).compose(object : ObservableTransformer<Int, String> {
    override fun apply(upstream: Observable<Int>): ObservableSource<String> {
        // Transform the whole stream here, for example:
        return upstream.map { it.toString() }
    }
})
As
Observable.just(1).`as`(object : ObservableConverter<Int, ObservableSource<String>> {
    override fun apply(upstream: Observable<Int>): ObservableSource<String> {
        // Convert the stream here, for example:
        return upstream.map { it.toString() }
    }
})
- Compose transforms one stream into another stream, while As can convert the stream into a value of any type.
Creating Observables
Create
Used to create custom Observables
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
There is only one method in ObservableOnSubscribe
void subscribe(ObservableEmitter<T> e)
throws java.lang.Exception
ObservableEmitter has three methods for sending events
void onComplete()
void onError(java.lang.Throwable error)
void onNext(T value)
Tip:
- onError() and onComplete() cannot both be used
- onError() cannot be called more than once, whereas onComplete() can
- Check whether the observer has unsubscribed before sending events or exceptions; otherwise an exception will be thrown
Other methods:
void setCancellable(Cancellable c)
// Set a cancel event listener
void setDisposable(Disposable d)
boolean isDisposed()
Just
Sends events in bulk by passing the items directly as arguments (up to ten arguments)
static <T> Observable<T> just(T item)
The onComplete method is called back after all events are sent
FromArray
Sends events from an array (or varargs)
static <T> Observable<T> fromArray(T... items)
FromIterable
Iterable is the root interface of the collection types, so the elements of any collection can be sent as events:
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Flowable.fromIterable(list).subscribe(
s -> Log.i("tag", s)
);
More factory methods
public static <T> Observable<T> fromArray(T... items)
public static <T> Observable<T> fromCallable(java.util.concurrent.Callable<? extends T> supplier)
public static <T> Observable<T> fromFuture(java.util.concurrent.Future<? extends T> future)
public static <T> Observable<T> fromFuture(java.util.concurrent.Future<? extends T> future,
long timeout,
java.util.concurrent.TimeUnit unit)
Defer
The Observable is created inside the callback, only when an observer subscribes
public static <T> Observable<T> defer(java.util.concurrent.Callable<? extends ObservableSource<? extends T>> supplier)
StartWith
Adds events in front of those of the existing Observable
public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
End stream immediately
Empty
Call onComplete() immediately without sending any events to the observer
Flowable.empty().subscribe(
    obj -> Log.i("tag", "next" + obj.toString()),
    e -> Log.i("tag", "error"),
    () -> Log.i("tag", "complete"));
Never
Does not send any events to the observer and does not invoke any callbacks (such as onComplete)
Error
Does not send any events, but calls onError() directly
Pollers
Interval
Sends events at a fixed interval. It runs on the computation scheduler by default; an overload lets you specify the scheduler.
You can only control the interval
public static Observable<java.lang.Long> interval(long period,
java.util.concurrent.TimeUnit unit)
public static Observable<java.lang.Long> interval(long initialDelay, // First delay time
long period, // Interval time
java.util.concurrent.TimeUnit unit) // Time unit
IntervalRange
A more precise interval that sends a bounded range of values. It runs on the computation scheduler by default; an overload lets you specify the scheduler.
You can control how many events are sent
public static Observable<java.lang.Long> intervalRange(long start, // First value
long count, // Total number of events
long initialDelay, // Initial delay
long period, // Interval between events
java.util.concurrent.TimeUnit unit) // Time unit
Timer
Sends a single event after the given delay
- Delay
- Time unit
- Scheduler (the computation scheduler by default)
public static Observable<java.lang.Long> timer(long delay, // Delay
java.util.concurrent.TimeUnit unit) // Time unit
TimeInterval
Records the time elapsed between events
- Time interval
- Time unit
- Value
- Scheduler
public final Observable<Timed<T>> timeInterval()
// Report the interval in the given unit
public final Observable<Timed<T>> timeInterval(TimeUnit unit)
Example output:
Timed[time=1003, unit=MILLISECONDS, value=12]
Timed[time=1, unit=SECONDS, value=40]
Range
Sends a range of consecutive numbers; by default it does not run on any particular scheduler.
- Start value
- Number of values
// Sends int events
public static Observable<java.lang.Integer> range(int start, // Start value
int count) // Number of values
// Sends long events
public static Observable<java.lang.Long> rangeLong(long start,
long count)
Repeat
Repeat send event
- An infinite loop
- cycles
- End cycle time
- End cycle condition
public final Observable<T> repeat()
// Infinite loop
public final Observable<T> repeat(long times)
// Set the number of loops
public final Observable<T> repeatUntil(BooleanSupplier stop)
// Set the condition that ends the loop
public final Observable<T> repeatWhen(Function<? super Observable<java.lang.Object>, ? extends ObservableSource<?>> handler)
// Use another Observable to decide when the events are resent
Example
io.reactivex.Observable.just(1)
    .repeatUntil(
        new BooleanSupplier() {
            /**
             * @return true to stop repeating
             * @throws Exception
             */
            @Override
            public boolean getAsBoolean() throws Exception {
                return true;
            }
        })
    .subscribe(System.out::println);
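The stop condition of repeatUntil can be sketched in plain Java. This is only a model under assumptions, not RxJava's implementation: the source is a finite list, and `RepeatUntilSketch` and its `repeatUntil` helper are hypothetical names that use `java.util.function.BooleanSupplier` instead of RxJava's own interface. It shows that the condition is checked after each full pass over the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

class RepeatUntilSketch {
    // Replays `source` in full, then asks `stop`; repeats until stop returns true.
    static <T> List<T> repeatUntil(List<T> source, BooleanSupplier stop) {
        List<T> received = new ArrayList<>();
        do {
            received.addAll(source);      // one full pass over the source
        } while (!stop.getAsBoolean());   // the condition is checked after each pass
        return received;
    }

    public static void main(String[] args) {
        int[] passes = {0};
        // Stop after the third pass.
        System.out.println(repeatUntil(Arrays.asList(1, 2), () -> ++passes[0] >= 3));
        // prints [1, 2, 1, 2, 1, 2]
    }
}
```

A condition that always returns true, as in the example above, therefore lets the source through exactly once.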
RepeatWhen
If the Observable returned by the callback sends onComplete or onError, the source is not repeated
However, each onNext event it sends causes the source to be sent again
Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS, TrampolineScheduler.instance())
    .repeatWhen(
        new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                // Wait five seconds after the source completes (onComplete), then resend
                return Observable.interval(5, TimeUnit.SECONDS);
            }
        })
    .subscribe(
        new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                // do something
            }
        });
Combining multiple Observables
Concat
Concatenates multiple Observables and sends their events in order
ConcatArray works the same as Concat, except that it accepts an array
Merge
Merges multiple Observables; the events are sent in chronological order (not in the order the parameters were added)
public static <T> Observable<T> mergeArray(int maxConcurrency,
int bufferSize,
ObservableSource<? extends T>... sources)
Zip
Events from multiple Observables are combined in matching order into a single event, following the principle of pairwise merging.
If the Observables are asynchronous, Zip waits until both events of a pair have arrived before sending the merged event to the observer.
- Follows the subscription order
public static <T,R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources,
Function<? super java.lang.Object[],? extends R> zipper)
Example:
Observable.zip(getStringObservable(), getIntegerObservable(),
    new BiFunction<String, Integer, String>() {
        @Override public String apply(@NonNull String s, @NonNull Integer integer)
                throws Exception {
            // Merge the events of the two emitters here and send them as one
            return s + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override public void accept(@NonNull String s) throws Exception {
            // Only the values returned by apply are received here
        }
    });
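The pairwise merging rule can be modeled in plain Java. This is a sketch under assumptions rather than RxJava code: the two sources are finite lists, and `ZipSketch` and its `zip` helper are hypothetical names. It shows that the i-th event of one source is paired with the i-th event of the other, and that unpaired events are never delivered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {
    // Pairs events by position; stops as soon as either source runs out.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> zipper) {
        int pairs = Math.min(first.size(), second.size());
        List<R> merged = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            merged.add(zipper.apply(first.get(i), second.get(i)));
        }
        return merged;
    }

    public static void main(String[] args) {
        // "C" has no partner, so it is dropped.
        System.out.println(zip(Arrays.asList("A", "B", "C"), Arrays.asList(1, 2), (s, i) -> s + i));
        // prints [A1, B2]
    }
}
```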
ZipWith
Zips the Observable passed in with the source Observable. This is an instance (non-static) method
Observable.just(1).zipWith(Observable.just(2), new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
System.out.println("integer = [" + integer + "], integer2 = [" + integer2 + "]");
return integer + "" + integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// Result: s = [12]
System.out.println("s = [" + s + "]");
}
});
CombineLatest
Each event of the last Observable is combined in turn with the last event of every preceding Observable, and the observer receives the combined events in order. More generally, whenever any Observable sends an event, it is combined with the latest event of each of the others.
- Follows the order in which the Observables are subscribed
public static <T1,T2,T3,R> Observable<R> combineLatest(ObservableSource<? extends T1> source1,
ObservableSource<? extends T2> source2,
ObservableSource<? extends T3> source3,
Function3<? super T1,? super T2,? super T3,? extends R> combiner)
Example results:
The last event of each preceding Observable is received together with every event of the last Observable
s = [round robin 1 10 round robin 2 10 round robin 3 1]
s = [round robin 1 10 round robin 2 10 round robin 3 2]
s = [round robin 1 10 round robin 2 10 round robin 3 3]
s = [round robin 1 10 round robin 2 10 round robin 3 3]
s = [round robin 1 10 round robin 2 10 round robin 3 4]
s = [round robin 1 10 round robin 2 10 round robin 3 5]
s = [round robin 1 10 round robin 2 10 round robin 3 6]
s = [round robin 1 10 round robin 2 10 round robin 3 7]
s = [round robin 1 10 round robin 2 10 round robin 3 8]
s = [round robin 1 10 round robin 2 10 round robin 3 9]
s = [round robin 1 10 round robin 2 10 round robin 3 10]
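A plain-Java model may make the combining rule easier to follow. It is a sketch under assumptions, not RxJava's implementation: events are fed in as one merged timeline, and `CombineLatestSketch`, `Event`, and the `combineLatest` helper are hypothetical names. Nothing is delivered until every source has sent at least one event; after that, each new event is combined with the latest event of every other source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombineLatestSketch {
    // One entry on the merged timeline: which source sent the event, and its value.
    static final class Event {
        final int source;
        final String value;
        Event(int source, String value) { this.source = source; this.value = value; }
    }

    static List<String> combineLatest(int sourceCount, List<Event> timeline) {
        String[] latest = new String[sourceCount];   // latest value per source (null = none yet)
        int sourcesSeen = 0;
        List<String> received = new ArrayList<>();
        for (Event e : timeline) {
            if (latest[e.source] == null) sourcesSeen++;
            latest[e.source] = e.value;
            if (sourcesSeen == sourceCount) {        // every source has sent something
                received.add(String.join(" ", latest));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Source 0 finishes before source 1 starts, as with synchronous sources.
        List<Event> timeline = Arrays.asList(
            new Event(0, "a1"), new Event(0, "a2"), new Event(1, "b1"), new Event(1, "b2"));
        System.out.println(combineLatest(2, timeline)); // prints [a2 b1, a2 b2]
    }
}
```

With synchronous sources the first source has already finished when the second starts, which is why only its last event shows up in the results.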
WithLatestFrom
Every event of the source Observable is combined with the latest event of each of the other Observables and then sent to the observer
public final <R> Observable<R> withLatestFrom(ObservableSource<?>[] others, Function<? super java.lang.Object[],R> combiner)
Transformation operators
FlatMap
Intercepts each event and converts it into a new Observable, whose events are then sent on to the observer
The ordering is interleaved: if the inner Observables send events asynchronously or with a delay, their events may arrive out of order.
public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)
// Separate callbacks for onNext/onError/onComplete
public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> onNextMapper,
Function<? super java.lang.Throwable,? extends ObservableSource<? extends R>> onErrorMapper,
java.util.concurrent.Callable<? extends ObservableSource<? extends R>> onCompleteSupplier)
public final <U,R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T,? super U,? extends R> resultSelector)
FlatMapIterable
public final <U> Observable<U> flatMapIterable(Function<? super T,? extends java.lang.Iterable<? extends U>> mapper)
// The observer receives the elements of the returned collection one by one
The overload additionally converts each element of the iterable into a final result object
public final <U,V> Observable<V> flatMapIterable(
Function<? super T,? extends java.lang.Iterable<? extends U>> mapper,
BiFunction<? super T,? super U,? extends V> resultSelector)
Example
Observable.just(1, 2, 3, 4, 5).flatMapIterable(new Function<Integer, Iterable<String>>() {
    @Override public Iterable<String> apply(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:32) ___ " + "apply() called with: integer = [" + integer + "]");
        // Iterable is the root interface of all collections
        ArrayList<String> strings = new ArrayList<>();
        strings.add(integer.toString() + "In the set");
        return strings;
    }
}, new BiFunction<Integer, Object, String>() {
    /**
     * Produces the event that is finally received by the observer
     * @param integer the event from the emitter
     * @param o the object added to the collection
     * @return the event that is finally received by the observer
     * @throws Exception if null is returned
     */
    @Override public String apply(Integer integer, Object o) throws Exception {
        Log.d("Log", "(MainActivity.java:39) ___ "
            + "apply() called with: integer = ["
            + integer
            + "], o = ["
            + o
            + "]");
        // If null is returned, an exception is thrown into onError
        return "Daniel Wu";
    }
}).subscribe(new Observer<String>() {
    @Override public void onSubscribe(Disposable d) {
        Log.i("Log", "(MainActivity.java:49) ___ onSubscribe");
    }
    @Override public void onNext(String s) {
        Log.d("Log", "(MainActivity.java:53) ___ " + "onNext() called with: s = [" + s + "]");
    }
    @Override public void onError(Throwable e) {
        Log.i("Log", "(MainActivity.java:57) ___ onError");
    }
    @Override public void onComplete() {
        Log.i("Log", "(MainActivity.java:61) ___ onComplete");
    }
});
FlatMapCompletable
Only the onComplete callback is received
public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper)
public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper,
boolean delayErrors)
FlatMapMaybe
public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper)
public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper,
boolean delayErrors)
FlatMapSingle
The callback returns a Single for each event
public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper)
public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper,
boolean delayErrors)
ConcatMap
The difference from FlatMap is that ConcatMap guarantees the emission order (no interleaving); it uses Concat internally.
For example, two asynchronous Observables always send their events in the order in which they were added as parameters.
public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
int prefetch)
ConcatMapDelayError
Exceptions are delayed until all events have been sent
public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
int prefetch,
boolean tillTheEnd)
ConcatMapEager
Subscribes to all the inner Observables up front, caches their events, and sends them to the observer in order
public final <R> Observable<R> concatMapEager(Function<? super T,? extends ObservableSource<? extends R>> mapper)
Event transformation
Map
Type conversion
public final <R> Flowable<R> map(Function<? super T,? extends R> mapper)
SwitchMap
Each time the source Observable sends an event, a new Observable is created for the observer and the previous one is dropped. If the created Observables send their events with a delay, only the events of the Observable created from the last source event are received.
In some other Rx implementations (RxSwift, for example) this operator is called flatMapLatest, a name that may be easier to understand
public final <R> Observable<R> switchMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)
Example
Observable.just(1, 2, 3)
    .switchMap(new Function<Integer, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Integer integer) throws Exception {
            // interval1 is an Observable<Long> defined elsewhere
            return interval1;
        }
    })
    .subscribe(ele -> Log.d("Log", "(MainActivity.java:39) ___ Result = " + ele));
Intercept events
Buffer
Collects events into a List and sends the List to the observer
- By count
- By span (skip)
- By time
public final Observable<java.util.List<T>> buffer(int count) // Add the number of events to the List
public final Observable<java.util.List<T>> buffer(int count,
int skip) // Segment event span
For example, given the events {1,2,3,4,5} with count = 3 and skip = 2,
the events received are {1,2,3} {3,4,5} {5}
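The count/skip rule can be checked with a small plain-Java model. This is a sketch under assumptions, not RxJava code: the source is a finite list, `skip` is positive, and `BufferSketch` with its `buffer` helper are hypothetical names. A new List starts every `skip` events and holds at most `count` events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BufferSketch {
    // A new buffer starts every `skip` events and holds at most `count` events.
    static <T> List<List<T>> buffer(List<T> events, int count, int skip) {
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < events.size(); start += skip) {
            int end = Math.min(start + count, events.size());
            buffers.add(new ArrayList<>(events.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 3, 2));
        // prints [[1, 2, 3], [3, 4, 5], [5]]
    }
}
```

When skip is smaller than count the buffers overlap, as here; when skip is larger than count, the events between two buffers are dropped.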
public final <B> Observable<java.util.List<T>> buffer(java.util.concurrent.Callable<? extends ObservableSource<B>> boundarySupplier)
// A new List is started each time the Observable returned by boundarySupplier sends an event
// Cache the events of a certain time span and add them to the collection
public final Observable<java.util.List<T>> buffer(long timespan,
long timeskip,
java.util.concurrent.TimeUnit unit)
Window
Stores events in an Observable that is then sent to the observer. The difference from Buffer is that each segment is delivered as an Observable rather than a collection, and that Observable contains the events within the span.
By time
public final Observable<Observable<T>> window(long timespan,
java.util.concurrent.TimeUnit unit)
// Group by time span
public final Observable<Observable<T>> window(long timespan,
java.util.concurrent.TimeUnit unit,
long count)
// Group by the number of events within the span of time (only a specified number of events can be added to the observed)
public final Observable<Observable<T>> window(long timespan,
long timeskip,
java.util.concurrent.TimeUnit unit)
// Group by time span
public final Observable<Observable<T>> window(long timespan,
java.util.concurrent.TimeUnit unit,
long count,
boolean restart)
By count (the Observables are sent one after another)
public final Observable<Observable<T>> window(long count)
// Group by quantity
public final Observable<Observable<T>> window(long count,
long skip)
// Group by span
Example
Observable.just(1, 2, 3, 4).window(3).subscribe(new Consumer<Observable<Integer>>() {
    @Override public void accept(Observable<Integer> integerObservable) throws Exception {
        Log.i("Log", "(MainActivity.java:19) ___ observer");
        integerObservable.subscribe(new Consumer<Integer>() {
            @Override public void accept(Integer integer) throws Exception {
                Log.i("Log", "(MainActivity.java:23) ___ received an event");
            }
        });
    }
});
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
// Each time the boundary Observable sends an event, the current window is cut off and a new one starts
public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
Function<? super U,? extends ObservableSource<V>> closingIndicator)
public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
Function<? super U,? extends ObservableSource<V>> closingIndicator,
int bufferSize)
// A window opens when openingIndicator sends an event and closes when the Observable returned by closingIndicator sends an event; each segment is then sent to the observer
GroupBy
Groups the Observable's events by key into GroupedObservable instances. This class inherits from Observable and adds one new method,
getKey(), which returns the key that the callback returned for the group;
// The callback returns a key for each event sent; events with equal keys go into the same group
public final <K> Observable<GroupedObservable<K,T>> groupBy(Function<? super T,? extends K> keySelector)
// Group the events and also convert each event with valueSelector
public final <K,V> Observable<GroupedObservable<K,V>> groupBy(Function<? super T,? extends K> keySelector,
Function<? super T,? extends V> valueSelector)
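Grouping by key can be pictured with a plain-Java model. This is a sketch under assumptions, not RxJava code: the groups are collected into lists instead of being delivered as GroupedObservable streams, and `GroupBySketch` with its `groupBy` helper are hypothetical names. The map key plays the role of getKey().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class GroupBySketch {
    // Events whose keys are equal end up in the same group; groups keep first-seen order.
    static <T, K> Map<K, List<T>> groupBy(List<T> events, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T event : events) {
            groups.computeIfAbsent(keySelector.apply(event), key -> new ArrayList<>()).add(event);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(groupBy(Arrays.asList(1, 2, 3, 4, 5, 6), i -> i % 2 == 0 ? "Even" : "Odd"));
        // prints {Odd=[1, 3, 5], Even=[2, 4, 6]}
    }
}
```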
ToList
Collects all the events into one List that is received at once; the main requirement is that all events of the Observable share the same type
public final Single<java.util.List<T>> toList()
public final Single<java.util.List<T>> toList(int capacityHint)
// Sets the initial capacity of the List
public final <U extends java.util.Collection<? super T>> Single<U> toList(java.util.concurrent.Callable<U> collectionSupplier)
Remove duplicate events
Distinct
Removes duplicate events
public final Flowable<T> distinct()
// Remove all duplicate events
public final <K> Observable<T> distinct(Function<? super T,K> keySelector)
// The callback returns a key, and the keys are compared to decide whether an event is a duplicate
public final <K> Observable<T> distinct(Function<? super T,K> keySelector,
java.util.concurrent.Callable<? extends java.util.Collection<? super K>> collectionSupplier)
Example:
/* This demo lets only two numbers through: the first odd one and the first even one */
Observable.just(1, 2, 3, 4, 5, 6).distinct(new Function<Integer, String>() {
    /**
     * Called back every time an event is sent
     * @throws Exception
     */
    @Override public String apply(Integer integer) throws Exception {
        return integer % 2 == 0 ? "Even" : "Odd";
    }
}).subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
    }
});
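Why only two numbers get through can be seen in a plain-Java model of key-based de-duplication. This is a sketch under assumptions, not RxJava's implementation: the source is a finite list, and `DistinctSketch` with its `distinct` helper are hypothetical names. An event passes only if its key has not been seen before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class DistinctSketch {
    // Keeps an event only the first time its key appears.
    static <T, K> List<T> distinct(List<T> events, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> received = new ArrayList<>();
        for (T event : events) {
            if (seenKeys.add(keySelector.apply(event))) {   // add() returns false for a repeated key
                received.add(event);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 2, 3, 4, 5, 6), i -> i % 2 == 0 ? "Even" : "Odd"));
        // prints [1, 2]
    }
}
```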
DistinctUntilChanged
Removes adjacent duplicate events
public final Flowable<T> distinctUntilChanged()
// Only adjacent duplicate events are removed
public final Observable<T> distinctUntilChanged(BiPredicate<? super T,? super T> comparer)
// The callback receives two adjacent events each time; compare them in the callback and return a boolean that says whether they are duplicates
public final <K> Observable<T> distinctUntilChanged(Function<? super T,K> keySelector)
// The callback returns a key, and the keys are compared to decide whether an event is a duplicate
Example:
Observable.just(1, 2, 2, 4, 5, 6).distinctUntilChanged(new BiPredicate<Integer, Integer>() {
    @Override public boolean test(Integer integer, Integer integer2) throws Exception {
        return integer.equals(integer2);
    }
}).subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
    }
});
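The adjacent-only comparison can be modeled in plain Java. This is a sketch under assumptions, not RxJava's implementation: the source is a finite list, and `DistinctUntilChangedSketch` with its helper are hypothetical names. Each event is compared only with the event directly before it, so a value may reappear later in the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class DistinctUntilChangedSketch {
    // Drops an event when `comparer` says it equals the event directly before it.
    static <T> List<T> distinctUntilChanged(List<T> events, BiPredicate<T, T> comparer) {
        List<T> received = new ArrayList<>();
        T previous = null;
        boolean hasPrevious = false;
        for (T event : events) {
            if (!hasPrevious || !comparer.test(previous, event)) {
                received.add(event);
            }
            previous = event;      // always compare against the most recent event
            hasPrevious = true;
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(distinctUntilChanged(Arrays.asList(1, 2, 2, 4, 5, 6), (a, b) -> a.equals(b)));
        // prints [1, 2, 4, 5, 6]
    }
}
```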
Only specified data is sent
Element
Only the event at the specified position (index) is sent
public final Maybe<T> elementAt(long index) // Index
public final Single<T> elementAt(long index,
T defaultItem) // Sent when the index is out of bounds
public final Maybe<T> firstElement()
// Send only the first one
public final Maybe<T> lastElement()
public final Completable ignoreElements()
// Ignore all events
public final Single<T> elementAtOrError(long index)
// If there is no event at the index, an exception is thrown
Debounce
An event is sent only once a certain amount of time has passed without another event. If another event arrives within that time, the pending event is discarded and the timer restarts.
A common scenario is the automatic suggestion of a search input box: while you keep typing, every keystroke sends an event and resets the timer, so nothing gets through. An event is sent successfully only once you stop typing.
public final <U> Observable<T> debounce(Function<? super T,? extends ObservableSource<U>> debounceSelector)
public final Observable<T> debounce(long timeout,
java.util.concurrent.TimeUnit unit)
ThrottleWithTimeout behaves the same way
public final Observable<T> throttleWithTimeout(long timeout,
java.util.concurrent.TimeUnit unit)
public final Observable<T> throttleWithTimeout(long timeout,
java.util.concurrent.TimeUnit unit,
Scheduler scheduler)
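The search-box behaviour can be modeled in plain Java with explicit timestamps. This is a sketch under assumptions, not RxJava's implementation: `DebounceSketch`, `TimedEvent`, and the `debounce` helper are hypothetical names, the events are sorted by time, and the last event is assumed to be delivered when the stream completes. An event gets through only if the gap to the next event is longer than the timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DebounceSketch {
    // An event together with the time (in milliseconds) at which it was sent.
    static final class TimedEvent {
        final long timeMs;
        final String value;
        TimedEvent(long timeMs, String value) { this.timeMs = timeMs; this.value = value; }
    }

    static List<String> debounce(List<TimedEvent> events, long timeoutMs) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = (i == events.size() - 1);
            // Deliver only if the next event arrives after the quiet period has elapsed.
            if (isLast || events.get(i + 1).timeMs - events.get(i).timeMs > timeoutMs) {
                received.add(events.get(i).value);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<TimedEvent> typing = Arrays.asList(
            new TimedEvent(0, "r"), new TimedEvent(100, "rx"),
            new TimedEvent(200, "rxj"), new TimedEvent(1000, "rxjava"));
        System.out.println(debounce(typing, 300)); // prints [rxj, rxjava]
    }
}
```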
Filter
Filter events, such as not sending events if the string is empty
Observable.just(1, 20, 65, -5, 7, 19)
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(@NonNull Integer integer) throws Exception {
            // The returned boolean decides whether the event is let through
            return integer >= 10;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {}
    });
All
The callback is executed for each event to determine whether all events meet the condition, but the observer receives only a single Boolean value;
public final Single<java.lang.Boolean> all(Predicate<? super T> predicate)
Flowable.just(1, 2, 3, 4).all(new Predicate<Integer>() {
    @Override public boolean test(Integer integer) throws Exception {
        return integer < 4;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override public void accept(Boolean aBoolean) throws Exception {}
});
First
Sends only the first event; if the Observable goes straight to onComplete() without sending any event, the specified default item is sent instead.
The return value is a Single, whose observer has no onComplete function.
public final Single<T> first(T defaultItem)
public final Single<T> firstOrError()
// If the source only calls onComplete without any event, onError is sent
Last
Sends only the last event; if the Observable goes straight to onComplete() without sending any event, the specified default item is sent instead.
public final Single<T> last(T defaultItem)
Amb
Only the Observable that sends its first event earliest is processed; all the other Observables are discarded.
All the Observables added must have the same event type;
public static <T> Observable<T> amb(java.lang.Iterable<? extends ObservableSource<? extends T>> sources)
// Collection parameter
AmbArray
The only difference is that the parameter becomes varargs
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
// Variable parameter type
Copy the code
AmbWith
A nonstatic function that ends the subscription to the source observer and then the subscription to the current specified observer, similar to switching an observer.
public final Observable<T> ambWith(ObservableSource<? extends T> other)
Copy the code
Blocking
These operators break out of RxJava's call chain and return the event directly, blocking the calling thread.
They return the first, the last, or the only event
public final T blockingFirst()
public final T blockingFirst(T defaultItem)
public final T blockingLast()
public final T blockingLast(T defaultItem)
public final T blockingSingle()
- If the Observable emits no events and no default value is set, an exception is thrown
- blockingSingle also throws an exception if the Observable emits more than one event
Example:
Long aLong = Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
    .blockingFirst();
Iteration
public final void blockingForEach(Consumer<? super T> onNext)
public final java.lang.Iterable<T> blockingIterable()
public final java.lang.Iterable<T> blockingIterable(int bufferSize)
public final java.lang.Iterable<T> blockingMostRecent(T initialValue)
public final void blockingSubscribe()
Limiting the events received
- The first one within a specified time
- The last one within a specified time
ThrottleFirst
Emits only the first event within each time window. It runs on the computation Scheduler by default, but a Scheduler can be specified
public final Observable<T> throttleFirst(long windowDuration,
java.util.concurrent.TimeUnit unit)
public final Observable<T> throttleFirst(long skipDuration,
java.util.concurrent.TimeUnit unit,
Scheduler scheduler)
ThrottleLast
Only the last event within a certain period of time is sent
public final Observable<T> throttleLast(long intervalDuration,
java.util.concurrent.TimeUnit unit)
public final Observable<T> throttleLast(long intervalDuration,
java.util.concurrent.TimeUnit unit,
Scheduler scheduler)
ThrottleWithTimeout
A timer starts after event A is emitted. If a new event B arrives within the specified time, event A is discarded and the timer starts again for B. It behaves the same as debounce, except that debounce can also take an Observable as the timer.
In other words, out of a burst of events only the last one is accepted.
public final Observable<T> throttleWithTimeout(long timeout,
java.util.concurrent.TimeUnit unit)
public final Observable<T> throttleWithTimeout(long timeout,
java.util.concurrent.TimeUnit unit,
Scheduler scheduler)
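The difference between the three throttle operators is easiest to see on one fixed timeline. The following is a minimal sketch that uses only the JDK, not RxJava: it models the behaviour described above on a list of timestamped events. The names ThrottleModel, Event and sampleEvents are hypothetical, and the model assumes that intervals are aligned to the moment of subscription and that the source stays open until the last timer tick.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThrottleModel {
    /** A labelled event and the time (ms after subscription) at which the source emits it. */
    static final class Event {
        final String label;
        final long timeMs;

        Event(String label, long timeMs) {
            this.label = label;
            this.timeMs = timeMs;
        }
    }

    /** Models throttleFirst: let an event through, then drop everything until the window has elapsed. */
    static List<String> throttleFirst(List<Event> events, long windowMs) {
        List<String> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (Event e : events) {
            if (e.timeMs >= gateOpensAt) {
                out.add(e.label);
                gateOpensAt = e.timeMs + windowMs;
            }
        }
        return out;
    }

    /** Models throttleLast: keep only the last event of each fixed interval. */
    static List<String> throttleLast(List<Event> events, long intervalMs) {
        List<String> out = new ArrayList<>();
        long currentInterval = -1;
        String pending = null;
        for (Event e : events) {
            long interval = e.timeMs / intervalMs;
            if (pending != null && interval != currentInterval) {
                out.add(pending); // the previous interval ended: emit its last event
            }
            currentInterval = interval;
            pending = e.label;
        }
        if (pending != null) {
            out.add(pending); // assumption: the source stays open until the final tick
        }
        return out;
    }

    /** Models throttleWithTimeout: an event survives only if no newer event follows within the timeout. */
    static List<String> throttleWithTimeout(List<Event> events, long timeoutMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast || events.get(i + 1).timeMs - events.get(i).timeMs >= timeoutMs) {
                out.add(events.get(i).label);
            }
        }
        return out;
    }

    /** A hypothetical timeline: a burst of four events followed by a late one. */
    static List<Event> sampleEvents() {
        return Arrays.asList(new Event("A", 0), new Event("B", 100), new Event("C", 250),
                new Event("D", 400), new Event("E", 1000));
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        System.out.println(throttleFirst(events, 300));       // [A, D, E]
        System.out.println(throttleLast(events, 300));        // [C, D, E]
        System.out.println(throttleWithTimeout(events, 300)); // [D, E]
    }
}
```

With the same 300 ms setting, throttleFirst keeps the leading event of each window, throttleLast keeps the trailing event of each interval, and throttleWithTimeout keeps only events that are followed by silence.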
Sample
Samples at a fixed interval: within each period only the last emitted event is taken. A thread can be specified. Equivalent to throttleLast
public final Observable<T> sample(long period,
java.util.concurrent.TimeUnit unit)
public final Observable<T> sample(long period,
java.util.concurrent.TimeUnit unit,
boolean emitLast)
public final <U> Observable<T> sample(ObservableSource<U> sampler)
public final <U> Observable<T> sample(ObservableSource<U> sampler,
boolean emitLast)
TimeOut
public final Observable<T> timeout(long timeout,
java.util.concurrent.TimeUnit timeUnit)
public final Observable<T> timeout(long timeout,
java.util.concurrent.TimeUnit timeUnit,
ObservableSource<? extends T> other)
- Sets a timeout interval: if the gap between events exceeds it, a TimeoutException is emitted (delivered to onError).
- Use the overload with the other parameter if you would rather switch to a new Observable than emit an exception
Unlike the overloads above, the following ones control the timeout through the Observable returned by the callback function. If the returned Observable emits an event before the source Observable has emitted its next one, a timeout occurs: onError is called with a TimeoutException
public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator)
public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator,
ObservableSource<? extends T> other)
The following overloads set a separate timeout (an Observable) for the first event emitted by the source
public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator)
public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator,
ObservableSource<? extends T> other)
Skipping events
- By count
- By time
- From the end
- Until a specified Observable emits
- While a callback function keeps returning true
Skip
Skips events
public final Observable<T> skip(long count)
// Skips the specified number of events
public final Observable<T> skip(long time,
java.util.concurrent.TimeUnit unit)
// Skips the events emitted within the specified time
SkipLast
Skips events counted from the end
public final Observable<T> skipLast(int count)
public final Observable<T> skipLast(long time,
java.util.concurrent.TimeUnit unit)
public final Observable<T> skipLast(long time,
java.util.concurrent.TimeUnit unit,
boolean delayError)
SkipUntil
All events of the source Observable are skipped until the Observable passed as the parameter emits an event.
public final <U> Observable<T> skipUntil(ObservableSource<U> other)
Sample code:
Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
    // Source events are skipped until the Observable below emits an event (onComplete does not count)
    .skipUntil(Observable.just(1)
        .delay(2, TimeUnit.SECONDS))
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d("Log", "(MainActivity.java:80) ___ " + "accept() called with: aLong = [" + aLong + "]");
        }
    });
SkipWhile
Decides in a callback whether to skip events.
Unlike the filter operator, skipWhile can only skip events from the beginning of the stream: as soon as the callback declines to skip an event, it is never called again and every later event passes through. If the first event is not skipped, nothing is skipped at all.
public final Observable<T> skipWhile(Predicate<? super T> predicate)
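The contrast with filter can be reproduced with plain JDK streams, whose dropWhile has the same "only from the beginning" behaviour. This is a sketch of the semantics rather than RxJava code; the class and method names are made up for illustration, and dropWhile requires Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class SkipWhileVsFilter {
    /** Models skipWhile: drops leading events while the predicate holds, then passes everything. */
    static List<Integer> skipWhile(List<Integer> events, Predicate<Integer> predicate) {
        return events.stream().dropWhile(predicate).collect(Collectors.toList());
    }

    /** Models filtering the same events out everywhere in the stream. */
    static List<Integer> filterOut(List<Integer> events, Predicate<Integer> predicate) {
        return events.stream().filter(predicate.negate()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 5, 1, 2);
        Predicate<Integer> small = e -> e < 3;
        System.out.println(skipWhile(events, small)); // [5, 1, 2]
        System.out.println(filterOut(events, small)); // [5]
        // If the first event is not skipped, nothing is skipped at all
        System.out.println(skipWhile(Arrays.asList(5, 1, 2), small)); // [5, 1, 2]
    }
}
```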
Stopping
- By a maximum number of events
- By the time during which events may be received
- When a parameter Observable emits an event, which ends the subscription to the source Observable
- Counting from the end
- When a callback function decides to stop
Take
The counterpart of skip: once the limit is reached and the source is stopped from emitting further events, onComplete is called directly
public final Observable<T> take(long count)
// Limits the maximum number of events received
public final Observable<T> take(long time,
java.util.concurrent.TimeUnit unit)
// Receives events only within the specified time
TakeUntil
public final <U> Observable<T> takeUntil(ObservableSource<U> other)
// Once the parameter Observable emits an event, the source Observable is completed
public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
// The callback decides whether to stop; returning true ends the stream
TakeLast
public final Observable<T> takeLast(int count)
public final Observable<T> takeLast(long count,
long time,
java.util.concurrent.TimeUnit unit)
public final Observable<T> takeLast(long time,
java.util.concurrent.TimeUnit unit)
public final Observable<T> takeLast(long time,
java.util.concurrent.TimeUnit unit,
boolean delayError)
TakeWhile
Decides in a callback whether to stop the stream (onComplete is called as well); unlike takeUntil, it is returning false that stops it.
public final Observable<T> takeWhile(Predicate<? super T> predicate)
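Besides the inverted return value, the two operators differ in whether the event that triggers the stop is still delivered. The sketch below models this with plain JDK loops instead of RxJava (TakeModel and its methods are hypothetical names): takeWhile tests an event before emitting it, while takeUntil(Predicate) emits the event first and then tests it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class TakeModel {
    /** Models takeWhile: test first; a false result completes without emitting the event. */
    static <T> List<T> takeWhile(List<T> events, Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        for (T event : events) {
            if (!predicate.test(event)) {
                break;
            }
            out.add(event);
        }
        return out;
    }

    /** Models takeUntil(Predicate): emit first; a true result completes after this event. */
    static <T> List<T> takeUntil(List<T> events, Predicate<T> stopPredicate) {
        List<T> out = new ArrayList<>();
        for (T event : events) {
            out.add(event);
            if (stopPredicate.test(event)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(takeWhile(events, e -> e < 3));  // [1, 2]
        System.out.println(takeUntil(events, e -> e >= 3)); // [1, 2, 3]
    }
}
```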
Join
Adds a second Observable (called the target Observable). Each event it emits is combined in turn with every event of the source Observable whose window is still open; each pair is passed to the same callback function.
public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> join(ObservableSource<? extends TRight> other,
Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd,
Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Sample code:
Observable.just(1L, 2L, 3L, 4L)
    .join(Observable.just(5L, 6L, 7L, 8L),
        new Function<Long, ObservableSource<Long>>() {
            /**
             * Receives each event of the source Observable.
             * @param aLong the event emitted by the source Observable
             * @return an Observable; once it emits an event, this source event is no longer combined
             * @throws Exception
             */
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                Log.d("Log", "(MainActivity.java:65) ___ " + "Source observer aLong = [" + aLong + "]");
                return Observable.interval(3, TimeUnit.SECONDS);
            }
        },
        new Function<Long, ObservableSource<Long>>() {
            /**
             * Receives each event of the added Observable (the join Observable).
             * @param aLong the event emitted by the added Observable
             * @return an Observable; once it emits an event, this added event is no longer combined
             * @throws Exception
             */
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                Log.d("Log", "(MainActivity.java:75) ___ " + "Added observer aLong = [" + aLong + "]");
                return Observable.interval(3, TimeUnit.SECONDS);
            }
        },
        new BiFunction<Long, Long, String>() {
            /**
             * Receives one event from the source Observable and one from the added Observable.
             * @param aLong the event emitted by the source Observable
             * @param aLong2 the event emitted by the added Observable
             * @return the value that the observer finally receives
             * @throws Exception
             */
            @Override
            public String apply(Long aLong, Long aLong2) throws Exception {
                Log.d("Log", "(MainActivity.java:89) ___ " + "apply() called with: aLong = [" + aLong
                    + "], aLong2 = [" + aLong2 + "]");
                return aLong + "" + aLong2;
            }
        })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("Log", "(MainActivity.java:83) ___ " + "accept() called with: s = [" + s + "]");
        }
    });
GroupJoin
Similar to join, except that the result callback receives each source event together with an Observable of the added Observable's events
public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> groupJoin(ObservableSource<? extends TRight> other,
Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd,
Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd,
BiFunction<? super T,? super Observable<TRight>,? extends R> resultSelector)
Example:
Observable.just(1L, 2L, 3L, 4L)
    .groupJoin(Observable.just(5L, 6L, 7L, 8L),
        new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                // Placeholder: a window that never closes (returning null would cause an error)
                return Observable.never();
            }
        },
        new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                // Placeholder: a window that never closes
                return Observable.never();
            }
        },
        new BiFunction<Long, Observable<Long>, String>() {
            @Override
            public String apply(Long aLong, Observable<Long> longObservable) throws Exception {
                // Placeholder result; longObservable carries the events of the added Observable
                return "source event = " + aLong;
            }
        })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("Log", "(MainActivity.java:78) ___ " + "accept() called with: s = [" + s + "]");
        }
    });
ToMap
public final <K> Single<java.util.Map<K,T>> toMap(Function<? super T,? extends K> keySelector)
public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
Function<? super T,? extends V> valueSelector)
public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
Function<? super T,? extends V> valueSelector,
java.util.concurrent.Callable<? extends java.util.Map<K,V>> mapSupplier)
Error handling
onErrorResumeNext
If an error occurs, the stream switches to another Observable, whose events are delivered through onNext (onError is not called)
public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> next)
onErrorReturn
If an error occurs, the callback returns an event that is delivered in its place (onNext is called, onError is not)
public final Observable<T> onErrorReturn(Function<? super java.lang.Throwable,? extends T> valueSupplier)
Both operators end by calling onComplete
Retry
Retries after the source Observable emits an error event (onError)
public final Observable<T> retry()
// Resubscribes to the source whenever an error occurs
public final Observable<T> retry(long times)
// Retries at most the given number of times
public final Observable<T> retry(Predicate<? super java.lang.Throwable> predicate)
public final Observable<T> retry(BiPredicate<? super java.lang.Integer,? super java.lang.Throwable> predicate)
// The callback decides whether to retry
public final Observable<T> retry(long times, Predicate<? super java.lang.Throwable> predicate)
// Predicate function + retry count
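The counting rule behind retry(long times) can be sketched without RxJava: the source is run once and then re-run at most `times` more times, and only when the retries are exhausted does the error reach the caller. Everything below (RetryModel, flaky, the Supplier standing in for a source) is a hypothetical plain-JDK model, not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryModel {
    /** Runs the source; after a failure it is re-run, at most `times` extra times. */
    static <T> T retry(Supplier<T> source, long times) {
        long retriesLeft = times;
        while (true) {
            try {
                return source.get();
            } catch (RuntimeException error) {
                if (retriesLeft <= 0) {
                    throw error; // retries exhausted: the error is finally delivered
                }
                retriesLeft--;
            }
        }
    }

    /** A hypothetical source that fails on its first `failures` runs and then succeeds. */
    static Supplier<String> flaky(int failures, AtomicInteger calls) {
        return () -> {
            int attempt = calls.incrementAndGet();
            if (attempt <= failures) {
                throw new IllegalStateException("attempt " + attempt + " failed");
            }
            return "ok after " + attempt + " attempts";
        };
    }

    public static void main(String[] args) {
        System.out.println(retry(flaky(2, new AtomicInteger()), 3)); // ok after 3 attempts
        try {
            retry(flaky(2, new AtomicInteger()), 1);
        } catch (IllegalStateException error) {
            System.out.println(error.getMessage()); // attempt 2 failed
        }
    }
}
```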
RetryUntil
This operator is actually implemented in terms of retry(predicate).
The difference is that returning true stops the retrying
public final Observable<T> retryUntil(BooleanSupplier stop)
RetryWhen
The callback function receives an Observable of the errors and returns an Observable that controls retrying: each event it emits makes the source retry, while its onError or onComplete is passed on to the observer and no retry takes place.
public final Observable<T> retryWhen(Function<? super Observable<java.lang.Throwable>,? extends ObservableSource<?>> handler)
Example:
This example does not trigger a retry:
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new IllegalStateException("test error"));
        emitter.onNext(3);
    }
})
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            return Observable.just(23);
        }
    })
    .subscribe(new DefaultObserver<Integer>() {
        @Override
        public void onNext(Integer integer) {
            System.out.println("integer = [" + integer + "]");
        }
        @Override
        public void onError(Throwable e) {
            System.out.println("Main.onError");
        }
        @Override
        public void onComplete() {
            System.out.println("Main.onComplete");
        }
    });
Conditional operators
Contains
Checks whether the emitted events contain the specified event; the observer receives a Boolean value
public final Single<java.lang.Boolean> contains(java.lang.Object element)
Any
public final Single<java.lang.Boolean> any(Predicate<? super T> predicate)
Tests each event in turn; as soon as the callback returns true, the source is terminated immediately
Observable.just(1, 2, 3, 4).any(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        Log.d("Log", "(MainActivity.java:27) ___ " + "test() called with: integer = [" + integer + "]");
        return integer == 3;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d("Log", "(MainActivity.java:33) ___ " + "accept() called with: aBoolean = [" + aBoolean + "]");
    }
});
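The early termination can be made visible by recording which events the predicate actually sees. The sketch below models it with the JDK's Stream.anyMatch, which short-circuits in the same way; it is not RxJava code, and ShortCircuitModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ShortCircuitModel {
    /** Returns the events that were tested before an any-style check could answer. */
    static List<Integer> testedByAny(List<Integer> events, Predicate<Integer> predicate) {
        List<Integer> tested = new ArrayList<>();
        events.stream().anyMatch(event -> {
            tested.add(event); // record every event the predicate is asked about
            return predicate.test(event);
        });
        return tested;
    }

    public static void main(String[] args) {
        // The check stops at 3, so 4 is never tested
        System.out.println(testedByAny(Arrays.asList(1, 2, 3, 4), e -> e == 3)); // [1, 2, 3]
    }
}
```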
IsEmpty
public final Single<java.lang.Boolean> isEmpty()
Checks whether the source emitted no events at all; the observer receives a Boolean value.
DefaultIfEmpty
public final Observable<T> defaultIfEmpty(T defaultItem)
If the source emits no events, the specified default event is emitted instead, for example when all of the source's events were filtered out
SwitchIfEmpty
public final Observable<T> switchIfEmpty(ObservableSource<? extends T> other)
If the source emits no events, it is replaced by another Observable
Flowable.empty()
    .switchIfEmpty(Flowable.just(3, 4, 5))
    .subscribe(ele -> Log.i("tag", String.valueOf(ele)));
SequenceEqual
public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
ObservableSource<? extends T> source2,
BiPredicate<? super T,? super T> isEqual)
// The last parameter is the callback that compares two events
public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
ObservableSource<? extends T> source2)
Compares whether two Observables emit the same sequence of events. The observer receives a Boolean value, which is false if the emitted events differ. The comparison ends as soon as a difference is detected.
Observable.sequenceEqual(Observable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS),
    Observable.just(0L, 1L, 2L), new BiPredicate<Long, Long>() {
        @Override
        public boolean test(Long aLong, Long aLong2) throws Exception {
            // Decide here whether the two events are equal
            return aLong.equals(aLong2);
        }
    }).subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            // Final result
        }
    });
merge
Reduce
An accumulating operator: the callback handles two values at a time, and once all events have been processed the observer receives the final result as a single event
public final Maybe<T> reduce(BiFunction<T,T,T> reducer)
The first call (apply) receives events 1 and 2; the Nth call receives the result of the previous call (the value returned by apply) and event N
Example:
Observable.just(1, 2, 3, 4).reduce(new BiFunction<Integer, Integer, Integer>() {
    /**
     * This method is called repeatedly until all events have been accumulated (or otherwise
     * combined); only then does the observer receive the final result.
     * @throws Exception
     */
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        // The three type parameters are: the previous result, the current event, the return value
        return integer + integer2;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
    }
});
Overloads with an initial (seed) value
public final <R> Single<R> reduce(R seed,
BiFunction<R,? super T,R> reducer)
public final <R> Single<R> reduceWith(java.util.concurrent.Callable<R> seedSupplier,
BiFunction<R,? super T,R> reducer)
Scan
Similar to reduce, but the observer receives the return value of every call to the callback as an event
public final Observable<T> scan(BiFunction<T,T,T> accumulator)
public final <R> Observable<R> scan(R initialValue,
BiFunction<R,? super T,R> accumulator)
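The relationship between the two operators is that reduce delivers only the last value of the running accumulation, while scan delivers every intermediate value. A minimal plain-JDK model (FoldModel is a hypothetical name; this is not how RxJava implements them) makes the difference concrete. An empty input gives an empty Optional, mirroring the Maybe returned by reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

class FoldModel {
    /** Models reduce: only the final accumulated value is delivered (nothing if there are no events). */
    static <T> Optional<T> reduce(List<T> events, BinaryOperator<T> reducer) {
        List<T> steps = scan(events, reducer);
        return steps.isEmpty() ? Optional.empty() : Optional.of(steps.get(steps.size() - 1));
    }

    /** Models scan: the first event is passed through, then every intermediate result is delivered. */
    static <T> List<T> scan(List<T> events, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T current = null;
        for (T event : events) {
            current = out.isEmpty() ? event : accumulator.apply(current, event);
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4);
        System.out.println(scan(events, Integer::sum));   // [1, 3, 6, 10]
        System.out.println(reduce(events, Integer::sum)); // Optional[10]
    }
}
```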
ScanWith
public final <R> Observable<R> scanWith(java.util.concurrent.Callable<R> seedSupplier,
BiFunction<R,? super T,R> accumulator)
Collect
Creates a container and adds the events to it one by one (the observer receives only one event, the container)
Flowable.just(1, 2, 3)
    .collect(
        new Callable<ArrayList<Integer>>() { // Creates the container
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
        },
        new BiConsumer<ArrayList<Integer>, Integer>() { // Collects each event into the container
            @Override
            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                // The first parameter is the container, the second is the event
                list.add(integer);
            }
        })
    .subscribe(ele -> Log.d("Log", "(MainActivity.java:33) ___ Result = " + String.valueOf(ele)));
time
Delay
Delays the delivery of every event (including onComplete, but not onError)
public final Observable<T> delay(long delay, TimeUnit unit)
public final <U> Observable<T> delay(Function<? super T,? extends ObservableSource<U>> itemDelay)
// The delay ends when the Observable returned by the callback emits onNext
TimeStamp
This operator wraps each event together with the time at which it was emitted in a Timed object
public final Observable<Timed<T>> timestamp()
public final Observable<Timed<T>> timestamp(Scheduler scheduler)
public final Observable<Timed<T>> timestamp(TimeUnit unit)
public final Observable<Timed<T>> timestamp(TimeUnit unit,
Scheduler scheduler)
Example:
Observable.intervalRange(0, 5, 2, 2, TimeUnit.SECONDS)
    .timestamp()
    .subscribe(new Consumer<Timed<Long>>() {
        @Override
        public void accept(Timed<Long> longTimed) throws Exception {
            Log.d("Log", "accept() called with: longTimed = [" + longTimed + "]");
        }
    });
Result:
longTimed = [Timed[time=1525735346216, unit=MILLISECONDS, value=2]]
Count
The observer receives the number of events, not the events themselves.
Flowable.just(1, 2, 3, 4, 5).count().subscribe(new BiConsumer<Long, Throwable>() {
    @Override
    public void accept(Long aLong, Throwable throwable) throws Exception {
        Log.d("Log", "(MainActivity.java:18) ___ Result = " + aLong);
    }
});
BackPressure
Even knowing the difference between Observable and Flowable, we have not yet said what backpressure is, so here is the concept in brief. Backpressure is the problem that arises when the producer (the Observable) produces events faster than the consumer (the observer) can consume them.
To take a simple example: if the Observable emits messages quickly but the observer processes them slowly, and there is no flow control, a large backlog of messages builds up and occupies system resources, which makes the program very slow.
Synchronous execution on a single thread is unlikely to cause this problem, because the Observable cannot emit the next event until the observer has finished processing the current one.
How to mitigate this is discussed later. For now, note that a BackpressureStrategy is set when a Flowable is created, and that the Subscriber uses request to control the maximum flow.
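The idea of a subscriber controlling the flow through request can be sketched with the JDK's own Reactive Streams interfaces in java.util.concurrent.Flow (Java 9 or later), which have the same shape as the Publisher and Subscriber behind Flowable. This is not RxJava code: BackpressureSketch, range and SlowSubscriber are hypothetical names, and the publisher is a simplified, single-threaded one that ignores invalid request amounts instead of signalling an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {
    /** A synchronous publisher of 1..count that never emits more than was requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long requested;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // simplification: invalid amounts are ignored
                }
                requested += n;
                if (draining) {
                    return; // re-entrant call from onNext: the loop below picks it up
                }
                draining = true;
                while (!done && requested > 0 && next <= count) {
                    requested--;
                    subscriber.onNext(next++);
                }
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** A consumer that records events and receives more only when request(n) is called. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        void request(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        SlowSubscriber subscriber = new SlowSubscriber();
        range(5).subscribe(subscriber);
        System.out.println(subscriber.received); // []  (nothing requested yet)
        subscriber.request(2);
        System.out.println(subscriber.received); // [1, 2]
        subscriber.request(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed); // [1, 2, 3, 4, 5] completed=true
    }
}
```

Because nothing is emitted without demand, a slow consumer is never handed a backlog; the producer simply waits for the next request.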
The Observable
Hot Observable
Hot Observable: starts emitting events without waiting for a subscription. A subscriber receives only the events emitted after it subscribes.
Replay
Overload parameters:
- Thread (Scheduler)
- Time
- Unit of time
- Buffer size
public final ConnectableObservable<T> replay()
ConnectableObservable
A ConnectableObservable does not start emitting events until connect is called
public final Disposable connect()
public abstract void connect(Consumer<? super Disposable> connection)
autoConnect connects automatically, but does not disconnect from upstream when all observers unsubscribe
public Observable<T> autoConnect()
public Observable<T> autoConnect(int numberOfSubscribers) // Connects automatically once N observers have subscribed
public Observable<T> autoConnect(int numberOfSubscribers,
Consumer<? super Disposable> connection) // The Disposable is available in this callback and can be used to cancel upstream
refCount connects when the first observer subscribes and stops the upstream when all observers have unsubscribed.
public Observable<T> refCount()
public final Observable<T> share() // Equivalent to publish and refCount
Observable
An Observable type that does not support backpressure; higher performance
ObservableSource
Flowable
An Observable type that supports backpressure. Its performance is slightly lower than that of Observable, and its family of functions differs from Observable's.
Publisher
Publisher is the root interface of Flowable, and ObservableSource is the root interface of Observable.
Single
Can emit only one event; emitting again has no effect (there is no backpressure, because only one event can be emitted).
Single.create(object : SingleOnSubscribe<Int> {
    override fun subscribe(emitter: SingleEmitter<Int>) {
        emitter.onSuccess(12)
    }
}).subscribe(object : SingleObserver<Int> {
    override fun onSuccess(t: Int) {}
    override fun onSubscribe(d: Disposable) {}
    override fun onError(e: Throwable) {}
})
DisposableSingleObserver
Lets you disconnect the observer manually
public final void dispose()
public final boolean isDisposed()
ResourceSingleObserver
Lets you add other Disposables and then dispose of them together with the observer
public final void add(Disposable resource)
The other Observable types basically follow the same rules and have observer classes with similar names
Completable
If your observer does not care about onNext events at all, you can use Completable, which only has onComplete and onError events:
Completable.create(new CompletableOnSubscribe() { // The Observable
    @Override
    public void subscribe(CompletableEmitter e) throws Exception {
        e.onComplete(); // Only onComplete or onError can be called
    }
}).subscribe(new CompletableObserver() { // The observer
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onComplete() {
    }
    @Override
    public void onError(Throwable e) {
    }
});
An Action can also be used to simplify the observer:
- completable.subscribe(onComplete)
- completable.subscribe(onComplete,onError)
toFlowable(), toObservable() and similar methods convert it to the other Observable types.
Maybe
If a source may emit one item or nothing at all, use Maybe, which is like a cross between Single and Completable.
A Maybe invokes one of the following:
- onSuccess or onError
- onComplete or onError
As you can see, onSuccess and onComplete are mutually exclusive.
// The Observable
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> e) throws Exception {
        // Emit one item (or call onError); there is no need to call onComplete,
        // and the onComplete callback will not be triggered
        e.onSuccess("test");
        // If there is no data to emit, call onComplete (or onError) instead
        //e.onComplete();
    }
});
// Subscribe the observer
maybe.subscribe(new MaybeObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onSuccess(String s) {
        // Emitting one item is equivalent to onNext plus onComplete, but the onComplete method is not called
        Log.i("tag", s);
    }
    @Override
    public void onComplete() {
        // Called when no data is emitted
        Log.i("tag", "onComplete");
    }
    @Override
    public void onError(Throwable e) {
    }
});
Subject
A Subject is both an Observable and an Observer
AsyncSubject
Emits only the last event (together with onComplete)
onComplete() must be called manually; only then do observers receive the last event and onComplete, whether they subscribed before or after.
If an error occurs, no events are delivered
BehaviorSubject
Observes only the last event before the subscription and all events after the subscription
PublishSubject
Observes only the events emitted after the subscription
ReplaySubject
Observes all events, both before and after the subscription
UnicastSubject
Allows only a single subscription; any further subscription results in an exception.
SerializedSubject
Used when events are emitted from multiple threads. It simply wraps a Subject.
Thread safety
If multiple threads emit events, use the following function to convert the Subject so that the events are queued and emitted in sequence.
public final Subject<T> toSerialized()
SerializedSubject can no longer be created directly
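The difference between PublishSubject, BehaviorSubject and ReplaySubject comes down to what a late subscriber is handed at the moment it subscribes. The following plain-JDK sketch models only that aspect (no completion, errors or threading); SubjectModel, MiniSubject and Kind are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SubjectModel {
    enum Kind { PUBLISH, BEHAVIOR, REPLAY }

    /** A toy subject that differs only in what it replays to a new observer. */
    static final class MiniSubject<T> {
        private final Kind kind;
        private final List<T> history = new ArrayList<>();
        private final List<Consumer<T>> observers = new ArrayList<>();

        MiniSubject(Kind kind) {
            this.kind = kind;
        }

        void onNext(T value) {
            history.add(value);
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }

        void subscribe(Consumer<T> observer) {
            if (kind == Kind.REPLAY) {
                history.forEach(observer); // everything emitted so far
            } else if (kind == Kind.BEHAVIOR && !history.isEmpty()) {
                observer.accept(history.get(history.size() - 1)); // only the latest event
            }
            observers.add(observer); // PUBLISH: nothing from the past
        }
    }

    /** Emits A and B, subscribes, then emits C; returns what the late subscriber saw. */
    static List<String> lateSubscriberSees(Kind kind) {
        MiniSubject<String> subject = new MiniSubject<>(kind);
        subject.onNext("A");
        subject.onNext("B");
        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext("C");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberSees(Kind.PUBLISH));  // [C]
        System.out.println(lateSubscriberSees(Kind.BEHAVIOR)); // [B, C]
        System.out.println(lateSubscriberSees(Kind.REPLAY));   // [A, B, C]
    }
}
```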
The observer
- Action: no parameters
- Consumer<T>: a single parameter
- BiConsumer<T1, T2>: two parameters
- Consumer<Object[]>: multiple parameters
test
Classes used for testing RxJava event streams
TestObserver
TestScheduler
For Observables that take a certain amount of time, such as polling at an interval
public void advanceTimeBy(long delayTime, java.util.concurrent.TimeUnit unit)
// Moves the time directly to a specific point
public void advanceTimeTo(long delayTime, java.util.concurrent.TimeUnit unit)
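The idea behind advancing time manually can be illustrated with a tiny virtual clock: tasks are queued with a due time, and nothing runs until the test moves the clock forward. This is a conceptual plain-JDK sketch, not TestScheduler's implementation; VirtualClock and its schedule method are hypothetical, and only the two advance methods borrow their names from the signatures above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class VirtualClock {
    private static final class Task {
        final long dueMs;
        final long order;
        final Runnable action;

        Task(long dueMs, long order, Runnable action) {
            this.dueMs = dueMs;
            this.order = order;
            this.action = action;
        }
    }

    // Tasks sorted by due time; ties run in scheduling order
    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.<Task>comparingLong(t -> t.dueMs).thenComparingLong(t -> t.order));
    private long nowMs;
    private long scheduled;

    /** Queues an action to run once the virtual time reaches now + delayMs. */
    void schedule(Runnable action, long delayMs) {
        queue.add(new Task(nowMs + delayMs, scheduled++, action));
    }

    /** Moves the clock forward by a relative amount and runs everything that became due. */
    void advanceTimeBy(long deltaMs) {
        advanceTimeTo(nowMs + deltaMs);
    }

    /** Moves the clock to an absolute point and runs everything that became due. */
    void advanceTimeTo(long targetMs) {
        while (!queue.isEmpty() && queue.peek().dueMs <= targetMs) {
            Task task = queue.poll();
            nowMs = task.dueMs;
            task.action.run();
        }
        nowMs = Math.max(nowMs, targetMs);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        VirtualClock clock = new VirtualClock();
        clock.schedule(() -> log.add("tick 1"), 1000);
        clock.schedule(() -> log.add("tick 2"), 2000);
        clock.advanceTimeBy(1500);
        System.out.println(log); // [tick 1]
        clock.advanceTimeTo(2000);
        System.out.println(log); // [tick 1, tick 2]
    }
}
```

A test written this way finishes instantly, because no real time passes between the scheduled actions.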
TestSubscriber
@Test
public void addition_isCorrect() throws Exception {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    // Emit A, B, C in sequence
    Flowable.just("A", "B", "C").subscribe(testSubscriber);
    // Assert that the value was never emitted
    testSubscriber.assertNever("D");
    // Assert that the received values match
    testSubscriber.assertValues("A", "B", "C");
    // Assert the number of values received
    testSubscriber.assertValueCount(3);
    // Assert that the stream has terminated
    testSubscriber.assertTerminated();
}