What is responsive programming?
In computing, responsive or reactive programming is a declarative programming paradigm for data flow and change propagation. This means that static or dynamic data flows can be easily expressed in a programming language, and the associated computational model automatically propagates the changing values through the data flows.
The above quote is from Wikipedia.
Responsive programming, as its name suggests, is about responding. We need to respond when something happens.
Is our real life well explain the response type, most of our human actions are based on the event-driven model, when someone call your name, you will be judged by the event do you want to reply, this process is actually generate events, then we as consumers to deal with events, and we will also continue to pass down the results.
In reactive programming, asynchronous callback is usually used, and callback method invocation and control will be completed by reactive framework. For application development, only the implementation of callback method needs to be concerned.
Here’s a famous design principle: The Hollywood Principle.
Don’t call us, we will call you.
After the actors submit their resumes, just go home and wait. The acting company will call you.
Ii. Introduction to Project Reactor
The earliest Reactor library in Java, RxJava, was borrowed from. .net Reactor Extensions, and later the Jdk in Java9 provides standardized response type library implementation of Java. Util.. Concurrent Flow, then the Project Reactor as the fourth generation of reactive programming framework, It is a cornerstone of completely non-blocking responsive programming that directly integrates with Java functional apis, especially CompletableFuture, Stream, and Duration. Reactor Netty implements non-blocking inter-process communication and improves inter-service communication efficiency.
In our normal development, asynchronous programming is nothing more than using the utility classes in the JUC package or some Java synchronization semantics.
- Blocking wait: e.g. Future.get()
- Insecure data access: e.g. Reentrantlock.lock ()
- Abnormal bubble: for example, try… The catch… finally
- Synchronized {}
- Wrapper allocation (GC pressure) : e.g. New Wrapper(event)
Or custom thread pools, but also encounter problems like the following.
- Callable allocation — may result in GC stress.
- Synchronization forces each thread to perform a stop-check operation.
- Messages may be consumed more slowly than they are produced.
- Using threadpools to pass tasks to target threads — there is definitely GC stress in FutureTask.
- Block until I/O callback.
In future. get, we cannot avoid blocking and waiting. In the worst case, the application runs synchronously. Using a Reactor not only effectively addresses the above problems, but also allows us to write more concise code.
3. Reactor core concept
Code: github.com/CasterWx/re…
Flux
Flux represents an asynchronous sequence of 0 to N elements. There are three different types of message notifications that can be contained in this sequence: normal messages containing elements, end-of-sequence messages, and sequence error messages. The corresponding subscriber methods onNext(), onComplete(), and onError() are called when message notifications are generated.
1. just()
You can specify all elements contained in a sequence. The created Flux sequence ends automatically after these elements are published.
Flux.just("hello"."world")
.doOnNext((i) -> {
System.out.println("[doOnNext] " + i);
})
.doOnComplete(() -> System.out.println("[doOnComplete]"))
.subscribe(i -> System.out.println("[subscribe] " + i));
// Execution result
[doOnNext] hello
[subscribe] hello
[doOnNext] world
[subscribe] world
[doOnComplete]
Copy the code
2. FromArray (), fromIterable() and fromStream()
A Flux object can be created from an array, an Iterable, or a Stream object.
List<String> arr = Arrays.asList("flux"."mono"."reactor"."core");
Flux.fromIterable(arr)
.doOnNext((i) -> {
System.out.println("[doOnNext] " + i);
})
.subscribe((i) -> {
System.out.println("[subscribe] " + i);
});
// Execution result[doOnNext] flux [subscribe] flux [doOnNext] mono [subscribe] mono [doOnNext] reactor [subscribe] reactor [doOnNext] core [subscribe] coreCopy the code
3. empty()
Create a sequence that publishes the end message without any elements.
Flux.empty()
.doOnNext(i -> {
System.out.println("[doOnNext] " + i);
}).doOnComplete(() -> {
System.out.println("[DoOnComplete] ");
}).subscribe(i -> {
System.out.println("[subscribe] " + i);
});
// Execution result
[DoOnComplete]
Copy the code
4. error(Throwable error)
Create a sequence that contains only error messages.
try {
int []arr = new int[5];
arr[10] = 2;
} catch (Exception e) {
Flux.error(e).subscribe(i -> {
System.out.println("error subscribe");
});
}
// Execution result
Copy the code
5.never () : Creates a sequence that does not contain any message notifications.
Flux.never()
.doOnNext(i -> {
System.out.println("doOnNext " + i);
}).doOnComplete(() -> {
System.out.println("doOnComplete");
}).subscribe((i) -> {
System.out.println("subscribe " + i);
});
// Execution result
空
Copy the code
6. range(int start, int count)
Create a sequence of Integer objects containing count numbers starting from start.
Flux.range(5.10)
.doOnNext(i -> {
System.out.println("doOnNext " + i);
}).doOnComplete(() -> {
System.out.println("doOnComplete");
}).subscribe((i) -> {
System.out.println("subscribe " + i);
});
// Execution result
doOnNext 5
subscribe 5
doOnNext 6
subscribe 6
doOnNext 7
subscribe 7
doOnNext 8
subscribe 8
doOnNext 9
subscribe 9
doOnNext 10
subscribe 10
doOnNext 11
subscribe 11
doOnNext 12
subscribe 12
doOnNext 13
subscribe 13
doOnNext 14
subscribe 14
doOnComplete
Copy the code
Interval (Duration period) and interval(Duration delay, Duration period)
Create a sequence of Long objects incrementing from 0. The elements contained therein are published at specified intervals. In addition to the interval, you can specify a delay before the initial element is published.
Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
.doOnNext(i -> {
System.out.println("doOnNext " + i);
}).doOnComplete(() -> {
System.out.println("doOnComplete " + new Date());
}).subscribe((i) -> {
System.out.println("subscribe " + i + ", date: " + new Date());
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Execution result
doOnNext 0
subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
doOnNext 1
subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
doOnNext 2
subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
doOnNext 3
subscribe 3, date: Fri Jun 25 10:18:02 CST 2021
Copy the code
Why did the above example not output doOnComplete? Since the fourth second, one element is produced every two seconds. By the time the final complete is completed, it is ten seconds before sleep, and the main thread has been released.
8. IntervalMillis (long period) and intervalMillis(long delay, long period)
This is the same as the interval() method, except that the interval and delay are specified in milliseconds.
Mono
Mono represents an asynchronous sequence of 0 or 1 elements. This sequence can also contain the same three types of message notifications as Flux.
1. FromCallable (), fromCompletionStage(), fromFuture(), fromRunnable(), and fromSupplier()
Create Mono from Callable, CompletionStage, CompletableFuture, Runnable, and Supplier, respectively.
Mono.fromCallable(() -> {
System.out.println("begin callable");
return "Hello";
})
.subscribeOn(Schedulers.elastic())
.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(10000);
// Execution result
begin callable
doOnNext Hello, thread :elastic-2
Hello
Copy the code
Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
System.out.println("begin");
return "hello";
}))
.subscribeOn(Schedulers.elastic())
.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(10000);
// Execution result
begin
doOnNext hello, thread :elastic-2
hello
Copy the code
Delay (Duration Duration) and delayMillis(long Duration)
Creates a Mono sequence that produces the number 0 as a unique value after the specified delay time.
Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
Thread.sleep(3000);
// The execution result is printed after a delay of one second
0
Copy the code
3. ignoreElements(Publisher source)
Create a Mono sequence that ignores all elements in Publisher as the source, producing only the end message.
Mono.ignoreElements((i) -> {
System.out.println("ignoreElements");
})
.doOnNext((i) -> System.out.println("doOnNext " + i))
.subscribe(System.out::println);
// Execution result
ignoreElements
Copy the code
4. justOrEmpty(Optional<? Extends T> data) and justOrEmpty(T data)
Create Mono from an Optional object or possibly null object. The Mono sequence produces the corresponding element only if the Optional object contains a value or if the object is not null.
Optional<Integer> optional = Optional.empty();
Mono.justOrEmpty(optional)
.doOnNext((i) -> System.out.println("doOnNext " + i))
.subscribe(System.out::println);
System.out.println("= = = = = = = =");
optional = Optional.of(100);
Mono.justOrEmpty(optional)
.doOnNext((i) -> System.out.println("doOnNext " + i))
.subscribe(System.out::println);
// Execution result
========
doOnNext 100
100
Copy the code
The operator
1. The buffer and bufferTimeout
These two operators collect elements from the current flow into the collection and treat the collection object as a new element in the flow. You can specify different conditions when you do a collection: the maximum number of elements to include or the time interval for a collection. The method buffer() uses only one condition, whereas bufferTimeout() can specify both conditions. You can specify the Duration object or the number of milliseconds, using either the bufferMillis() or bufferTimeoutMillis() methods.
In addition to the number of elements and the time interval, collection can be done with the bufferUntil and bufferWhile operators. The parameters of these two operators are Predicate objects that represent conditions to be satisfied by the elements in each set. BufferUntil will collect until the Predicate returns true. The element that makes Predicate return true can optionally be added to the current collection or the next collection; BufferWhile is only collected if Predicate returns true. Once the value is false, the next collection begins immediately.
Flux.range(1, 100).buffer(20).subscribe(System.out::println); // Execution result [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] 31, 32, 33, 34, 35, 36, 37, 39, 40] [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59] [61, 62, 63, 64, 65, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80] [81, 82, 83, 84, 85, 86, 87, 88] 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]Copy the code
2. filter
The elements contained in the stream are filtered, leaving only those elements that meet the conditions specified by Predicate.
Flux.range(1.10)
.filter(i -> i%2= =0)
.doOnNext(i -> {
System.out.println("[doOnNext] " + i);
})
.subscribe(i -> {
System.out.println("subscribe " + i);
});
// Execution result
[doOnNext] 2
subscribe 2
[doOnNext] 4
subscribe 4
[doOnNext] 6
subscribe 6
[doOnNext] 8
subscribe 8
[doOnNext] 10
subscribe 10
Copy the code
3. window
The Window operator acts like a buffer, except that the window operator collects elements from the current stream into another Flux sequence, so the return type is Flux.
Flux.range(1.15).window(5)
.doOnNext((flux -> {}))
.subscribe(flux -> {
flux.doOnNext((item) -> {
System.out.println("[window] flux: " + item);
})
.doOnComplete(() -> System.out.println("flux item complete"))
.subscribe();
});
// Execution result
[window] flux: 1
[window] flux: 2
[window] flux: 3
[window] flux: 4
[window] flux: 5
flux item complete
[window] flux: 6
[window] flux: 7
[window] flux: 8
[window] flux: 9
[window] flux: 10
flux item complete
[window] flux: 11
[window] flux: 12
[window] flux: 13
[window] flux: 14
[window] flux: 15
flux item complete
Copy the code
4. zipWith
The zipWith operator merges elements in the current stream with elements in another stream on a one-to-one basis. The result is a stream of element type Tuple2. The combined elements can also be processed through a BiFunction function, and the resulting stream has the element type returned by the function.
Flux.just("Hello"."Project")
.zipWith(Flux.just("World"."Reactor"))
.subscribe(System.out::println);
System.out.println("= = = = = =");
Flux.just("Hello"."Project")
.zipWith(Flux.just("World"."Reactor"), (s1, s2) -> String.format("%s! %s!", s1, s2))
.subscribe(System.out::println);
// Execution resultHello,World Project,Reactor ====== Hello! World! Project! Reactor!Copy the code
5. take
The take series of operators are used to extract elements from the current stream. There are many ways to extract it.
Take (long n), take(Duration timespan) and takeMillis(long Timespan) : Extract at specified quantities or intervals.
Flux.range(1.10).take(2).subscribe(System.out::println);
// Execution result
1
2
Copy the code
- TakeLast (Long N) : Extracts the last n elements in the stream.
Flux.range(1.10).takeLast(2).subscribe(System.out::println);
// Execution result
9
10
Copy the code
- takeUntil(Predicate
predicate) : Extracts elements until the predicate returns true.
Flux.range(1.10).takeUntil(i -> i == 6).subscribe(System.out::println);
// Execution result
1
2
3
4
5
6
Copy the code
- takeWhile(Predicate
continuePredicate) : The Predicate is extracted when it returns true.
Flux.range(1.10).takeWhile(i -> i < 5).subscribe(System.out::println);
// Execution result
1
2
3
4
Copy the code
- takeUntilOther(Publisher
other) : Extract elements until another stream starts producing them.
Flux.range(1.5).takeUntilOther((i) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).subscribe(System.out::println);
// execute the result, pause 1000ms and start output
1
2
3
4
5
Copy the code
6. Reduce and reduceWith
The reduce and reduceWith operators accumulate all the elements contained in the flow to obtain a Mono sequence containing the calculated results. The cumulative operation is represented by a BiFunction. You can specify an initial value at operation time. If there is no initial value, the first element of the sequence is used as the initial value.
Flux.range(1.10)
.reduce((x, y) -> {
System.out.println("x:" + x + ", y:" + y);
return x+y;
})
.subscribe(System.out::println);
// Execution result
x:1, y:2
x:3, y:3
x:6, y:4
x:10, y:5
x:15, y:6
x:21, y:7
x:28, y:8
x:36, y:9
x:45, y:10
55
Copy the code
Flux.range(1.10)
.reduceWith(() -> 100, (x, y) -> {
System.out.println("x:" + x + ", y:" + y);
return x+y;
})
.subscribe(System.out::println);
// Execution result
x:100, y:1
x:101, y:2
x:103, y:3
x:106, y:4
x:110, y:5
x:115, y:6
x:121, y:7
x:128, y:8
x:136, y:9
x:145, y:10
155
Copy the code
7. The merge and mergeSequential
The merge and mergeSequential operators are used to merge multiple streams into a single Flux sequence. The difference is that mergeSequential mergeSequential merges on a stream basis in the order in which all streams are subscribed.
Flux.merge(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()
.forEach(System.out::println);
System.out.println("= = = = = = = = = = = = = =");
Flux.mergeSequential(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()
.forEach(System.out::println);
// Execution result
0
0
1
1= = = = = = = = = = = = = =0
1
0
1
Copy the code
8. FlatMap and flatMapSequential
The flatMap and flatMapSequential operators convert each element in a flow into a flow and combine the elements of all streams. The difference between flatMapSequential and flatMap is the same as the difference between mergeSequential and merge.
Flux.just(1, 2) .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x)) .toStream() .forEach(System.out::println); // Result 0 0 1Copy the code
9. ConcatMap and combineLatest
The concatMap operator also converts each element in a stream into a stream and then merges all streams. Unlike flatMap, concatMap merges transformed streams according to the order of elements in the original stream. Unlike flatMapSequential, which subscribes to all streams before merging, concatMap subscribes to transformed streams dynamically.
Flux.just(5.10)
.concatMap(x -> Flux.intervalMillis(x * 10.100).take(x))
.toStream()
.forEach(System.out::println);
Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50.100).take(5)
).toStream().forEach(System.out::println);
Copy the code
End of the four.
I have briefly introduced Flux and Mono, two core concepts of Reactor, as well as the use of some common operators. It may be extremely difficult for some developers to use the reactive programming paradigm at first, but practice makes perfect, and the benefits of reactive programming can only be realized after long-term use.