RxJava 3.0 study notes – basics
Official GitHub address: github.com/ReactiveX/R…
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that let you compose sequences declaratively while abstracting away concerns such as low-level threading, synchronization, thread safety and concurrent data structures.
1. Set dependencies
implementation "io.reactivex.rxjava3:rxjava:3.1.1"
2. Base classes
io.reactivex.rxjava3.core.Flowable
A flow of 0..N items, supporting Reactive Streams and backpressure.
io.reactivex.rxjava3.core.Observable
A flow of 0..N items, without backpressure.
io.reactivex.rxjava3.core.Single
A flow of exactly one item or an error.
io.reactivex.rxjava3.core.Completable
A flow without items, only a completion or an error signal.
io.reactivex.rxjava3.core.Maybe
A flow with no items, exactly one item, or an error.
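As a rough illustration of how the five base classes differ in what they can signal, here is a minimal sketch. It assumes RxJava 3 is on the classpath; the class name BaseTypesDemo and its helper methods are made up for this example, and the blocking operators are used only to keep the demo synchronous.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

public class BaseTypesDemo {
    // Flowable: 0..N items, with backpressure support
    static List<Integer> fromFlowable() {
        return Flowable.range(1, 3).toList().blockingGet();
    }

    // Observable: 0..N items, no backpressure
    static List<String> fromObservable() {
        return Observable.just("a", "b").toList().blockingGet();
    }

    // Single: exactly one item (or an error)
    static int fromSingle() {
        return Single.just(42).blockingGet();
    }

    // Maybe: zero or one item (or an error); the default is returned when it is empty
    static String fromEmptyMaybe() {
        return Maybe.<String>empty().blockingGet("empty");
    }

    // Completable: no items at all, only completion (or an error)
    static boolean fromCompletable() {
        Completable.complete().blockingAwait();
        return true;
    }

    public static void main(String[] args) {
        System.out.println(fromFlowable());
        System.out.println(fromObservable());
        System.out.println(fromSingle());
        System.out.println(fromEmptyMaybe());
        System.out.println(fromCompletable());
    }
}
```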
3. Some technical terms
3.1 Upstream and downstream
A data flow in RxJava consists of a source, zero or more intermediate steps, followed by a data consumer or a combinator step (where that step is responsible for consuming the data flow in some way):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
Here, if we imagine ourselves standing on operator2, looking left towards the source is called upstream. Looking right towards the subscriber/consumer is called downstream. This is usually more obvious when each element is written on a separate line:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
3.2 Objects in motion
In the RxJava documentation, emission, emits, item, event, signal, data and message are all considered synonyms and refer to the object travelling along the data flow.
3.3 Backpressure
When a data flow runs through asynchronous steps, each step may perform its work at a different speed. A fast producer can overwhelm a slower step, which usually shows up as increased memory usage (temporary buffering) or as the need to skip/drop data. To avoid this, so-called backpressure is applied: a form of flow control in which a step expresses how many items it is ready to process. This makes it possible to constrain the memory usage of a data flow in situations where a step generally has no way of knowing how many items the upstream will send to it.
In RxJava, the dedicated Flowable class is designated to support backpressure, while Observable is dedicated to non-backpressured operations (short sequences, GUI interactions, and so on). The other types, Single, Maybe and Completable, do not support backpressure, nor should they; there is always room to store one item temporarily.
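To make the "steps indicate how many items they are prepared to process" idea concrete, the following sketch subscribes a hand-written Reactive Streams Subscriber to a Flowable and requests items in small batches. It assumes RxJava 3 (and the reactive-streams API it depends on) is on the classpath; BackpressureDemo and SlowConsumer are hypothetical names used only here.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BackpressureDemo {
    /** A consumer that only receives as many items as it has requested. */
    static final class SlowConsumer implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Subscription upstream;

        @Override
        public void onSubscribe(Subscription s) {
            upstream = s;
            s.request(3); // signal: "I am ready for 3 items"
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("done");
        }
    }

    // The source could emit 100 items, but only the 3 requested ones arrive
    static List<Integer> afterFirstRequest() {
        SlowConsumer consumer = new SlowConsumer();
        Flowable.range(1, 100).subscribe(consumer);
        return consumer.received;
    }

    // Requesting 2 more items lets the next 2 through
    static List<Integer> afterSecondRequest() {
        SlowConsumer consumer = new SlowConsumer();
        Flowable.range(1, 100).subscribe(consumer);
        consumer.upstream.request(2);
        return consumer.received;
    }

    public static void main(String[] args) {
        System.out.println(afterFirstRequest());
        System.out.println(afterSecondRequest());
    }
}
```

Flowable.range is a synchronous source, so the requested items are delivered before subscribe() or request() returns; with asynchronous sources the same protocol applies but delivery timing differs.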
3.4 Assembly time
Preparing the data flow by applying various intermediate operators happens at the so-called assembly time.
At this point in time, the data is not flowing and no side effects have occurred.
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
3.5 Subscription time
This is a temporary state when subscribe() is called on a flow, which establishes the chain of processing steps internally:
flow.subscribe(System.out::println);
This is when the subscription side effects are triggered (see doOnSubscribe). In this state, some sources block or start emitting items right away.
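The difference between assembly time and subscription time can be observed by counting side effects. The sketch below assumes RxJava 3 on the classpath; the class name and the counter are illustrative additions, not part of the original notes.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.atomic.AtomicInteger;

public class AssemblyVsSubscriptionDemo {
    // Returns {side effects seen after assembly, side effects seen after subscribing}
    static int[] run() {
        AtomicInteger sideEffects = new AtomicInteger();

        // Assembly time: the flow is only described, nothing runs yet
        Flowable<Integer> flow = Flowable.range(1, 5)
            .doOnNext(v -> sideEffects.incrementAndGet())
            .map(v -> v * v)
            .filter(v -> v % 3 == 0);
        int afterAssembly = sideEffects.get();

        // Subscription time: the chain is established and this synchronous source starts emitting
        flow.subscribe(System.out::println);
        int afterSubscribe = sideEffects.get();

        return new int[] {afterAssembly, afterSubscribe};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("after assembly: " + counts[0] + ", after subscribe: " + counts[1]);
    }
}
```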
3.6 Runtime
This is the state in which the flow is actively emitting items, errors or completion signals:
Observable.create(emitter -> {
    while (!emitter.isDisposed()) {
        long time = System.currentTimeMillis();
        emitter.onNext(time);
        if (time % 2 != 0) {
            emitter.onError(new IllegalStateException("Odd millisecond!"));
            break;
        }
    }
})
.subscribe(System.out::println, Throwable::printStackTrace);
In fact, this is when the body of the given example above executes.
4. Simple background computation
A common use case for RxJava is to run some computation or network request on a background thread and show the results (or the error) on the UI thread:
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
This style of chaining methods is called a fluent API, and it resembles the builder pattern. However, RxJava’s reactive types are immutable; each method call returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
Typically, you move computations or blocking IO to another thread via subscribeOn. Once the data is ready, you use observeOn to make sure it is processed on the foreground or GUI thread.
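One way to see which thread each stage runs on is to capture the thread names on both sides of observeOn. This is a small sketch assuming RxJava 3 on the classpath; ThreadHoppingDemo is a made-up name, and the exact thread names depend on the RxJava version, so the code only compares them.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class ThreadHoppingDemo {
    // Returns {thread that produced the value, thread that processed it after observeOn}
    static String[] threadNames() {
        String joined = Flowable.fromCallable(() -> Thread.currentThread().getName()) // runs on io()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.single())
            .map(producer -> producer + "|" + Thread.currentThread().getName())       // runs on single()
            .blockingFirst(); // block the caller until the result arrives
        return joined.split("\\|");
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("produced on:  " + names[0]);
        System.out.println("processed on: " + names[1]);
        System.out.println("caller:       " + Thread.currentThread().getName());
    }
}
```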
5. Schedulers
RxJava operators do not work with Threads or ExecutorServices directly but with so-called Schedulers, which abstract away sources of concurrency behind a uniform API. RxJava 3 provides several standard schedulers that can be accessed through the Schedulers utility class.
Schedulers.computation(): Runs computation-intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
Schedulers.io(): Runs I/O-like or blocking operations on a dynamically changing set of threads. The pool is unbounded (idle threads are cached and reused), so be careful not to create too many threads.
Schedulers.single(): Runs work on a single thread in a sequential and FIFO manner.
Schedulers.trampoline(): Runs work in a sequential and FIFO manner on one of the participating threads, usually for testing purposes.
These are available on all JVM platforms, but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().
In addition, an existing Executor (and its subtypes, such as ExecutorService) can be wrapped into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).
The Thread.sleep(2000); at the end of the examples above is no accident. In RxJava, the default Schedulers run on daemon threads, which means that once the Java main thread exits, they are all stopped and background computations may never happen. Sleeping for a while in these examples lets you see the output of the flow on the console, with time to spare.
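As a sketch of wrapping an executor with Schedulers.from(Executor), the example below runs a number of tasks on a fixed pool and counts how many distinct threads did the work. It assumes RxJava 3 on the classpath; CustomSchedulerDemo and distinctThreads are hypothetical names. It also waits with a blocking operator rather than Thread.sleep, which is one possible alternative in command-line demos.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomSchedulerDemo {
    // Runs `tasks` small jobs on a fixed pool and returns the number of distinct worker threads
    static int distinctThreads(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Scheduler scheduler = Schedulers.from(pool);
            List<String> names = Flowable.range(1, tasks)
                .flatMap(i -> Flowable
                    .fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler))
                .toList()
                .blockingGet(); // waits for all tasks, no Thread.sleep needed
            return new HashSet<>(names).size();
        } finally {
            // Threads of a plain fixed pool are not daemon threads, so shut the pool down
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreads(4, 50));
    }
}
```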
The following example subscribes in a loop and shows how many threads each kind of Scheduler ends up using.
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class _04Schedulers {
    // Ids of all threads seen so far; a concurrent set makes "add if absent" atomic
    private static final Set<Long> threadIds = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) throws Throwable {
        System.out.println("start");
        threadIds.clear();
        // A fixed thread pool: at most 10 threads
        // Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
        // computation(): at most 8 threads on the author's machine
        // Scheduler scheduler = Schedulers.computation();
        // Scheduler scheduler = Schedulers.io();
        Scheduler scheduler = Schedulers.single();
        for (int i = 0; i < 100; i++) {
            testSchedulers(scheduler);
        }
        Thread.sleep(5000);
        System.out.println("maximum number of threads " + threadIds.size());
        System.out.println("end");
    }

    private static void testSchedulers(Scheduler scheduler) {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                Thread.sleep(100);
                Long threadId = Thread.currentThread().getId();
                // add() returns true only the first time a thread id is seen
                if (threadIds.add(threadId)) {
                    emitter.onNext(threadIds.size());
                }
                emitter.onComplete();
            }
        }).subscribeOn(scheduler)
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Throwable {
                  System.out.println(integer);
              }
          });
    }
}
6. Concurrency within a flow
Flows in RxJava are sequential in nature, split into processing stages that may run concurrently with each other:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the “main” thread (more precisely, the thread that calls blockingSubscribe). However, the lambda v -> v * v does not run in parallel for this flow; it receives the values 1 to 10 one after another on the same computation thread.
Schedulers.computation() has several threads, but observeOn uses only one of them for this flow.
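The single-thread behavior of the map stage can be checked by recording the thread name for each item. A minimal sketch, assuming RxJava 3 on the classpath (SequentialStageDemo is an illustrative name):

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SequentialStageDemo {
    // Returns the names of all threads on which the map lambda was executed
    static Set<String> mapThreads() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flowable.range(1, 10)
            .observeOn(Schedulers.computation())
            .map(v -> {
                threads.add(Thread.currentThread().getName());
                return v * v;
            })
            .blockingSubscribe(System.out::println);
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("map ran on: " + mapThreads());
    }
}
```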
7. Parallel processing
Processing the numbers 1 to 10 in parallel is a bit more involved:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
In practice, parallelism in RxJava means running independent flows and merging their results back into a single flow. The flatMap operator does this by first mapping each number from 1 to 10 into its own individual Flowable, running them and merging the computed squares. Note, however, that flatMap does not guarantee any order, and the items from the inner flows may end up interleaved.
There are alternative operators:
concatMap maps and runs one inner flow at a time.
concatMapEager runs all inner flows “at once”, but the output flow will be in the order in which those inner flows were created.
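The ordering differences between these operators can be compared side by side. The sketch below assumes RxJava 3 on the classpath; OrderingDemo and squareAsync are made-up names for this illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

public class OrderingDemo {
    // An inner flow that squares one value on a computation thread
    static Flowable<Integer> squareAsync(int v) {
        return Flowable.just(v)
            .subscribeOn(Schedulers.computation())
            .map(w -> w * w);
    }

    // Inner flows run concurrently; results are merged in arrival order
    static List<Integer> withFlatMap() {
        return Flowable.range(1, 10).flatMap(OrderingDemo::squareAsync).toList().blockingGet();
    }

    // One inner flow at a time, so the source order is kept
    static List<Integer> withConcatMap() {
        return Flowable.range(1, 10).concatMap(OrderingDemo::squareAsync).toList().blockingGet();
    }

    // Inner flows run concurrently, but results are emitted in source order
    static List<Integer> withConcatMapEager() {
        return Flowable.range(1, 10).concatMapEager(OrderingDemo::squareAsync).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:        " + withFlatMap());
        System.out.println("concatMap:      " + withConcatMap());
        System.out.println("concatMapEager: " + withConcatMapEager());
    }
}
```

The flatMap result always contains the same ten squares, but their order may change from run to run.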
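Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern. By default, parallel() splits the flow into as many rails as there are available processors (not necessarily 10), and sequential() merges the rails back into a single flow; do not rely on the merged output keeping the original order: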
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
8. Dependent sub-flows
flatMap is a powerful operator that helps in many situations. For example, given a service that returns a Flowable, we would like to call another service with the values emitted by the first service.
When a later flow needs the data of an earlier flow, you can use flatMap.
public static void main(String[] args) throws InterruptedException {
    // The first network request
    Flowable<Long> inventorySource = Flowable.just(1L);
    inventorySource.flatMap(new Function<Long, Publisher<String>>() {
        @Override
        public Publisher<String> apply(Long aLong) throws Throwable {
            System.out.println("ThreadId:" + Thread.currentThread().getId());
            // A second request that depends on the result of the first one
            return Flowable.just(aLong.toString() + "_flatMap");
        }
    }).map(new Function<String, String>() {
        @Override
        public String apply(String string) throws Throwable {
            System.out.println("ThreadId:" + Thread.currentThread().getId());
            return string + "_map";
        }
    }).subscribeOn(Schedulers.computation())
      .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Throwable {
              System.out.println("ThreadId:" + Thread.currentThread().getId());
              System.out.println(s);
          }
      });
    Thread.sleep(1000);
}
The output (the thread id may differ on your machine):
ThreadId:12
ThreadId:12
ThreadId:12
1_flatMap_map
8.1 Dependent
The most typical scenario is, given a value, to invoke another service, wait for its result and continue with it:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
It is often the case that later sequences also need values from earlier mappings. This can be achieved by moving the outer flatMap into the inner part of the previous flatMap:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
Here, the original value is available inside the inner flatMap, courtesy of lambda variable capture.
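A runnable version of the nested flatMap pattern might look like the sketch below. The three service methods are hypothetical stand-ins that return fixed values instead of performing real remote calls; RxJava 3 is assumed to be on the classpath.

```java
import io.reactivex.rxjava3.core.Flowable;

public class DependentCallsDemo {
    // Hypothetical services: each one returns a flow with a single canned value
    static Flowable<Integer> apiCall() {
        return Flowable.just(7);
    }

    static Flowable<String> anotherApiCall(int value) {
        return Flowable.just("user-" + value);
    }

    static Flowable<String> finalCallBoth(int value, String next) {
        return Flowable.just(next + "/order-" + value);
    }

    static String run() {
        return apiCall()
            .flatMap(value ->
                anotherApiCall(value)
                    // `value` from the outer lambda is still visible here
                    .flatMap(next -> finalCallBoth(value, next)))
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints user-7/order-7
    }
}
```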
8.2 Non-dependent
In other scenarios, the results of the first source/data flow are irrelevant and one would like to continue with a quasi-independent other source. Here, flatMap works as well:
Observable<?> continued = sourceObservable.flatMapSingle(ignored -> someSingleSource);
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
However, the continuation in this case stays an Observable instead of the likely more appropriate Single. This is understandable because, from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well.
Often, though, there is a more expressive (and lower-overhead) way: use Completable as the mediator and its operator andThen to resume with something else:
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
The only dependency between sourceObservable and someSingleSource is that the former must complete normally before the latter is consumed.
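The following sketch fills in concrete (made-up) sources for sourceObservable and someSingleSource to show that the Single is only consumed after the first source has completed. RxJava 3 is assumed to be on the classpath; AndThenDemo is an illustrative name.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;

public class AndThenDemo {
    static String run() {
        List<Integer> seen = new ArrayList<>();

        // Illustrative sources; in real code these would come from elsewhere
        Observable<Integer> sourceObservable = Observable.range(1, 3).doOnNext(seen::add);
        Single<Integer> someSingleSource = Single.just(42);

        return sourceObservable
            .ignoreElements()            // returns Completable
            .andThen(someSingleSource)   // consumed only after normal completion
            .map(v -> v.toString() + " after " + seen)
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 42 after [1, 2, 3]
    }
}
```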
8.3 Deferred-dependent
Sometimes there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, does not flow through the “regular channels”. One would be inclined to write such a continuation as follows:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time, when the data flow has not even run yet. We need something that defers the evaluation of this Single source until runtime, when the main source completes:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
or
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Type conversions
Sometimes a source or service returns a different type than the flow that is supposed to work with it. For example, in an inventory-style flow, a method such as getDemandAsync could return a Single. If the rest of the code is left unchanged, this results in a compile-time error (often with a misleading error message about a missing overload).
In such situations, there are usually two options to fix the transformation: 1) convert to the desired type, or 2) find and use an overload of the specific operator that supports the different type.
Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:
Convert to the desired type
From \ To | Flowable | Observable | Single | Maybe | Completable |
---|---|---|---|---|---|
Flowable | | toObservable | first, firstOrError, single, singleOrError, last, lastOrError (1) | firstElement, singleElement, lastElement | ignoreElements |
Observable | toFlowable (2) | | first, firstOrError, single, singleOrError, last, lastOrError (1) | firstElement, singleElement, lastElement | ignoreElements |
Single | toFlowable (3) | toObservable | | toMaybe | ignoreElement |
Maybe | toFlowable (3) | toObservable | toSingle | | ignoreElement |
Completable | toFlowable | toObservable | toSingle | toMaybe | |
1. When turning a multi-valued source into a single-valued source, you have to decide which of the many source values should be considered as the result.
2. Turning an Observable into a Flowable requires an additional decision: what to do with the potentially unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop and onBackpressureLatest, which also allow further customization of the backpressure behavior.
3. When there is only (at most) one source item, there is no problem with backpressure because the item can always be stored until the downstream is ready to consume it.
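A few cells of the conversion matrix, written out as code. This is a sketch assuming RxJava 3 on the classpath; ConversionDemo and its method names are invented for the example, and BUFFER is just one of the available backpressure strategies.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

public class ConversionDemo {
    // Flowable -> Single: decide which item is the result (here: the first one)
    static int flowableToSingle() {
        return Flowable.range(1, 5).firstOrError().blockingGet();
    }

    // Flowable -> Maybe: here the last item, if any
    static int flowableToMaybe() {
        return Flowable.range(1, 5).lastElement().blockingGet();
    }

    // Observable -> Flowable: a backpressure strategy has to be chosen
    static List<Integer> observableToFlowable() {
        return Observable.range(1, 3)
            .toFlowable(BackpressureStrategy.BUFFER)
            .toList()
            .blockingGet();
    }

    // Single -> Flowable: no strategy needed for at most one item
    static List<String> singleToFlowable() {
        return Single.just("x").toFlowable().toList().blockingGet();
    }

    // Flowable -> Completable: items are dropped, only the terminal signal remains
    static boolean flowableToCompletable() {
        Flowable.range(1, 5).ignoreElements().blockingAwait();
        return true;
    }

    public static void main(String[] args) {
        System.out.println(flowableToSingle());
        System.out.println(flowableToMaybe());
        System.out.println(observableToFlowable());
        System.out.println(singleToFlowable());
        System.out.println(flowableToCompletable());
    }
}
```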
Use an overload of the desired type
Many frequently used operators have overloads that can deal with the other types. These are usually named with the suffix of the target type:
Operator | Overloads |
---|---|
flatMap | flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable |
concatMap | concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable |
switchMap | switchMapSingle, switchMapMaybe, switchMapCompletable |
The reason these operators have a suffix instead of simply sharing a name with a different signature is type erasure. Java does not consider signatures such as operator(Function<T, Single>) and operator(Function<T, Maybe>) to be different (unlike C#), and because of erasure the two operators would end up as duplicate methods with the same signature.
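The erasure argument can be checked with plain Java reflection, without RxJava. In the sketch below, List and Optional stand in for the reactive types, and the two methods are hypothetical; the point is only that both parameters erase to the same raw Function type, which is why the second method needs a different name.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ErasureDemo {
    // Hypothetical operator pair. Naming both of them "flatMap" would not compile,
    // because both parameter types erase to the raw type Function.
    static <T, R> String flatMap(Function<T, List<R>> mapper) {
        return "flatMap";
    }

    static <T, R> String flatMapOptional(Function<T, Optional<R>> mapper) {
        return "flatMapOptional";
    }

    // Looks up a method of this class by name and returns its erased parameter types
    static Class<?>[] erasedParameters(String methodName) {
        try {
            Method method = ErasureDemo.class.getDeclaredMethod(methodName, Function.class);
            return method.getParameterTypes();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(erasedParameters("flatMap")));
        System.out.println(Arrays.toString(erasedParameters("flatMapOptional")));
    }
}
```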
Operator naming conventions
Naming is one of the hardest things in programming, because names are expected to be not too long, yet expressive and easy to remember. Unfortunately, the target language (and pre-existing conventions) may not give much help in this regard (unusable keywords, type erasure, type ambiguities, and so on).
Unusable keywords
In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to start method names with a lowercase letter, this would have been return(T), but return is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation applies to the operator Switch, which had to be named switchOnNext. Yet another example is Catch, which was named onErrorResumeNext.
Type erasure
Many operators that expect the user to provide a function returning a reactive type cannot be overloaded, because the type erasure around Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as a suffix:
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities
Even though certain operators have no problems with type erasure, their signatures may turn out to be ambiguous, especially when using Java 8 and lambdas. For example, there are several overloads of concatWith that take the various other reactive base types as arguments (to provide convenience and performance benefits in the underlying implementation):
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Both Publisher and SingleSource appear as functional interfaces (types with one abstract method), which may encourage users to try to provide a lambda expression:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
Unfortunately, this approach does not work, and the example does not print 2 at all. In fact, since version 2.1.10, it does not even compile, because at least four concatWith overloads exist and the compiler finds the code above ambiguous.
In such situations, the user presumably wanted to defer some computation until someSource has completed, so the correct, unambiguous operator to use is defer:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
Sometimes suffixes are added to avoid logical ambiguities that might compile but produce the wrong type in the stream:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
This can also become ambiguous when functional interface types are involved as the type argument T.
Error handling
Data flows can fail, at which point the error is emitted to the consumer(s). Sometimes, though, multiple sources may fail, and there is a choice between failing right away and waiting for all of them to complete or fail before reporting the error. To indicate this option, many operator names are suffixed with DelayError (while others feature a delayError or delayErrors boolean flag in one of their overloads):
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
Of course, suffixes of various kinds may appear together:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
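To see what the DelayError suffix changes in practice, the sketch below concatenates a failing source and a healthy one with and without it. It assumes RxJava 3 on the classpath; DelayErrorDemo, signals and the two sources are illustrative names, and concatArray / concatArrayDelayError are used as the array-based counterparts of the operators shown above.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;

public class DelayErrorDemo {
    // Records every item and the terminal error (if any) of a flow as text
    static List<String> signals(Flowable<Integer> flow) {
        List<String> log = new ArrayList<>();
        flow.blockingSubscribe(
            v -> log.add("item " + v),
            e -> log.add("error " + e.getMessage()));
        return log;
    }

    // Without DelayError: the first failure ends the flow immediately
    static List<String> failFast() {
        Flowable<Integer> failing = Flowable.error(new IllegalStateException("boom"));
        Flowable<Integer> healthy = Flowable.just(1, 2);
        return signals(Flowable.concatArray(failing, healthy));
    }

    // With DelayError: the remaining sources still run, the error is reported at the end
    static List<String> delayed() {
        Flowable<Integer> failing = Flowable.error(new IllegalStateException("boom"));
        Flowable<Integer> healthy = Flowable.just(1, 2);
        return signals(Flowable.concatArrayDelayError(failing, healthy));
    }

    public static void main(String[] args) {
        System.out.println(failFast());
        System.out.println(delayed());
    }
}
```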
Base class vs base type
Base classes can be considered heavy because of the sheer number of static and instance methods on them. The design of RxJava 3 was heavily influenced by the Reactive Streams specification, so the library features a class and an interface for each reactive type:
Type | Class | Interface | Consumer |
---|---|---|---|
0..N backpressured | Flowable | Publisher (1) | Subscriber |
0..N unbounded | Observable | ObservableSource (2) | Observer |
1 element or error | Single | SingleSource | SingleObserver |
0..1 element or error | Maybe | MaybeSource | MaybeObserver |
0 element or error | Completable | CompletableSource | CompletableObserver |
1. org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type for interacting with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.
2. The naming convention for the interfaces is to append Source to the semi-traditional class name. There is no FlowableSource because Publisher is provided by the Reactive Streams library (and subtyping it would not have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava-specific only.
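One practical consequence of having both a class and an interface per type is that library code can accept the interface and wrap it into the full-featured class when operators are needed. The sketch below assumes RxJava 3 on the classpath; SourceInterfacesDemo, await and collect are hypothetical helper names.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.List;
import org.reactivestreams.Publisher;

public class SourceInterfacesDemo {
    // Accepts the RxJava-specific interface and wraps it to get the Single operators
    static <T> T await(SingleSource<T> source) {
        return Single.wrap(source).blockingGet();
    }

    // Accepts the standard Reactive Streams interface, so any compliant library can pass its type
    static <T> List<T> collect(Publisher<T> publisher) {
        return Flowable.fromPublisher(publisher).toList().blockingGet();
    }

    // A hand-written SingleSource expressed as a lambda over its one abstract method
    static String fromHandWrittenSource() {
        SingleSource<String> source = observer -> {
            observer.onSubscribe(Disposable.empty());
            observer.onSuccess("hello");
        };
        return await(source);
    }

    public static void main(String[] args) {
        System.out.println(fromHandWrittenSource());
        System.out.println(collect(Flowable.range(1, 3)));
    }
}
```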