Reactive programming
(Long read warning: roughly ten thousand words)
After learning Spring WebFlux, I found that many of its concepts were unclear to me. On closer inspection I realized I did not really understand reactive programming, so I used some spare class time to work through the official documentation and organize these notes for further study.
To avoid disputes, I have tried to include the original wording where possible. This article is my own manual translation, so some passages may be imprecise; I suggest you also read the official documentation. Where they conflict, the official documentation prevails.
Introduction
Reactor is an implementation of the reactive programming paradigm, which can be summarized as follows:
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s). – en.wikipedia.org/wiki/Reacti…
In plain terms, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change: static (such as arrays) or dynamic (such as event emitters) data streams can be expressed easily in the language in use. As time moved on, and through the efforts of the Reactive Streams initiative, the JVM gained a standardized specification that defines a set of interfaces and interaction rules for reactive libraries. Its interfaces have been integrated into Java 9 under the Flow class. The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern.
Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing backpressure). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs, Flux (for [0|N] elements) and Mono (for [0|1] elements), and extensively implements the Reactive Streams specification. Reactor also supports non-blocking inter-process communication with the Reactor Netty project. Suited for microservices architectures, Reactor Netty offers backpressure-ready network engines for HTTP (including WebSockets), TCP, and UDP, with full support for reactive encoding and decoding.
The reactive streams pattern can be compared to the familiar Iterator design pattern, given the duality of the Iterable-Iterator pair in these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based. Using an iterator is an imperative programming pattern in which the method of accessing values is solely the Iterable's responsibility (pulling): the developer chooses when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber.
In reactive programming, it is the Publisher that notifies the Subscriber when a new value is available, and this push aspect is the key to being reactive. (Subscriber notification works like a pushed message, hence "push"; with an iterator, it is the consumer that actively fetches the next value, hence "pull".) Likewise, operations applied to pushed values are expressed declaratively rather than imperatively:
The programmer expresses the logic of the computation rather than describing its exact control flow. In addition to pushing values, the error-handling and completion aspects are also covered in a well-defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both the error signal and the completion signal terminate the sequence.
This approach is very flexible. The pattern supports use cases with no value, one value, or n values (including an infinite sequence of values, such as the continuous ticks of a clock).
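As a concrete illustration of these push signals, here is a minimal sketch using the java.util.concurrent.Flow API from the JDK (the standardized interfaces mentioned above); the class name and values are purely illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PushSignals {
    // Collects everything a publisher pushes, showing the
    // onNext / onComplete callbacks described above.
    public static List<String> collect() throws InterruptedException {
        List<String> received = new ArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item)              { received.add(item); }
                public void onError(Throwable t)             { terminated.countDown(); }
                public void onComplete()                     { terminated.countDown(); }
            });
            publisher.submit("tick"); // the publisher pushes values...
            publisher.submit("tock");
        }                             // ...and close() signals onComplete
        terminated.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect()); // [tick, tock]
    }
}
```

Note how the subscriber never asks "is there a next element?"; it simply reacts to whatever the publisher pushes, until a terminal signal arrives.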
The limitations of blocking approaches:
Modern applications face huge numbers of concurrent operations, and even though modern hardware keeps improving, the performance of modern software remains a key concern. Broadly speaking, there are two ways to improve a program's performance: a) parallelize by using more threads and more hardware resources, or b) seek more efficiency in how current resources are used.
Usually, Java developers write programs with blocking code. This practice works fine until a performance bottleneck appears, at which point it is time to introduce additional threads (i.e. multithreading) running similar blocking code. But this form of scaling resource utilization can quickly introduce contention and concurrency problems. Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted, because threads (possibly many of them) now sit idle, waiting for data and doing nothing. So parallelization is not a silver bullet; we still need to squeeze the full performance out of the hardware.
Solution: the second approach mentioned earlier, seeking more efficiency, can solve the resource-wasting problem. By writing asynchronous, non-blocking code, you let execution switch to another active task that uses the same underlying resources, and come back to the current process when the asynchronous work has completed. But how do you write asynchronous code on the JVM? Java offers two models of asynchronous programming:
a) Callbacks: asynchronous methods do not have a return value but take an extra callback parameter (a lambda or anonymous class) that gets called when the result is available. A well-known example is Swing's EventListener hierarchy. b) Futures: asynchronous methods immediately return a Future<T>. The asynchronous process computes a T value, and the Future object wraps access to it. The value is not immediately available, and the object can be polled until the value is available. For instance, an ExecutorService running Callable<T> tasks uses Future objects.
Are these techniques good enough? Not for every use case, and both approaches have limitations. Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as "callback hell"). For example:
userService.getFavorites(userId, new Callback<List<String>>() {
    public void onSuccess(List<String> list) {
        if (list.isEmpty()) {
            suggestionService.getSuggestions(new Callback<List<Favorite>>() {
                public void onSuccess(List<Favorite> list) {
                    UiUtils.submitOnUiThread(() -> {
                        list.stream()
                            .limit(5)
                            .forEach(uiList::show);
                    });
                }

                public void onError(Throwable error) {
                    UiUtils.errorPopup(error);
                }
            });
        } else {
            list.stream()
                .limit(5)
                .forEach(favId -> favoriteService.getDetails(favId,
                    new Callback<Favorite>() {
                        public void onSuccess(Favorite details) {
                            UiUtils.submitOnUiThread(() -> uiList.show(details));
                        }

                        public void onError(Throwable error) {
                            UiUtils.errorPopup(error);
                        }
                    }));
        }
    }

    public void onError(Throwable error) {
        UiUtils.errorPopup(error);
    }
});
In Reactor, the equivalent code looks much better:
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
Future objects are a bit better than callbacks, but despite the improvements brought in Java 8 by CompletableFuture, they still do not fare well at composition. Orchestrating multiple Future objects together is doable but not easy. Futures also have other problems:
a) It is easy to end up in another blocking situation with Future objects by calling get(). b) They do not support lazy computation. c) They lack support for multiple values and advanced error handling.
For example:
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
From imperative to reactive programming: reactive libraries such as Reactor aim to address these drawbacks of "classic" asynchronous approaches on the JVM while also focusing on a few additional aspects:
a) Composability and readability. b) Data as a flow manipulated with a rich vocabulary of operators. c) Nothing happens until you subscribe. d) Backpressure, or the ability for the consumer to signal the producer that the rate of emission is too high. e) High-level but high-value abstractions that are concurrency-agnostic.
Composability and readability:
Composability refers to the ability to orchestrate multiple asynchronous tasks, in which we use the results of previous tasks to feed input to subsequent ones, or execute several tasks in a fork-join style. In addition, we can reuse asynchronous tasks as discrete components in a higher-level system. The ability to orchestrate tasks is tightly coupled to the readability and maintainability of code. As the layers of asynchronous processes increase in both number and complexity, being able to compose and read code becomes increasingly difficult. As we saw, the callback model is simple, but one of its main drawbacks is that, for complex processes, you need a callback executed from a callback, itself nested inside another callback, and so on. That mess is known as "callback hell". As you can guess (or know from experience), such code is pretty hard to go back to and reason about. Reactor offers rich composition options, wherein code mirrors the organization of the abstract process, and everything is generally kept at the same level (nesting is minimized).
Like an assembly line:
You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher), is processed, and eventually ends up as a finished product ready to be pushed to the consumer (the Subscriber). The raw material can go through various transformations and other intermediary steps, or be part of a larger assembly line that aggregates intermediate pieces. If there is a glitch or clog at one point (perhaps boxing the products takes disproportionately long), the afflicted workstation can signal upstream to limit the flow of raw material.
Operators:
In Reactor, operators are the workstations of our assembly-line analogy. Each operator adds behavior to a Publisher and wraps the previous step's Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we will see shortly. While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries such as Reactor is the rich vocabulary of operators they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
Nothing happens until you subscribe:
In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which helps with reusability and composition). By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, all the way back to the source Publisher.
Backpressure:
Propagating signals upstream is also used to implement backpressure, which we described in the assembly-line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream one. The real mechanism defined by the Reactive Streams specification is pretty close to this analogy: a subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate, or it can use the request mechanism to signal the source that it is ready to process at most n elements. Intermediate operators can also change the request in transit. Imagine a buffer operator that groups elements in batches of ten: if the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies that avoid request(1) round-trips, which is beneficial when producing elements before they are requested is not too costly.
Hot vs. cold sequences:
Cold sequence: for each subscriber, the data push starts anew, including at the source of the data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription. Hot sequence: not every subscriber receives the data from the beginning; late subscribers receive the signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or replay all or part of the emission history. From a general perspective, a hot sequence can even emit when no subscriber is listening (an exception to the "nothing happens before you subscribe" rule).
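A minimal sketch of the cold/hot difference (assuming reactor-core 3.4+ on the classpath for the Sinks API; the class and variable names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class HotColdSketch {
    public static List<List<String>> run() {
        // Cold: each subscriber restarts the source from scratch.
        Flux<Integer> cold = Flux.range(1, 3);
        cold.subscribe(i -> System.out.println("cold sub1: " + i)); // 1, 2, 3
        cold.subscribe(i -> System.out.println("cold sub2: " + i)); // 1, 2, 3 again

        // Hot: subscribers only see signals emitted after they subscribe.
        Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
        Flux<String> hot = sink.asFlux();
        sink.tryEmitNext("lost");          // no subscriber yet: dropped

        List<String> sub1 = new ArrayList<>();
        hot.subscribe(sub1::add);
        sink.tryEmitNext("A");             // only sub1 sees "A"

        List<String> sub2 = new ArrayList<>();
        hot.subscribe(sub2::add);
        sink.tryEmitNext("B");             // both subscribers see "B"

        return List.of(sub1, sub2);        // [[A, B], [B]]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The cold Flux replays 1, 2, 3 for each subscriber, while the hot one delivers only what is emitted after each subscription.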
Prerequisites
JDK 8+
Familiarity with Stream and functional programming
The io.projectreactor artifacts
Core features
Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono.
A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries some semantic information into the type, indicating the rough cardinality of the asynchronous processing. For instance, an HTTP request produces only one response, so a count operation makes little sense. Expressing the result of such an HTTP call as Mono<HttpResponse> thus makes more sense than Flux<HttpResponse>, as it offers only operators relevant to a context of zero items or one item. Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists on Flux, but it returns a Mono<Long>.
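A small sketch of this cardinality switch (reactor-core assumed on the classpath; block() is used here only to demonstrate the resulting value):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CardinalitySketch {
    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.range(1, 5); // a 0..N sequence
        // count() changes the maximum cardinality, so the type switches to Mono:
        Mono<Long> total = numbers.count();
        System.out.println(total.block());        // 5
    }
}
```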
Flux
A Flux<T> is a standard Publisher<T> representing an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams specification, these three types of signal translate to calls to a downstream Subscriber's onNext, onComplete, and onError methods. With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
Mono
A Mono<T> is a specialized Publisher<T> that emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal. It offers only a subset of the operators available on a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux, while Mono#then(Mono) returns another Mono. Note that you can use a Mono to represent a no-value asynchronous process that only has the concept of completion (similar to a Runnable). To create one, use an empty Mono<Void>.
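A brief sketch of these type switches (reactor-core assumed):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoSwitchSketch {
    public static void main(String[] args) {
        // Combining a Mono with another Publisher switches to Flux:
        Flux<String> both = Mono.just("a").concatWith(Mono.just("b"));
        System.out.println(both.collectList().block()); // [a, b]

        // A valueless, Runnable-like asynchronous process:
        Mono<Void> done = Mono.fromRunnable(() -> System.out.println("side effect"));
        done.block(); // runs the side effect, completes with no value
    }
}
```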
Simple ways to create a Flux or Mono and subscribe to it
The easiest way to get started with Flux and Mono is to use one of the many factory methods:
a) Creating a Flux of Strings:
Flux<String> flux1 = Flux.just("qwer", "asdf", "zxcv");

List<String> list = Arrays.asList("rewq", "fdsa", "vcxz");
Flux<String> flux2 = Flux.fromIterable(list);
There is another way to create it:
Mono<String> empty = Mono.empty();     // create an empty Mono (still carries the generic type)
Mono<String> mono = Mono.just("qwer"); // create a Mono of a single String
Flux<Integer> flux = Flux.range(0, 5); // create an incrementing sequence of 5 integers starting at 0
Subscribing
b) Flux and Mono offer five variants of the subscribe method signature:
// This method completes the subscription and fires the sequence
subscribe();
// Each value passed is processed
subscribe(Consumer<? super T> consumer);
// Process each value passed and handle exceptions in the process
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// Handle the passed values and exceptions and do some action when the sequence is complete
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// Handle incoming values, exceptions, completion events, and subscriptions generated by this subscription call.
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
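As a sketch of the three-argument variant (the lambdas map to onNext, onError, and onComplete; for a finite source like range, the calls happen synchronously on the subscribing thread):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SubscribeSketch {
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 3).subscribe(
            i -> events.add("value: " + i),     // consumer for each emitted value
            err -> events.add("error: " + err), // error consumer (not triggered here)
            () -> events.add("done"));          // completion callback
        return events;                          // [value: 1, value: 2, value: 3, done]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```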
c) All of the above methods return a Disposable, which provides the ability to cancel the subscription. Although the subscription can be cancelled by calling the Disposable's dispose() method, if the publisher pushes data too fast it may have emitted everything before the cancellation takes effect. Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable, and Disposables.composite(...) lets you collect several Disposables and dispose of them all at once.
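A short sketch of these cancellation helpers (reactor-core assumed; Flux.never() is used as a source that emits nothing, so cancellation is the only way out):

```java
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

public class DisposableSketch {
    public static void main(String[] args) {
        Disposable d = Flux.never().subscribe(v -> {});
        d.dispose();                        // cancel the subscription
        System.out.println(d.isDisposed()); // true

        // swap: updating it disposes the previously held Disposable.
        Disposable.Swap swap = Disposables.swap();
        swap.update(Flux.never().subscribe());
        swap.update(Flux.never().subscribe()); // cancels the first subscription
        swap.dispose();

        // composite: cancel several subscriptions in one call.
        Disposable.Composite all = Disposables.composite(
            Flux.never().subscribe(),
            Flux.never().subscribe());
        all.dispose();
        System.out.println(all.isDisposed()); // true
    }
}
```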
Custom subscribers
An alternative to lambda expressions: BaseSubscriber:
This class helps you write a custom Subscriber by extending it. At minimum, a custom implementation must provide the hookOnSubscribe(Subscription subscription) and hookOnNext(T value) methods.
The first tells you what to do when the subscription is established, and the second is called for each incoming value; do not forget to call request(long) in your hooks to ask for more data. The requestUnbounded() method is equivalent to request(Long.MAX_VALUE) and switches to unbounded mode; a cancel() method is also provided. There are additional hooks: hookOnComplete(), hookOnError(), hookOnCancel(), and hookFinally(), the last of which receives an argument describing the type of termination.
The simplest way to consume the original request is to subscribe using BaseSubscriber and override the hookOnSubscribe method, as shown in the following example:
Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

        @Override
        public void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

        @Override
        public void hookOnNext(Integer integer) {
            System.out.println("Cancelling after having received " + integer);
            cancel();
        }
    });
The output is as follows:
request of 1
Cancelling after having received 1
Note: when manipulating requests, you must be careful to produce enough demand for the sequence to advance, or your Flux can get "stuck". That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually call request at least once (for example, request(1)).
Backpressure and ways to reshape requests:
When implementing backpressure in Reactor, consumer pressure is propagated back to the source by sending a request to the upstream operator. The sum of the current requests is sometimes called the current "demand" or "pending request". Demand is capped at Long.MAX_VALUE, which represents an unbounded request (meaning "produce as fast as you can", essentially disabling backpressure). The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:
a) subscribe() and most of its lambda-based variants (except the one that takes a Consumer<Subscription> parameter). b) block(), blockFirst(), and blockLast(). c) Iterating over toIterable() or toStream().
Operators modify requirements from downstream:
One thing to keep in mind is that each operator in the upstream chain can adjust the requirements expressed at the subscription level.
The typical example is the buffer(N) operator: if it receives a request(2), it interprets it as a demand for two full buffers. Consequently, since a buffer needs N elements to be considered full, the buffer operator reshapes the request to 2 x N.
You may also have noticed that some operator variants take an int input parameter called prefetch. This is another category of operators that modify downstream requests. These are typically operators that deal with inner sequences, deriving a Publisher from each incoming element (like flatMap).
Prefetching is a way of tuning the initial request made on these inner sequences. If unspecified, most of these operators start with a demand of 32. These operators usually also implement a replenishing optimization: once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This heuristic optimization is made so that these operators proactively anticipate upcoming requests.
Finally, several operators allow you to adjust requests directly: limitRate and limitRequest.
a) limitRate(N) splits downstream requests into smaller batches that are propagated upstream. For example, a request of 100 made to limitRate(10) results in, at most, 10 requests of 10 being propagated upstream. Note that in this form, limitRate actually implements the replenishing optimization discussed earlier. b) The operator has a variant that also lets you tune the replenish amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide). (Personal understanding: each request is >= lowTide and <= highTide.) Choosing a lowTide of 0 results in strict batches of exactly highTide requests, instead of batches further reshaped by the replenishing strategy. c) limitRequest(N), on the other hand, caps the total demand propagated upstream at a maximum of N. It adds up requests toward a total of N. If a single request does not make the total demand overflow N, that particular request is wholly propagated upstream. After the source has emitted that amount, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.
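A sketch of observing how limitRate reshapes demand (a doOnRequest placed before limitRate sees the requests that limitRate sends upstream; the exact replenish amounts follow the strategy described above, so only the first batch is asserted):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class LimitRateSketch {
    public static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 100)
            .doOnRequest(requests::add) // records what limitRate requests upstream
            .limitRate(10)              // splits the unbounded downstream demand
            .subscribe();               // subscribe() requests Long.MAX_VALUE downstream
        return requests;
    }

    public static void main(String[] args) {
        // The first upstream request is a full batch of 10; later ones are replenishments.
        System.out.println(upstreamRequests().get(0)); // 10
    }
}
```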
Programmatically creating a sequence
This section introduces the creation of a Flux or a Mono by programmatically defining its associated events (onNext, onError, and onComplete). All these methods share the fact that they expose an API to trigger the events, which we call a sink. There are actually a few sink variants, which we cover shortly.
Synchronous: generate
The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function. This is for synchronous, one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can be called at most once per callback invocation. You can then additionally call error(Throwable) or complete(), but this is optional. The most useful variant is probably the one that also lets you keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide a Supplier<S> for the initial state, and your generator function returns a new state on each round.
For example, you can use int as a state:
Flux<String> flux = Flux.generate(
    () -> 0,                          // the initial state: Supplier<S>
    (state, sink) -> {
        sink.next("3 x " + state + " = " + 3 * state); // emit one element via the sink
        if (state == 10) sink.complete();
        return state + 1;             // the new state for the next round
    });
You can also use mutable <S>. For example, you could rewrite the above example using a single AtomicLong as a state, changing it each turn:
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + "=" + 3*i); // The next() method adds elements to Flux
if (i == 10) sink.complete();
return state;
});
If your state object needs to clean up some resources, use the generate(Supplier<S>, BiFunction<S, SynchronousSink<T>, S>, Consumer<S>) variant to clean up the last state instance.
The following example uses the generate method that contains Consumer<S> :
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + "=" + 3*i);
if (i == 10) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));
We treat the last state value (11) as the output of the Consumer lambda. In a scenario where a database connection or resource is required at the end, the consumer lambda can close the connection or otherwise handle any tasks that should be completed at the end of the process.
Asynchronous and multi-threaded: create
Create is a more advanced form of programmatic creation of Flux and is suitable for pushing data multiple times per round, even from multiple threads.
It exposes a FluxSink and its next(), error(), and complete() methods. In contrast to generate, it has no state-based variant. On the other hand, it can trigger multithreaded events in callbacks.
Note: create does not parallelize your code, nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects.
Even with subscribeOn, there is the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock up the pipeline:
the requests would never be performed, because the loop starves the same thread they are supposed to run on. Use the subscribeOn(Scheduler, requestOnSeparateThread = false) variant: it runs the create lambda on the Scheduler thread while still letting data flow, by performing the request on the original thread.
Also, since create bridges asynchronous APIs and manages backpressure, you can refine how it behaves backpressure-wise by indicating an OverflowStrategy:
a) IGNORE: completely ignore downstream backpressure requests. This may yield an IllegalStateException when queues get full downstream. b) ERROR: signal an IllegalStateException when the downstream cannot keep up. c) DROP: drop the incoming signal if the downstream is not ready to receive it. d) LATEST: let the downstream only get the latest signals from upstream. e) BUFFER (the default): buffer all signals if the downstream cannot keep up. (This does unbounded buffering and may lead to an OutOfMemoryError.)
Mono also has a create generator. The MonoSink of Mono's create does not allow several emissions; it drops all signals after the first one.
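A minimal sketch of Mono.create illustrating that only the first signal counts:

```java
import reactor.core.publisher.Mono;

public class MonoCreateSketch {
    public static Mono<String> bridge() {
        return Mono.create(sink -> {
            sink.success("first");  // completes the Mono with a value
            sink.success("second"); // dropped: a MonoSink emits at most once
        });
    }

    public static void main(String[] args) {
        System.out.println(bridge().block()); // first
    }
}
```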
Asynchronous but single-threaded: push
push is a middle ground between generate and create, suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next(), complete(), or error() at a time.
Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
        new SingleThreadEventListener<String>() {

            public void onDataChunk(List<String> chunk) {
                for (String s : chunk) {
                    sink.next(s);
                }
            }

            public void processComplete() {
                sink.complete();
            }

            public void processError(Throwable e) {
                sink.error(e);
            }
        });
});
A hybrid push/pull model:
Most Reactor operators, like create, follow a hybrid push/pull model. What we mean is that, while most of the processing is asynchronous (suggesting a push approach), there is a small pull component to it: the request.
The consumer pulls data from the source in the sense that the source does not emit anything until it is first requested. The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.
Note that both push() and create() let you set up an onRequest consumer in order to manage the request amount and ensure that data is pushed through the sink only when there is pending demand. Two callbacks, onDispose and onCancel, perform any cleanup on cancellation or termination.
onDispose can be used to perform cleanup when the Flux completes, errors out, or is cancelled. onCancel can be used to perform any action specific to cancellation, prior to the cleanup in onDispose.
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close());
});
handle:
The handle method is a bit different: it is an instance method, meaning that it is chained on an existing source (as are the common operators). It is present in both Mono and Flux. It is close to generate in the sense that it uses a SynchronousSink and only allows one-by-one emissions. However, handle can be used to generate an arbitrary value out of each source element, possibly skipping some elements. In this way, it can serve as a combination of map and filter.
Handle’s signature is as follows:
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
Let's consider an example. The Reactive Streams specification disallows null values in a sequence. What if you want to perform a map but use a preexisting method as the map function, and that method sometimes returns null? For instance, the following method can be applied safely to a source of integers:
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
We can then use handle to remove any null values:
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i);
        if (letter != null)
            sink.next(letter);
    });

alphabet.subscribe(System.out::println);
Threading and schedulers
Reactor, like RxJava, can be considered concurrency-agnostic. That is, it does not enforce a concurrency model; rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency. Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated thread. Instead, most operators continue working in the thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the thread in which the subscribe() call was made.
The following example runs Mono in a new thread:
public static void main(String[] args) throws InterruptedException {
    final Mono<String> mono = Mono.just("hello "); // The Mono is assembled in the main thread
    Thread t = new Thread(() -> mono
            .map(msg -> msg + "thread ")
            .subscribe(v -> // The subscription happens in the new thread
                    System.out.println(v + Thread.currentThread().getName()) // As a result, map and the consumer both execute on the new thread
            )
    );
    t.start();
    t.join();
}
In Reactor, the execution model and where the execution happens are determined by the Scheduler that is used. A Scheduler has scheduling responsibilities similar to an ExecutorService, but having a dedicated abstraction lets it do more, notably acting as a clock and enabling a wider range of implementations (virtual time for tests, trampolining, immediate scheduling, and so on). The Schedulers class has static methods that give access to the following execution contexts:
A) No execution context (Schedulers.immediate()): the submitted Runnable is directly executed at processing time, effectively running it on the current thread (it can be seen as a "null object" or no-op Scheduler).
B) A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.
C) An unbounded elastic thread pool (Schedulers.elastic()). With the introduction of Schedulers.boundedElastic(), this is no longer the preferred method, as it tends to hide back-pressure problems and lead to too many threads (see below).
D) A bounded elastic thread pool (Schedulers.boundedElastic()). Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its predecessor, it has a cap on the number of threads it can create (by default, the number of CPU cores x 10). Up to 100,000 additional tasks submitted after the cap has been reached are enqueued and re-scheduled when a thread becomes available (for tasks with a delay, the delay starts counting when a thread becomes available). This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread, so that it does not tie up other resources.
E) A fixed pool of workers tuned for parallel work (Schedulers.parallel()). It creates as many workers as there are CPU cores.
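As a quick sketch of how one of these factory methods is used in practice (this assumes a reactor-core 3.x dependency on the classpath; the blockingFetch method and its sleep are made up for illustration), the following offloads a blocking call onto boundedElastic() so that it does not tie up a non-blocking thread:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedElasticDemo {
    // A hypothetical blocking call that must not run on a non-blocking scheduler
    static String blockingFetch() {
        try {
            Thread.sleep(100); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "data";
    }

    public static void main(String[] args) {
        String result = Mono.fromCallable(BoundedElasticDemo::blockingFetch)
                .subscribeOn(Schedulers.boundedElastic()) // run the blocking work on the elastic pool
                .block(); // block() is acceptable here: main is not a NonBlocking thread
        System.out.println(result); // prints "data"
    }
}
```

The subscribeOn placement is what moves the blocking work off the caller's thread; this is explained in more detail below.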
In addition, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.) You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel Scheduler named yourScheduleName. While boundedElastic is made to help with legacy blocking code that cannot be avoided, single and parallel are not. As a consequence, using the Reactor blocking APIs (block(), blockFirst(), blockLast(), as well as iterating over toIterable() or toStream()) inside the default single and parallel schedulers raises an IllegalStateException. These schedulers are "non-blocking only" because they are created with Thread instances that implement the NonBlocking marker interface. (A custom Scheduler whose threads implement the NonBlocking interface must not run blocking calls, otherwise an exception is thrown.) By default, some operators use a specific scheduler from Schedulers (and usually give you the option of providing a different one). For example, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux that ticks every 300ms. By default, this is powered by Schedulers.parallel().
The following changes the Scheduler to a new instance similar to Schedulers.single():
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"));
Reactor provides two ways to switch execution context (or Scheduler) in a reactive chain:
publishOn and subscribeOn
Both take a Scheduler and allow you to switch the execution context to that Scheduler.
But the position of publishOn in the chain matters, while the position of subscribeOn does not. To understand the difference, you first have to remember that nothing happens until you subscribe. In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backwards (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.
The publishOn method:
publishOn switches the operators that come after it onto a new execution context. It is applied in the middle of the subscriber chain, in the same way as any other operator. It takes signals from upstream and replays them downstream while executing the callbacks on a worker of the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:
A) It changes the execution context to one Thread picked by the Scheduler.
B) As per the specification, onNext calls happen in sequence, so this uses up a single thread.
C) Operators after publishOn continue execution on that same thread, unless they themselves work on a specific Scheduler.
The following example uses the publishOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); // Creates a new Scheduler backed by four Thread instances
final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i) // The first map runs on the anonymous thread below
    .publishOn(s) // publishOn switches the rest of the sequence onto a thread of the Scheduler
    .map(i -> "value " + i); // The second map runs on that Scheduler thread
Thread t = new Thread(() -> flux.subscribe(System.out::println)); // This anonymous thread is the one where the subscription happens; printing happens in the latest execution context, which is the one from publishOn
t.start();
The subscribeOn method:
subscribeOn applies to the subscription process, when that backward chain of subscribers is being constructed. As a consequence, no matter where you place subscribeOn in the chain, it always affects the context from which the source emits. However, this does not affect the behavior of subsequent calls to publishOn: they still switch the execution context for the part of the chain after them.
What subscribeOn does:
A) It changes the thread on which the whole chain of operators is subscribed, and thus the thread from which the source publisher emits.
B) It picks one thread from the given Scheduler.
Note: only the earliest subscribeOn call in the chain is actually taken into account. The following example uses the subscribeOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); // Creates a new Scheduler backed by 4 threads
final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i) // This map runs on one of the four threads
    .subscribeOn(s) // Switches the thread on which the subscription (and thus the source) runs
    .map(i -> "value " + i); // This map runs on the same thread as the previous operator
Thread t = new Thread(() -> flux.subscribe(System.out::println)); // The subscription would normally happen on this anonymous thread, but subscribeOn moves it to one of the four scheduler threads
t.start();
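To see both switches in a single chain, here is a small sketch (again assuming reactor-core 3.x; the scheduler names "pub" and "sub" are arbitrary). The first map runs on the subscribeOn thread, even though subscribeOn appears last, while everything after publishOn runs on the publishOn thread:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ContextSwitchDemo {
    public static void main(String[] args) {
        String last = Flux.range(1, 2)
                .map(i -> {
                    // Runs on the subscribeOn scheduler, because it sits before publishOn
                    System.out.println("map1 on " + Thread.currentThread().getName());
                    return 10 + i;
                })
                .publishOn(Schedulers.newSingle("pub", true)) // daemon thread
                .map(i -> {
                    // Runs on the publishOn scheduler
                    System.out.println("map2 on " + Thread.currentThread().getName());
                    return "value " + i;
                })
                .subscribeOn(Schedulers.newSingle("sub", true)) // affects the source and map1, despite being last
                .blockLast(); // blocking from main is allowed: main is not a NonBlocking thread
        System.out.println(last); // prints "value 12"
    }
}
```

Running this prints "map1 on sub-…" and "map2 on pub-…" lines, which makes the two execution contexts visible.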
Handling errors
Errors propagate down the chain until they reach the subscriber's onError() method, so you should either handle them with error-handling operators before they get there, or provide your own onError callback.
P.S. Reactor officially documents many more error-handling operators; I am lazy, so I only read the first suggestion.
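That first suggestion is essentially the reactive equivalent of a catch block: handle the error with an operator such as onErrorReturn (or onErrorResume) before it reaches the subscriber. A minimal sketch, assuming reactor-core 3.x:

```java
import reactor.core.publisher.Flux;

public class ErrorDemo {
    public static void main(String[] args) {
        Flux.just(1, 2, 0, 4)
                .map(i -> 10 / i) // throws ArithmeticException when i == 0
                .onErrorReturn(-1) // replace the error with a fallback value
                .subscribe(
                        System.out::println,
                        err -> System.out.println("onError: " + err)); // never reached here
        // prints 10, 5, -1
    }
}
```

Note that onErrorReturn does not resume the original sequence: the error is replaced by a single fallback value and the stream then completes, which is why 4 is never mapped.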
The processor
A processor is special in that it acts as both a Publisher and a Subscriber.
Processors are not officially recommended, because they are hard to use correctly and only fit narrow scenarios. Rather than using a Reactor processor directly, it is good practice to call sink() once to obtain the processor's sink, and push data through that.
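As a sketch of that practice (this assumes Reactor 3.3, where DirectProcessor and sink() still exist; in 3.4+ they are deprecated in favor of the Sinks API):

```java
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;

public class ProcessorDemo {
    public static void main(String[] args) {
        // A processor is a Publisher: it can be subscribed to...
        DirectProcessor<String> processor = DirectProcessor.create();
        processor.subscribe(v -> System.out.println("got " + v));
        // ...and a Subscriber. But instead of calling its onNext/onComplete directly,
        // call sink() once and push data through the safer FluxSink facade.
        FluxSink<String> sink = processor.sink();
        sink.next("hello"); // prints "got hello"
        sink.complete();
    }
}
```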
Hmm… I will skip this for now and come back to it later.
Testing
To be updated…
Debugging Reactor
To be updated…
Monitoring Reactor metrics
To be updated…
Advanced features and concepts
To be updated…
Finally, here is some code of my own:
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/**
 * @author SuanCaiYv
 * @time 2020/3/11 12:21 am
 */
public class Main
{
    public static void main(String[] args)
    {
        Flux<String> fluxStr1 = Flux.just("Hello", "Reactor", "And", "Me"); // Push values in directly
        List<String> listTemp = Arrays.asList("I", "am", "a", "list");
        Flux<String> fluxStr2 = Flux.fromIterable(listTemp); // Created from an Iterable
        Mono<String> monoStr1 = Mono.empty(); // Creates an empty Mono, whatever the generic type
        Mono<String> monoStr2 = Mono.just("text");
        Flux<Integer> fluxInt1 = Flux.range(2, 5); // A Flux containing 2, 3, 4, 5, 6
        /*
         * Other common static methods: fromArray(), fromStream(), merge(), zip()
         */
        /*
         * Define the generator
         */
        BiFunction<Integer, SynchronousSink<String>, Integer> biFunction1 = new BiFunction<Integer, SynchronousSink<String>, Integer>()
        {
            @Override
            public Integer apply(Integer integer, SynchronousSink<String> stringSynchronousSink)
            {
                stringSynchronousSink.next("No. " + integer); // Call next() on the "sink" to emit a value
                if (integer > 5) {
                    stringSynchronousSink.complete();
                }
                return integer + 1;
            }
        };
        /*
         * The initial state
         */
        Callable<Integer> callable1 = new Callable<Integer>()
        {
            @Override
            public Integer call() throws Exception
            {
                return 0;
            }
        };
        // The simplest form is generate(Consumer<SynchronousSink<T>> generator); this variant adds an initial state
        Flux<String> fluxStr3 = Flux.generate(callable1, biFunction1);
        Consumer<Integer> consumer3 = t -> {
            System.out.println("No. " + t + " is done.");
        };
        // consumer3 takes care of cleaning up after onComplete() is called
        Flux<String> fluxStr4 = Flux.generate(callable1, biFunction1, consumer3);
        Consumer<FluxSink<String>> consumer5 = t -> {
            for (int i = 0; i < 5; ++i) {
                t.next("No. " + i);
            }
            t.complete();
        };
        // create() can emit asynchronously from multiple threads
        Flux<String> fluxStr5 = Flux.create(consumer5, FluxSink.OverflowStrategy.BUFFER);
        fluxStr5.subscribe(System.out::println);
        // push() differs from create() in that only a single thread may emit
        Flux<String> fluxStr6 = Flux.push(consumer5, FluxSink.OverflowStrategy.BUFFER);
        BiConsumer<Integer, SynchronousSink<Integer>> biConsumer2 = new BiConsumer<Integer, SynchronousSink<Integer>>()
        {
            @Override
            public void accept(Integer integer, SynchronousSink<Integer> integerSynchronousSink)
            {
                if (integer > 5) {
                    return;
                }
                integerSynchronousSink.next(integer);
            }
        };
        // handle is an instance method that can filter and transform, so it can also derive a new sequence
        Flux<Integer> fluxInt2 = fluxInt1.handle(biConsumer2);
        fluxInt2.subscribe(System.out::println);
        fluxStr3.subscribe(new MyBaseSubscriber());
    }
}

class MyBaseSubscriber extends BaseSubscriber<String>
{
    @Override
    protected void hookOnSubscribe(Subscription subscription)
    {
        System.out.println("I'm going to subscribe!");
        request(1); // You must request at least one element, otherwise the stream hangs
    }

    @Override
    protected void hookOnNext(String value)
    {
        System.out.println("Hey! Look what I got? " + value);
        request(1);
    }
}