Introduction to Reactor
Reactor is a very important member of the reactive programming ecosystem. It is a fourth-generation reactive library, built on the Reactive Streams standard, for building non-blocking applications on the JVM.
I’m going to introduce Reactor today.
Introduction to Reactor
Reactor is a JVM-based non-blocking API that integrates directly with Java 8 APIs such as CompletableFuture, Stream, and Duration.
It provides two very useful asynchronous sequence APIs, Flux and Mono, and implements the Reactive Streams standard.
It can also be combined with reactor-netty to serve as the foundation of asynchronous frameworks, such as WebFlux, which was introduced in Spring Framework 5.
WebFlux is built on reactor-netty, which in turn depends on Reactor. So if you add the WebFlux dependency to your POM:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Reactor is then pulled into the project automatically.
If you're not using Spring WebFlux, you can add the following dependency directly to use Reactor (add a <version> element unless a BOM such as reactor-bom manages it for you):
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
Reactive Programming history
It began with Microsoft, which created the Reactive Extensions (Rx) library on the .NET platform. RxJava then brought reactive programming to the JVM.
Then came the Reactive Streams standard, which defines the specification that reactive libraries on the JVM must meet. It was integrated into JDK 9 as the java.util.concurrent.Flow class.
Flow defines the four core interfaces of Reactive Streams: Publisher, Subscriber, Subscription, and Processor.
Iterable-Iterator and Publisher-Subscriber
In object-oriented languages, reactive programming is typically implemented as an extension of the observer pattern.
Let's look at how the observer pattern is implemented, taking Publisher and Subscriber as examples:
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
Two interfaces are defined above, Publisher and Subscriber. Publisher's job is to accept a Subscriber through its subscribe method.
Subscriber defines four on* methods, each of which is called when a specific event occurs.
So how does subscribe in Publisher trigger the onSubscribe event of Subscriber?
It is quite simple. Let's look at a concrete implementation:
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    Subscription sub;
    if (throwable != null) {
        assert iterable == null : "non-null iterable: " + iterable;
        sub = new Subscription(subscriber, null, throwable);
    } else {
        assert throwable == null : "non-null exception: " + throwable;
        sub = new Subscription(subscriber, iterable.iterator(), null);
    }
    // Hand the subscription to the subscriber: this is the "push" step.
    subscriber.onSubscribe(sub);
    if (throwable != null) {
        sub.pullScheduler.runOrSchedule();
    }
}
The example above is the subscribe implementation of PullPublisher. We can see that subscribe calls the subscriber.onSubscribe method. That is the secret of the observer pattern.
In other words, when subscribe is called on a Publisher, the Publisher actively pushes to the Subscriber by calling its onSubscribe method.
Iterable-Iterator, by contrast, is a pull model: the caller has to call next() repeatedly, so control stays with the caller.
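The difference between pulling and pushing can be sketched with nothing but the JDK. The class below is a hypothetical illustration, not code from PullPublisher or Reactor: pull() drives an Iterator itself, while publisherOf() is a deliberately simplified, synchronous Publisher that calls the Subscriber. It ignores demand accounting, so treat it as a teaching aid rather than a spec-compliant publisher.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; only the Flow interfaces come from the JDK.
class PullVsPush {

    // Pull: the caller drives the loop by calling hasNext()/next() itself.
    static List<String> pull(Iterable<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add("pulled " + it.next());
        }
        return seen;
    }

    // Push: a simplified synchronous Publisher. On the first request it
    // pushes every item to the subscriber, then signals completion.
    static Flow.Publisher<String> publisherOf(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                for (String item : items) {
                    subscriber.onNext(item); // the publisher calls the subscriber
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes and records every signal in the order it arrives.
    static List<String> push(List<String> items) {
        List<String> seen = new ArrayList<>();
        publisherOf(items).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                seen.add("subscribed");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                seen.add("pushed " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                seen.add("error " + throwable);
            }

            @Override
            public void onComplete() {
                seen.add("completed");
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        List<String> items = List.of("foo", "bar");
        System.out.println(pull(items)); // [pulled foo, pulled bar]
        System.out.println(push(items)); // [subscribed, pushed foo, pushed bar, completed]
    }
}
```

In pull() the loop belongs to the caller; in push() the subscriber only reacts to the calls it receives, which is the inversion of control the observer pattern provides.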
Why use asynchronous Reactive
In modern applications, as the number of users increases, programmers need to consider how to increase the processing power of the system.
The traditional blocking I/O approach is not suitable for this scenario because it ties up a lot of resources. We need non-blocking I/O.
The JDK provides two models for asynchronous programming:
The first is callbacks: an asynchronous method takes a callback argument and invokes it when the asynchronous task finishes. A typical example is EventListener in Java Swing.
The second is Future: we submit a task as a Callable and then use the returned Future to get its result.
What’s wrong with these two types of asynchronous programming?
The problem with callbacks is callback hell. Anyone familiar with JavaScript will know the concept.
Put simply, callback hell is what happens when callbacks are used inside callbacks, producing deeply nested layers of code.
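A small sketch shows how quickly the nesting grows. Everything here is invented for illustration (findUser, findOrders, and findPrice are hypothetical methods), and the callbacks run synchronously so the result is deterministic; a real asynchronous API would invoke them later, on another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical service: all method names are made up for this example.
class CallbackHell {

    // Each step hands its result to a callback instead of returning it.
    static void findUser(String id, Consumer<String> callback) {
        callback.accept("user-" + id);
    }

    static void findOrders(String user, Consumer<List<String>> callback) {
        callback.accept(List.of(user + "/order-1", user + "/order-2"));
    }

    static void findPrice(String order, Consumer<Integer> callback) {
        // Stand-in computation: the "price" is just the length of the order id.
        callback.accept(order.length());
    }

    // Three dependent steps already need three levels of nesting.
    static List<String> describeOrders(String id) {
        List<String> lines = new ArrayList<>();
        findUser(id, user ->
            findOrders(user, orders -> {
                for (String order : orders) {
                    findPrice(order, price ->
                        lines.add(order + " costs " + price));
                }
            }));
        return lines;
    }

    public static void main(String[] args) {
        describeOrders("42").forEach(System.out::println);
    }
}
```

Adding error handling to each level would roughly double the nesting again, which is why this style becomes hard to read and maintain.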
A Future, for its part, represents the result of an asynchronous execution, but its get() method is a blocking operation. It also lacks support for advanced error handling and for lazy computation.
What about combining multiple Futures? JDK 8 introduced CompletableFuture, which also implements CompletionStage and supports chaining through its then* methods. Even so, the operators CompletableFuture offers are not that rich and may not meet every need.
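The contrast between the two can be sketched as follows. The class and method names are hypothetical; the calls themselves (ExecutorService.submit, Future.get, CompletableFuture.supplyAsync, thenApply, thenCombine, exceptionally) are standard JDK APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class comparing the two JDK styles.
class FutureVsCompletableFuture {

    // Future: the only way to obtain the value is get(), which blocks the caller.
    static int withFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 20 + 1);
            return future.get() * 2; // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            // Failures only surface here, wrapped in checked exceptions.
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: follow-up steps are chained and run when results arrive.
    static CompletableFuture<Integer> withCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> 20 + 1)
                .thenApply(n -> n * 2)                           // transform the result
                .thenCombine(CompletableFuture.supplyAsync(() -> 100),
                        Integer::sum)                            // combine two futures
                .exceptionally(error -> -1);                     // fallback on failure
    }

    public static void main(String[] args) {
        System.out.println(withFuture());                   // 42
        System.out.println(withCompletableFuture().join()); // 142
    }
}
```

The chained version avoids blocking until the final join(), but every additional need (retries, timeouts per element, multiple values) has to be built by hand on top of it.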
That is where Reactor comes in.
Flux
Reactor provides two very useful types, Flux and Mono. Flux represents a reactive sequence of 0 to N items, while Mono represents a reactive sequence of 0 or 1 item.
Let's start with the definition of Flux:
public abstract class Flux<T> implements Publisher<T>
As you can see, Flux is essentially a Publisher that produces asynchronous sequences.
Flux provides a number of useful methods for processing these sequences and provides signal notifications of completions and errors.
The corresponding onNext, onComplete and onError methods of Subscriber will be called.
Mono
Now take a look at Mono's definition:
public abstract class Mono<T> implements Publisher<T>
Mono, like Flux, is a Publisher and is used to produce asynchronous sequences.
Mono emits at most one item, so a Subscriber sees one of three outcomes: onNext followed by onComplete (one item), onComplete alone (an empty Mono), or onError (a failed Mono).
Mono can also be seen as a subset of Flux, offering only some of Flux's operators.
Mono and Flux can be converted into each other: for example, Mono#concatWith(Publisher) returns a Flux, and Mono#then(Mono) returns another Mono.
Basic operations of Flux and Mono
Let's look at some examples of creating a Flux:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
// range(start, count): emits 5, 6, 7
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
As you can see, Flux provides a variety of factory methods, and we can choose whichever fits.
Now look at Flux's subscribe methods:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> consumer);
Disposable subscribe(Consumer<? super T> consumer,
                     Consumer<? super Throwable> errorConsumer);
Disposable subscribe(Consumer<? super T> consumer,
                     Consumer<? super Throwable> errorConsumer,
                     Runnable completeConsumer);
Disposable subscribe(Consumer<? super T> consumer,
                     Consumer<? super Throwable> errorConsumer,
                     Runnable completeConsumer,
                     Consumer<? super Subscription> subscriptionConsumer);
subscribe can take no arguments, or up to four.
Let’s look at the case with no parameters:
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
numbersFromFiveToSeven.subscribe();
Note that subscribing without arguments does not mean the Flux's elements are not consumed. They are consumed; the result is just not visible.
Now the case with arguments: consumer handles each onNext element, errorConsumer handles onError, completeConsumer handles onComplete, and subscriptionConsumer handles the onSubscribe event.
The first three parameters are easy to understand. Let’s take an example:
Flux<Integer> ints3 = Flux.range(1, 4);
ints3.subscribe(System.out::println,
        error -> System.err.println("Error " + error),
        () -> System.out.println("Done"),
        sub -> sub.request(2));
We built a Flux of four integers, from 1 to 4. Each element is printed as it arrives; if an error occurs along the way, the error is printed; and Done is printed when the sequence completes.
So what does the last argument, subscriptionConsumer, do?
subscriptionConsumer accepts a Subscription object:
public interface Subscription {
    public void request(long n);
    public void cancel();
}
Subscription defines two methods. Calling request(n) tells the publisher the maximum number of elements the subscriber is ready to receive.
In our example above, we built four integers but requested only two, so only two of them are printed. Because the sequence never reaches its end, Done is not printed either.
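The effect of request(n) can be reproduced without Reactor. The sketch below is a hypothetical stand-in for Flux.range, built only on java.util.concurrent.Flow; it is not Reactor's implementation, and it skips parts of the specification (for example, rejecting non-positive demand). It shows that a publisher emits no more than was requested and completes only after every element has been delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for Flux.range, using only JDK interfaces.
class BoundedDemand {

    // Emits count consecutive integers from start, never exceeding the demand.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private final int end = start + count;
            private boolean finished;

            @Override
            public void request(long n) {
                long remaining = n;
                while (remaining > 0 && next < end && !finished) {
                    subscriber.onNext(next++);
                    remaining--;
                }
                // Complete only once, and only when everything was delivered.
                if (next == end && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    // Subscribes with a fixed demand and records the signals received.
    static List<String> run(int start, int count, long demand) {
        List<String> events = new ArrayList<>();
        range(start, count).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }

            @Override
            public void onNext(Integer item) {
                events.add(String.valueOf(item));
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("Error " + throwable);
            }

            @Override
            public void onComplete() {
                events.add("Done");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 4, 2)); // [1, 2]
        System.out.println(run(1, 4, 4)); // [1, 2, 3, 4, Done]
    }
}
```

With a demand of 2 the subscriber receives two elements and no completion signal, which mirrors the behavior of sub.request(2) in the example above.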
All of these subscribe methods return a Disposable, through which we can cancel the subscription.
Disposable defines only two methods:
public interface Disposable {
    void dispose();
    default boolean isDisposed() {
        return false;
    }
}
dispose() works by signaling the Flux or Mono to stop producing new elements, but it cannot guarantee that production stops immediately.
Having covered Disposable, we should also introduce its utility class, Disposables.
Disposables.swap() creates a Disposable wrapper that lets you cancel and replace an existing Disposable.
Disposables.composite(…) collects multiple Disposables so that they can be disposed of together later.
Conclusion
This article introduced the fundamentals of Reactor and its two most important types, Flux and Mono. In the next article we will continue with some more advanced uses of Reactor Core. Stay tuned.
The examples for this article are in learn-Reactive
Author: Flydean program stuff
Link to this article: www.flydean.com/introductio…
Source: Flydean’s blog