Rxjava2 is a great tool for everyday development, especially for asynchronous tasks. Responsive programming and good support for streaming apis make for a better coding experience. More and more developers are using it. The best place to learn rxjava2 is in the official documentation, which is detailed and complete. The following explains the usage of each operator based on the official documentation and my own understanding and examples, for your reference as well as my own.
How to use Rxjava2
To use RxJava, you need to create Observables, transform these Observables in various ways to get the exact data items you want (by using the Observable operator), and then observe and respond to the desired sequence of items (by implementing observers) or subscribers. They are then subscribed to the final transformed Observables.
The Creating Observables creation operator
just
The reaction type is constructed by taking a pre-existing object and publishing that specific object to downstream consumers when subscribing. For convenience, there is an overload of two to nine parameters, and these objects (of the same common type) will be emitted in the order specified. Like From, but notice that From passes in an array or an iterable or something like that to fetch the item to emit, whereas Just simply emits an array or an iterator.
Note that if YOU pass NULL to Just, it returns an Observable that emits null as an item. Don’t make the mistake of assuming that this will return an empty Observable (one that doesn’t emit any items at all). To do this, use the Empty operator.
fun testOpJust() {
val arr = arrayOf("mary"."tom"."ben"."lisa"."ken")
Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)
val list = arrayListOf("mary"."tom"."ben"."lisa"."ken")
Observable.just(list).forEach { it -> System.out.println(it + "s") }
list.stream().filter { it -> it.length > 3 }.map { "$it s" }.forEach(System.out::println)
}
Copy the code
from
Constructs sequences based on pre-existing source or generator types. When using Observables, it is more convenient if all the data used can be represented as Observables, rather than a mix of Observables and other types. This allows a set of operators to control the entire life cycle of the data flow. For example, Iterables can be thought of as an Observable; As an Observable that always emits a single item. By explicitly transforming these objects into Observables, you can interact with them as peers to other Observables. Therefore, most ReactiveX implementations have methods that allow language-specific objects and data structures to be transformed into Observables.
Note: These static methods use postfix naming conventions (that is, repeating parameter types in method names) to avoid overload resolution ambiguity.
fromIterable
Signal from a java.lang.Iterable source (such as Lists, Sets or Collections or Custom Iterables) and complete the sequence.
Can be used for Flowable,Observable
fromArray
Signal the elements of the given array and complete the sequence. Can be used for Flowable, observables
Note: RxJava does not support raw arrays, only (generically) reference arrays.
fun testOpFrom{val list = arrayListOf<Int>(1,2,3,4,5,6) Observable. Subscribe (system.out ::println) Observables. FromArray (6). The subscribe (System. Out: : println)}Copy the code
fromCallable
When consumers to subscribe to, call a given Java. Util. Concurrent. The Callable and its return value (or the exception thrown) is forwarded to the user.
Can be used for: observables, Flowable, Maybe, Single, Completable
Note: In Completable, the actual return value is ignored and the Completable completes.
Observable.fromCallable<String> {
"hello"
}.subscribe(System.out::println)
Completable.fromCallable{
"complatable from callable"
}.subscribe {
System.out.println("complete")}Copy the code
fromAction
When consumers to subscribe to, call the given IO. Reactivex. The function, the Action and consumers complete or receiving the Action the exception thrown.
Can be used for: Maybe,Completable
Maybe.fromAction<String>{
System.out.println("maybe from action")
}.subscribe(System.out::println)
Copy the code
The subscript star is not much to explain, not much to use
*fromRunnable
*fromFuture
*from{reactive type}
Wraps or converts another reaction type to the target reaction type. The following combination is provided in the various response types with the following signature patterns: targetType.from {sourceType} ()
Note: Not all possible transformations are implemented through the from {reactive Type} method family. Look at the to {reactive Type} method family for further transformation possibilities.
Note: The difference between fromAction and fromRunnable is that the Action interface allows checked exceptions to be thrown, whereas java.lang.Runnable does not.
error
Can be used for observables, Flowable, Maybe, Single, Completable
Through the Java. Util. Concurrent. Callable a pre-existing or errors generated signals to consumers.
fun testOpError(){
Observable.error<Throwable>(IOException(""))
.subscribe({
System.out.print("Can't print?")
},{
it.printStackTrace()
},{
System.out.println("I can't print.")})}Copy the code
A typical use case is to use onErrorResumeNext to conditionally map or suppress an exception in a chain:
/** * Suppress exceptions on the chain */ @test funtestOpOnErrorResumeNext() {
val observable = Observable.fromCallable {
if(Math. The random () < 0.5 f) {throw IllegalArgumentException ()} throw IOException ()} observables. OnErrorResumeNext (Function {if (it is IllegalArgumentException) {
Observable.empty()
} else {
Observable.error(it)
}
}).subscribe({
System.out.println("nothing")
},{
it.printStackTrace()
},{
System.out.println("empty")})}Copy the code
The onErrorResumeNext checkcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckcheckCheck This operator is used to suppress the propagation of an error, which would have triggered an onError callback if the subscribe had occurred. In fact, an error may have occurred that needs to be left unhandled or suppressed. In the function parameter of onErrorResumeNext, you can return the processing flow based on the error type.
- A source of the type empty indicates completion immediately after the subscription. Can be used for observables, Flowable, Maybe, Single, Completable
You can see an example of onErrorResumeNext
The empty send directly indicates completion, meaning the subscriber calls the onComplete callback directly. OnNext will not execute
- A source of the never type does not signal any onNext, onSuccess, onError, or onComplete. This type of reaction source can be used to test or “disable” certain sources in the combinator suboperator.
Can be used for observables, Flowable, Maybe, Single, Completable
Any callbacks from the subscriber are not invoked. Disabling is also understandable, such as sending an error, do not proceed
- Interval periodically generates infinite, ever-increasing numbers (of type Long). The intervalRange variant produces a finite number of such numbers.
Can be used for observables, Flowable
fun testOpInterval(){
Observable.interval(1,TimeUnit.SECONDS)
.onErrorResumeNext(Function {
Observable.error(it)
})
.subscribe({
if (it.rem(5) == 0L) {
System.out.println("tick")}else {
System.out.println("tock")
}
},{
it.printStackTrace()
},{
System.out.println("interval complete")})}Copy the code
- The Timer operator creates an Observable that emits a specific item after a specified period of time.
That is, the event is sent after a given time
- Range generates a series of values for each consumer. The range () method generates Integers, and rangeLong () generates Longs. The Range operator emits a sequence of sequential integers in which you choose the start of a Range and its length.
Can be used for observables, Flowable
fun testOpRange(){
val s = "test range operation now"
Observable.range(0,s.length- 3)
.map { "${s[it]} in range"}
.subscribe {
System.out.println(it)
}
}
Copy the code
Emits a series of values, parameters for the starting point, and length.
- Generate Creates a cold, synchronous, and stateful value generator.
Can be used for observables, Flowable
@Test
fun testOpGenerate(){
val start = 1
val increaseValue = 2
Observable.generate<Int,Int>(Callable<Int> {
start
}, BiFunction<Int, Emitter<Int>,Int> {
t1, t2 ->
t2.onNext(t1 + increaseValue)
t1 + increaseValue
}).subscribe {
System.out.println("generate value : $it")}}Copy the code
I don’t really know what it is, the application scenarios. It just keeps producing values
Filtering Observables Filters Observable
Filtering operations are very common and important, and there are many related operators
Debounce
Can be used for observables, Flowable
Removes items issued by the response source that are followed by updated items before the given timeout value expires. The timer resets each launch. This operator keeps track of the most recently issued item and will only issue it if enough time has passed without the source issuing any other items.
Debounde passes in a timeout value, and if multiple launches occur within that time, the nearest timeout value is taken. Since there is a timeout, there should also be a start time. The start time is the initial value of a set of launches, and the difference between the launch values is within the debounce timeout.
// Diagram:
// -A--------------B----C-D-------------------E-|---->
// a---------1s
// b---------1s
// c---------1s
// d---------1s
// e-|---->
// -----------A---------------------D-----------E-|-->
fun testOpDebounce(){
Observable.create<String>{
it.onNext("A")
Thread.sleep(1_500)
it.onNext("B")
Thread.sleep(500)
it.onNext("C")
Thread.sleep(250)
it.onNext("D")
Thread.sleep(2_000)
it.onNext("E")
}.debounce(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
}
Copy the code
distinct
Observable Flowable can be used to filter response sources by emitting only items that are different from previous items. Can specify the IO. Reactivex. Functions provides the Function, the source of each project mapped to a new value, the value will be used in comparing with previous map values. The Distinct operator filters observables by allowing only items that have not yet been issued. In some implementations, there are variations that allow adjusting the criteria for two items to be considered “different.” In some embodiments, there are variations of the operator that only compare an item to its predecessor for a more accurate comparison, thus filtering only consecutive repeating items, items in a sequence.
fun testOpDistinct(){observable.fromarray (1,2,3,3,4,5).distinct().subscribe(system.out ::println ,1,2,3,2 observables. FromArray (1). Distinct {"Ha ha" }
.subscribe(System.out::println)
}
Copy the code
Override method, pass keySelectro, and what it does is apply the method to each element to get the new value, and then decide how to undo it, okay
distinctUntilChanged
Observable Flowable is used to filter response sources by emitting only items that are different from the previous element. Can specify the IO. Reactivex. Functions provides the Function, the source of each project mapped to a new value, the value will be used in comparing with previous map values. Or, you can specify the IO. Reactivex. Functions provides. BiPredicate as the comparator function to compare the previous one.
Observable.fromarray (1,2,3,3,4,5) //.distinctuntilchanged ().distinctuntilchanged {t1, t2 -> t1 == t2 } .subscribe(System.out::println)Copy the code
An enhanced version of DISTINCT, with an overloaded method that can be passed in to a comparator
elementAt
The class is used by a Flowable,Observable that emits a single item with a specified zero-based index in a series of emitted data items from a response source. If the specified index is not in the sequence, you can specify the default item to emit.
Gets elements at a specified position in the order in which they are emitted
,2,3,3,4,5 observables. FromArray (1). ElementAt (2) the subscribe (System. Out: : println)Copy the code
elementAtOrError
filter
Can be used for observables, Flowable, Maybe, Single satisfy the specified function item to filter through the only by the reaction source item.
Filter even observable.fromarray (1,2,3,3,4,5).filter {it.rem(2) == 0}. Subscribe (system.out ::println)}Copy the code
first
Used with Flowable, Observables emit only the first project that the source emits, or the given default project if the source completes without emitting. This differs from firstElement in that the operator returns Single, whereas firstElement returns Maybe.
Observable. FromArray (1,2,3,3,4,5).first(-1).subscribe(Consumer<Int> {system.out.println)"onNext :$it"}) observable.fromarray (1,2,3,3,4,5).subscribe ().subscribe {system.out.println ()"onNext :$it")}Copy the code
firstOrError
Only a response to a source from the first project, or if the source complete without a project is a Java. Util. NoSuchElementException signal.
ignoreElement
Can be used with Maybe Single to ignore individual items emitted by a Single or Maybe source and return a Completable that signals only an error or completion event from the source.
Maybe.timer(1L,TimeUnit.SECONDS)
.ignoreElement()
.doOnComplete {
System.out.println("done")
}
.blockingAwait(a)Copy the code
ignoreElements
Ignore individual items emitted by the Single or Maybe source and return a Completable that signals only an error or completion event from the source.
Observable.timer(1L,TimeUnit.SECONDS)
.ignoreElements()
.doOnComplete {
System.out.println("completed")
}
.blockingAwait(a)Copy the code
last
Can be used for observables, Flowable
Emit only the last item emitted by the reaction source, or the given default item if the source completes without emitting an item. This differs from lastElement in that it returns Single, whereas lastElement returns Maybe.
,2,3,3,4,5 observables. FromArray (1). The last (1) the subscribe (Consumer < Int > {System. Out. Println ("last $it")})Copy the code
lastElement
,2,3,3,4,5 observables. FromArray (1). LastElement (). The subscribe (Consumer < Int > {System. Out. Println ("last $it")})Copy the code
lastOnError
Only a response to the source of the last item, or if the source complete without producing items, emit Java. Util. NoSuchElementException signal.
ofType
Use Flowable, Observable,Maybe to filter items emitted by a response source by emitting only items of a specified type.
,2.1 f observables. FromArray (1, 3, 5-tetrafluorobenzoic). The ofType (Int: : class. Java). The subscribe (Consumer < Int > {System. Out. Println ("last $it")})Copy the code
sample
Observable Flowable can be used to filter items emitted by the response source by emitting only the most recently emitted items at periodic intervals.
Observable.create<String> {
it.onNext("A")
Thread.sleep(1_000)
it.onNext("B")
Thread.sleep(300)
it.onNext("C")
Thread.sleep(700)
it.onNext("D")
it.onComplete()
}.sample(1,TimeUnit.SECONDS)
.blockingSubscribe(System.out::println)
Copy the code
skip
Delete the first n items issued by the response source and issue the remaining items. You can ignore the first n items emitted by an Observable by modifying it with the Skip operator and only participate in the next ones.
Observable.fromArray("hehe",2.1f,3,3,4,5) //.ofType(String::class.java).skip(3).subscribe {system.out.println (it)}Copy the code
skipLast
Discard the last n items emitted by the reaction source and emit the remaining items.
take
Can be used as a Flowable Observable to emit only the first n terms emitted by the response source.
Observable.fromArray("hehe"And 2.1 f, 3 5-tetrafluorobenzoic.) take (2) the subscribe (System. Out: : println)Copy the code
takeLast
Can be used by the Flowable Observable to emit only the last n items emitted by the response source.
throttleFirst
Can be used as a Flowable Observable
Similar to debounce, it is the first in a time range and is commonly used in click event filtering
Only the first item emitted by the reaction source is emitted during a continuous time window of specified duration.
Observable.create<String> {
it.onNext("A")
Thread.sleep(300)
it.onNext("B")
Thread.sleep(400)
}.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
Copy the code
throttleLast
Observable,Flowable emits only the last item emitted by the response source during continuous time of specified duration. As opposed to throttleFirst, take the last value
throttleWithTimeout
An alias for Debounce
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return debounce(timeout, unit);
}
Copy the code
timeout
From observables or Flowable source project, but if the timeout specified in the from on a sustained period of time has not issued under a, with Java. Util. Concurrent. TimeoutException terminated. For Maybe, Single, and Completable, the specified timeout duration specifies the maximum time to wait for a success or completion event to arrive. If Maybe, Single or Completable not completed within the given time, will be issued a Java. Util. Concurrent. TimeoutException.
Observable.create<String>{
it.onNext("A")
Thread.sleep(600)
it.onNext("B")
Thread.sleep(1_500)
it.onNext("C")
Thread.sleep(500)
}.subscribeOn(Schedulers.io())
.subscribe({
System.out.println(it)
},{
it.printStackTrace()
})
Copy the code
Capture process
In the following code for Kotlin, you can see that in the event of an error, an error is thrown through onError(), and a second argument is passed in at the subscriber to handle the error callback.
fun testErrorHandle() {
Observable.create<String> {
it.onNext("start")
Thread {
try {
System.out.println("start open ...")
it.onNext("start open ...")
val stream = URL("https://www.baidu.com").openStream()
System.out.println("after url ...")
it.onNext("after url")
val br = stream.bufferedReader()
if(! it.isDisposed) { var text = br.readText() it.onNext(text) } stream.close() br.close() it.onNext("after open ...")
if(! it.isDisposed) { it.onComplete() } }catch (e : java.lang.Exception) { System.out.println(e) e.printStackTrace() it.onError(e) } }.start() }.subscribe(System.out::println) { it.printStackTrace() System.out.println("what the fuck")}}Copy the code
Observables usually don’t throw exceptions. Instead, it notifies any observer that an unrecoverable error has occurred by terminating the Observable sequence with onError notification.
There are some exceptions. For example, if the onError () call itself fails, an Observable will not attempt to notify the observer by calling onError again, but will throw a RuntimeException, OnErrorFailedException or OnErrorNotImplementedException.
Techniques for recovering from onError notifications
Therefore, rather than catching exceptions, it is the observer or operator that should more often respond to onError notifications of exceptions. There are also various Observable operators that can be used to react to or recover from onError notifications from an Observable. For example, you can use the operator:
- Swallow the error and switch to the backup Observable to continue the sequence
- Swallow the error and issue the default
- Swallow the error and immediately try to restart the failed Observable
- Swallows the error and tries to restart the failed Observable after some retreat interval
These strategies can be implemented using the operators described in the error handling operators.
Swallow, I think, means not handling exceptions
Rxjava-specific exceptions and how do you handle them
CompositeException indicates that multiple exceptions have occurred. You can use the getExceptions() method of the exception to retrieve the individual exceptions that make up the composition.
MissingBackpressureException too that attempts to a data item is applied to its observables. About back pressure (github.com/ReactiveX/R…) See Backpressure for an Observable solution to this problem.
OnErrorFailedException This indicates that an Observable attempts to call its observer’s onError () method, but the method itself raises an exception.
OnErrorNotImplementedException suggesting observables is trying to call its observer onError () method, but there is no such method exists. You can eliminate this problem by fixing Observable so that it no longer meets error conditions, by implementing onError handlers in the observer, or by intercepting onError notifications before reaching the observer using one of the operators described elsewhere on this page.
OnErrorThrowable Observers pass this type of throwable to their observer’s onError () handler. The Throwable for this variable contains more information about the error and the Observable specific state of the system at the time of the error, rather than the standard Throwable.
The resources
Official document