1. Introduction to reactive programming

1.1. Blocking can be wasteful

Modern applications can reach large numbers of concurrent users, and despite the increasing capabilities of modern hardware, the performance of modern software remains a key issue.

Broadly, there are two ways to improve the performance of a program:

  • Parallelize, to use more threads and more hardware resources.
  • Seek more efficiency in how current resources are used.

Typically, Java developers write programs using blocking code. This is fine, until there is a performance bottleneck. Then it’s time to introduce additional threads and run similar blocking code. But this expansion of resource utilization can quickly introduce contention and concurrency issues.

Worse, blocking wastes resources. As soon as a program involves some latency (especially I/O, such as a database request or a network call), resources are wasted because threads (possibly many of them) sit idle, waiting for data. So parallelization is no panacea. It is necessary in order to use the full power of the hardware, but it is also complex to reason about and prone to wasting resources.

1.2. Using Asynchronicity

The second approach mentioned earlier, seeking greater efficiency, can solve the problem of wasting resources. By writing asynchronous, non-blocking code, you can have execution switch to another active task that uses the same underlying resource and return to the current process when the asynchronous processing is complete.

But how do you produce asynchronous code on the JVM? Java offers two models of asynchronous programming:

  • Callbacks: Asynchronous methods do not return a value but take an extra callback parameter (a lambda or an anonymous class) that is called when the result is available. A well-known example is Swing’s EventListener hierarchy.
  • Futures: Asynchronous methods immediately return a Future<T>. The asynchronous process computes a T value, and the Future object wraps access to it. The value is not immediately available, and the object can be polled until it is. For example, an ExecutorService running Callable<T> tasks uses Future objects.

However, both techniques have their limitations. Callbacks are hard to put together and can quickly lead to code that is difficult to read and maintain (a situation known as “Callback Hell”).

Future objects are somewhat better than callbacks, but they still do not compose well, despite the improvements that Java 8 brought with CompletableFuture. Orchestrating multiple Future objects is possible, but not easy. Future has other problems as well:

  • Calling get() makes it easy to end up in another blocking situation.

  • Lazy computation is not supported.

  • Support for multiple values and advanced error handling is lacking.
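To make these limitations concrete, here is a minimal sketch that uses only the JDK. The two lookup methods (findUserName and countOrders) are hypothetical stand-ins for remote calls and do not come from any real API. CompletableFuture lets the two stages be chained without blocking, but the work starts eagerly as soon as describe() is called, each stage carries a single value, and the final join() blocks the caller just as get() would.

```java
import java.util.concurrent.CompletableFuture;

class FutureCompositionSketch {

    // Hypothetical remote call, simulated with an in-memory value.
    static CompletableFuture<String> findUserName(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    // Hypothetical second remote call that depends on the first result.
    static CompletableFuture<Integer> countOrders(String userName) {
        return CompletableFuture.supplyAsync(() -> userName.length());
    }

    // Non-blocking composition: no thread waits between the two stages.
    static CompletableFuture<String> describe(int userId) {
        return findUserName(userId)
                .thenCompose(name -> countOrders(name)
                        .thenApply(count -> name + " has " + count + " orders"))
                .exceptionally(ex -> "lookup failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // join() blocks the calling thread; it is used here only to print the result.
        System.out.println(describe(42).join()); // prints "user-42 has 7 orders"
    }
}
```

Even in this small case the nesting of thenCompose and thenApply hints at how quickly orchestration becomes hard to read once more stages or several values are involved.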

1.3. From imperative programming to reactive programming

Reactive-functional programming addresses the problems of concurrency and parallelism. More colloquially, it solves callback hell, which is what you get when reactive and asynchronous use cases are handled in an imperative manner. Reactive programming, as implemented by RxJava for example, is influenced by functional programming and uses a declarative approach to avoid the common pitfalls of reactive-imperative code.

Reactive libraries such as Reactor and RxJava aim to address these shortcomings of the “classic” asynchronous approaches on the JVM, while also focusing on a few additional aspects:

  • Composability and readability

  • Data as a flow, manipulated with a rich set of operators

  • Nothing happens until you subscribe (lazy publication)

  • Backpressure, or the ability of the consumer to signal to the producer that the rate of emission is too high

  • A high-level but high-value abstraction that is concurrency-agnostic

2. How Project Reactor works

At the core of Reactor are the Flux and Mono types, which represent a stream of data or events. They are intended for push (reactive) use, but can also be used for pull (interactive). They are lazy, not eager. They can be used synchronously or asynchronously. A Flux can represent zero, one, many, or an infinite number of values or events over time; a Mono represents at most one.

2.1. Flux principle

When subscribe is called, the publisher calls back the subscriber’s onSubscribe method, in which the subscriber typically uses the Subscription it receives to request n elements from the publisher. The publisher then emits up to n elements by repeatedly calling the subscriber’s onNext method. Once all data has been sent, onComplete tells the subscriber that the stream has ended. If an error occurs, it is delivered via onError, which also terminates the stream.

  1. First, creating a Publisher with a method like Flux.just produces a concrete Publisher, such as FluxArray.

  2. When you subscribe to the publisher with subscribe, a Subscription carrying the corresponding logic is created first (such as ArraySubscription, which defines how to handle downstream requests and how to emit data); each kind of publisher has a matching kind of Subscription.

  3. The publisher then passes this Subscription to the subscriber through the subscriber’s onSubscribe method.

  4. In its onSubscribe method, the subscriber must use the Subscription to issue the first request.

  5. The Subscription emits as many elements as were requested by calling back the subscriber’s onNext method.

  6. In onNext, the subscriber usually defines how each element is processed; once processing is done, it can request more.

  7. The publisher keeps satisfying the subscriber’s requests.

  8. This continues until the publisher’s sequence ends, which is signaled through the subscriber’s onComplete. If an error occurs while the sequence is being emitted, the subscriber’s onError is called with the error instead. In both cases, the sequence terminates and the subscription ends.
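The steps above can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. This is an illustrative, single-threaded stand-in and not Reactor's FluxArray or ArraySubscription; the class name RangePublisherSketch and the run helper are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RangePublisherSketch implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisherSketch(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Steps 2 and 3: create a Subscription and hand it to the subscriber.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                // Sketch only: the full specification signals onError for n <= 0.
                if (done || n <= 0) {
                    return;
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant request made from onNext: the loop below picks it up
                }
                emitting = true;
                // Steps 5 to 7: emit while there is demand and data.
                while (!done && demand > 0 && next < start + count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                // Step 8: signal completion once the sequence is exhausted.
                if (!done && next == start + count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a subscriber that requests one element at a time and records each signal.
    static List<String> run(int start, int count) {
        List<String> signals = new ArrayList<>();
        new RangePublisherSketch(start, count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                signals.add("onSubscribe");
                s.request(1); // step 4: first request
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext " + item);
                subscription.request(1); // step 6: ask for more after processing
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError " + t.getMessage());
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext 3, onNext 4, onNext 5, onComplete]
        System.out.println(run(3, 3));
    }
}
```

The emitting flag is the design choice worth noting: because the subscriber calls request from inside onNext, the Subscription must tolerate re-entrant calls instead of recursing once per element.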

2.2. Operator principle

An operator only transforms and processes data. Toward its downstream it acts as a Publisher, passing upstream data down; toward its upstream it acts as a Subscriber, passing downstream requests up.
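As a rough illustration of this dual role, the sketch below builds a map-like operator on the JDK's java.util.concurrent.Flow interfaces. It is not how Reactor implements map; the names MapOperatorSketch and fromList are assumptions made for this example, and the code assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

class MapOperatorSketch<T, R> implements Flow.Publisher<R> {
    private final Flow.Publisher<T> upstream;
    private final Function<T, R> mapper;

    MapOperatorSketch(Flow.Publisher<T> upstream, Function<T, R> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super R> downstream) {
        // Toward upstream, the operator is a Subscriber.
        upstream.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                // The upstream Subscription is passed through, so downstream
                // request(n) and cancel() calls travel up unchanged.
                downstream.onSubscribe(s);
            }

            @Override
            public void onNext(T item) {
                downstream.onNext(mapper.apply(item)); // transform, then pass down
            }

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // A tiny synchronous source that honors request(n).
    static <E> Flow.Publisher<E> fromList(List<E> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean finished;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !finished; i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        Flow.Publisher<Integer> source = fromList(List.of(1, 2, 3));
        new MapOperatorSketch<Integer, String>(source, i -> "item-" + i)
                .subscribe(new Flow.Subscriber<String>() {
                    @Override
                    public void onSubscribe(Flow.Subscription s) {
                        s.request(Long.MAX_VALUE); // unbounded demand
                    }

                    @Override
                    public void onNext(String item) {
                        received.add(item);
                    }

                    @Override
                    public void onError(Throwable t) {
                        received.add("error");
                    }

                    @Override
                    public void onComplete() {
                        received.add("done");
                    }
                });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item-1, item-2, item-3, done]
    }
}
```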

3. Create a Flux

3.1. Create Flux in a simple way

3.1.1 empty()

Completes immediately on subscription without publishing any values.

    @Test
    public void emptyTest() {
        PrintUtil.println("Before");
        Flux.empty()
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:04:016 [Thread-Name-main], Before
2021-08-31 18:04:016 [Thread-Name-main], complete
2021-08-31 18:04:016 [Thread-Name-main], After

3.1.2 never()

Never emits any signal: no value, no completion, and no error. This flow is useful for testing.

    @Test
    public void neverTest() {
        PrintUtil.println("Before");
        Flux.never()
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:09:023 [Thread-Name-main], Before
2021-08-31 18:09:024 [Thread-Name-main], After

3.1.3 error()

Sends an onError() notification to each subscriber immediately. No values are published and, by contract, no onComplete() notification is sent.

    @Test
    public void errorTest() {
        PrintUtil.println("Before");
        Flux.error(new RuntimeException("emitter an error"))
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:08:024 [Thread-Name-main], Before
2021-08-31 18:08:024 [Thread-Name-main], java.lang.RuntimeException: emitter an error
2021-08-31 18:08:024 [Thread-Name-main], After

3.1.4 range

Generate n integer numbers starting from start. For example, range(3, 3) will publish 3, 4, and 5 and then complete normally. Each subscriber receives the same set of numbers.

    @Test
    public void rangeTest() {
        PrintUtil.println("Before");
        Flux.range(3, 3)
                .subscribe(PrintUtil::println);
        PrintUtil.println("After");
    }

2021-08-31 17:58:051 [Thread-Name-main], Before
2021-08-31 17:58:052 [Thread-Name-main], 3
2021-08-31 17:58:052 [Thread-Name-main], 4
2021-08-31 17:58:052 [Thread-Name-main], 5
2021-08-31 17:58:052 [Thread-Name-main], After

The order of the print statements is also worth noting. Unsurprisingly, the Before and After messages are printed by the main client thread. Note, however, that the subscription also happens on the client thread, and subscribe() actually blocks the client thread until all events have been received. Like RxJava, Reactor does not implicitly run code in a thread pool unless an operator requires it.

3.1.5 interval

interval() generates a sequence of long values, starting at zero, with a fixed period between consecutive values. In some ways, interval() is similar to scheduleAtFixedRate() in ScheduledExecutorService. You can imagine many uses for interval(), such as polling data periodically, refreshing the user interface, or modeling the passage of time.

    @Test
    public void intervalTest() throws InterruptedException {
        Flux.interval(Duration.ofSeconds(1))
                .map(input -> {
                    if (input < 3) return "tick " + input;
                    throw new RuntimeException("boom");
                })
                .onErrorReturn("Uh oh")
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

18:17:18.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 0
18:17:19.549 [parallel-1] INFO wangxw.operator.OperatorTest - tick 1
18:17:20.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 2
18:17:21.564 [parallel-1] INFO wangxw.operator.OperatorTest - Uh oh

3.2. Create Flux programmatically

3.2.1 Synchronous, one-by-one: generate

    public void testGenerate() {
        Flux<String> flux = Flux.generate(
                () -> 0, // Initial state value
                (state, sink) -> {
                    sink.next("3 x " + state + "=" + 3 * state); // Emit synchronously, at most one element per call
                    if (state == 10) {
                        sink.complete();
                    }
                    return state + 1; // Return the next state
                },
                (state) -> System.out.println("state: " + state)); // Receives the final state value
        // Each request triggers sink.next, so elements are generated in order:
        // one element is produced, then consumed, before the next is generated
        flux.subscribe(System.out::println);
    }

3.2.2 Asynchronous and Multi-threaded: create

The Flux.create() operator can bridge an existing API, such as a listener-based model, into a reactive stream, and it supports a combined push/pull pattern. The BufferAsyncSink that the create operator builds maintains an SpscQueue.

  • Push mode: when the listener fires, it calls sink.next(o), which places the element in the SpscQueue and immediately drains it to the consumer.
  • Pull mode: when the consumer issues a request, the onRequest callback runs and can pull whatever data the producer already has available.

3.2.2.1. Schematic diagram

  1. First, creating a publisher with the Flux.create method produces a concrete publisher (FluxCreate).

  2. When you subscribe to the publisher with subscribe, a new BufferAsyncSink carrying the corresponding logic is created first.

  3. The publisher then passes this BufferAsyncSink to the subscriber through the subscriber’s onSubscribe method. Next, FluxCreate invokes the Consumer that was passed to create, which calls sink.onRequest(requestConsumer) to register the requestConsumer on the sink.

  4. In its onSubscribe method, the subscriber issues a request to the BufferAsyncSink, which runs the requestConsumer and pulls data.

  5. Later, asynchronous calls to sink.next(o) push data into the SpscQueue, from which it is polled and delivered.
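The interplay between pushing, buffering, and demand can be sketched without Reactor. The class below is a simplified, single-threaded stand-in for the idea behind BufferAsyncSink, not its actual implementation: an ArrayDeque replaces the SPSC queue, and all names (BufferSinkSketch, buffered, demo) are assumptions made for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

class BufferSinkSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>(); // stands in for the SPSC queue
    private final Consumer<T> consumer;
    private LongConsumer requestConsumer = n -> { };
    private long requested;

    BufferSinkSketch(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    // Pull hook: the producer is told how much the consumer asked for.
    void onRequest(LongConsumer requestConsumer) {
        this.requestConsumer = requestConsumer;
    }

    // Push path: buffer the element, then deliver whatever the demand allows.
    void next(T item) {
        queue.offer(item);
        drain();
    }

    // Called by the consumer side; mirrors Subscription.request(n).
    void request(long n) {
        requested += n;
        requestConsumer.accept(n);
        drain();
    }

    private void drain() {
        while (requested > 0 && !queue.isEmpty()) {
            requested--;
            consumer.accept(queue.poll());
        }
    }

    int buffered() {
        return queue.size();
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BufferSinkSketch<String> sink = new BufferSinkSketch<String>(item -> log.add("received " + item));
        sink.onRequest(n -> log.add("requested " + n));
        sink.next("a");   // no demand yet: stays buffered
        sink.request(2);  // delivers "a"; one unit of demand remains
        sink.next("b");   // delivered immediately
        sink.next("c");   // demand exhausted: buffered
        log.add("buffered " + sink.buffered());
        return log;
    }

    public static void main(String[] args) {
        // prints [requested 2, received a, received b, buffered 1]
        System.out.println(demo());
    }
}
```

The queue decouples the rate at which the listener pushes from the rate at which the consumer requests, which is what lets a push-style callback API respect backpressure.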

3.2.2.2. Code examples

    @Test
    public void testCreate() throws InterruptedException {
        MyEventProcessor<String> myEventProcesser = new MyEventProcessor<>();
        Flux.create(emitter -> {
            myEventProcesser.register(new MyEventListener<String>() {
                @Override
                public void onDataChunk(MyEvent<String> event) {
                    emitter.next(event);
                }

                @Override
                public void processComplete() {
                    emitter.complete();
                }
            });
            emitter.onRequest(n -> { // n is the amount passed to Subscription.request
                List<String> messages = getHistory(n);
                messages.forEach(PrintUtil::println);
            });
        }).subscribe(PrintUtil::println, PrintUtil::println); // No events have occurred yet

        for (int i = 0; i < 20; i++) {
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
            myEventProcesser.newEvent(new MyEvent<>(new Date(), "Event" + i));
        }
        myEventProcesser.processComplete();
    }

3.2.3. Asynchronous but single-threaded: push

push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.

In short, push sits between create and generate: it produces a sequence asynchronously and supports backpressure, but only one thread at a time may call next, complete, or error.

3.2.3.2. Schematic diagram

The only difference between push and create is the FluxCreate.CreateMode they use: Flux.push uses CreateMode.PUSH_ONLY, while Flux.create uses CreateMode.PUSH_PULL.

	public void subscribe(CoreSubscriber<? super T> actual) {
		BaseSink<T> sink = createSink(actual, backpressure);

		actual.onSubscribe(sink);
		try {
			source.accept(
					createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
							sink);
		}
		catch (Throwable ex) {
			Exceptions.throwIfFatal(ex);
			sink.error(Operators.onOperatorError(ex, actual.currentContext()));
		}
	}

BufferAsyncSink internally maintains an SpscLinkedArrayQueue, which supports only a single producer thread. The sink offers each element to the SpscLinkedArrayQueue and triggers drain, which polls elements from the queue and passes them to the consumer; the queue acts as a relay between the two.

In addition to holding a BufferAsyncSink as its delegate, SerializedFluxSink maintains an MpscLinkedQueue (a multi-producer, single-consumer queue), so it supports concurrent producers. Elements produced concurrently are offered to the MpscLinkedQueue, moved from there into the SpscLinkedArrayQueue, and finally passed to the consumer by a single thread.

		public FluxSink<T> next(T t) {
			Objects.requireNonNull(t, "t is null in sink.next(t)");
			if (sink.isTerminated() || done) {
				Operators.onNextDropped(t, sink.currentContext());
				return this;
			}
            // WIP ensures that only one thread delegates next to sink(BufferAsyncSink) for processing
			if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
				try {
					sink.next(t);
				}
				catch (Throwable ex) {
					Operators.onOperatorError(sink, ex, t, sink.currentContext());
				}
				if (WIP.decrementAndGet(this) == 0) {
					return this;
				}
			}
			else {
				// Under concurrent production, the threads that lose the race offer their elements to the mpscQueue
				this.mpscQueue.offer(t);
				if (WIP.getAndIncrement(this) != 0) {
					return this;
				}
			}
			// With multiple producers, elements go to the MPSC queue first; a single draining thread
			// then takes them out and hands them to the delegate sink's SPSC queue, so the MPSC queue acts as a relay
			drainLoop();
			return this;
		}

From the caller’s point of view, the two look the same so far. The practical difference is that push skips this serialization layer, which is only safe when a single producer emits.

Cleaning up after push() or create()

Two callbacks, onDispose and onCancel, perform any cleanup on cancellation or termination. onDispose can be used to clean up when the Flux completes, errors out, or is cancelled. onCancel can be used to perform any action specific to cancellation before cleanup with onDispose.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel())
        .onDispose(() -> channel.close());
});

  • onCancel is invoked first, and only for a cancel signal.

  • onDispose is invoked for a complete, error, or cancel signal.

4. From callback API to Flux flow

One of my favorite examples of streaming is Twitter status updates, otherwise known as tweets. Thousands of user updates occur every second, many accompanied by location, language, and other metadata. To complete this exercise, you’ll use the open source Twitter4J library, which uses a callback-based API to push a subset of new tweets. The simplest runnable example for reading tweets in real time is shown below.

import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

import java.util.concurrent.TimeUnit;


/**
 * @Author: wangxw
 * @Date: 2021/08/31
 * @Description:
 */
@Slf4j
public class Callback2FluxTest {

    public void add() throws InterruptedException {
        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                log.info("Status: {} ", status);
            }

            @Override
            public void onException(Exception e) {
                log.error("Error callback ", e);

            }
            // Other callbacks
        });

        twitterStream.sample();
        TimeUnit.SECONDS.sleep(10);
        twitterStream.shutdown();
    }   

Calling twitterStream.sample() starts a background thread that logs in to Twitter and waits for new messages. The onStatus callback is executed every time a tweet appears. Execution may hop between threads, so errors cannot be reported by throwing exceptions; the onException() notification is used instead. After sleeping for 10 seconds, shutdown() closes the stream and cleans up the underlying resources, such as HTTP connections and threads.

Overall, it doesn’t look that bad. The problem with this program is that it doesn’t do anything. In real life, you would process each Status message in some way, such as saving it to a database or feeding it to a machine learning algorithm. Technically, you could put this logic in the callbacks, but that would couple the infrastructure calls with the business logic. You could simply delegate to a separate class, but unfortunately that is not reusable. What we really want is a clear separation between the technical domain (consuming data from an HTTP connection) and the business domain (interpreting the incoming data). So, we build a second layer of callbacks.

    void consume(Consumer<Status> onStatus, Consumer<Exception> onException) throws InterruptedException {
        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                onStatus.accept(status);
            }

            @Override
            public void onException(Exception e) {
                onException.accept(e);

            }
            // Other callbacks
        });

        twitterStream.sample();
        TimeUnit.SECONDS.sleep(10);
        twitterStream.shutdown();
    }

With this extra layer of abstraction, the consume() method can now be reused in a variety of ways. Suppose that instead of logging, you want to do persistence, analytics, or fraud detection.

But this only moves the problem up a level. What if you want to track the number of tweets per second? Or consume only the first five items? What if you want to have multiple listeners? In each case, a new HTTP connection is opened. Finally, the API does not let you unsubscribe when you are done, which risks leaking resources. All of this points toward an Rx-style API. Instead of passing callbacks down to where they might be executed, the method returns a Flux and lets anyone subscribe to it as needed. Keep in mind, however, that the following implementation still opens a new network connection for each Subscriber.

    public void flux() {
        Flux<Status> flux = Flux.<Status>create(emitter -> {
            TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    emitter.next(status);
                }

                @Override
                public void onException(Exception e) {
                    emitter.error(e);

                }
                // Other callbacks
            });
            emitter.onDispose(() -> twitterStream.shutdown()); // Shut down the TwitterStream on termination or cancellation
        }).doOnSubscribe(s -> log.info("doOnSubscribe"));

        flux.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
    }

The big difference between the code above and consume(…) is that you no longer pass callbacks as arguments. Instead, the method can return a Flux, which you can pass around, store somewhere, and use whenever and wherever it is needed. This Flux can also be combined with other Flux instances. One aspect that deserves attention is resource cleanup: the TwitterStream should be shut down when a subscriber unsubscribes, to avoid leaking resources.

public class LazyTwitterFlux {

    private final Set<FluxSink<? super Status>> fluxSinks = new CopyOnWriteArraySet<>();

    private final Flux<Status> flux = Flux.create(emitter -> {
        register(emitter);
        emitter.onDispose(() -> unregister(emitter));
    });

    private final TwitterStream twitterStream;

    public LazyTwitterFlux() {
        this.twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                fluxSinks.forEach(sink -> sink.next(status));
            }

            @Override
            public void onException(Exception e) {
                fluxSinks.forEach(fluxSink -> fluxSink.error(e));

            }
            // Other callbacks
        });
    }

    public Flux<Status> flux() {
        return flux;
    }

    private synchronized void register(FluxSink<? super Status> fluxSink) {
        // Check before adding: after add() the set can never be empty
        boolean first = fluxSinks.isEmpty();
        fluxSinks.add(fluxSink);
        if (first) {
            // First subscriber: open the single upstream connection
            twitterStream.sample();
        }
    }

    private synchronized void unregister(FluxSink<? super Status> fluxSink) {
        fluxSinks.remove(fluxSink);
        if (fluxSinks.isEmpty()) {
            // Last subscriber gone: close the upstream connection
            twitterStream.shutdown();
        }
    }
}

    @Test
    public void lazy() {
        LazyTwitterFlux lazyTwitterFlux = new LazyTwitterFlux();

        Flux<Status> flux1 = lazyTwitterFlux.flux()
                .doOnSubscribe(s -> log.info("doOnSubscribe"));
        Flux<Status> flux2 = lazyTwitterFlux.flux()
                .doOnSubscribe(s -> log.info("doOnSubscribe"));

        flux1.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
        flux2.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
    }

The fluxSinks set stores, in a thread-safe way, the Subscribers that are currently subscribed. Each time a new Subscriber appears, it is added to the set, and the first one triggers the connection to the underlying event source. Conversely, when the last Subscriber goes away, the upstream source is closed. The key point is that there is only ever one connection to the upstream system, rather than one per subscriber. This implementation works and is fairly robust, but it feels too low-level and error-prone. Access to the set of subscribers must be synchronized, and the collection itself must support safe iteration. The call to register() must happen before the unregister() callback is installed via onDispose(), otherwise the latter could be invoked before registration. For a scenario as common as multiplexing one upstream source to multiple subscribers, there must be a better way. Fortunately, there are at least two such mechanisms. Rx aims to reduce this kind of dangerous boilerplate and to abstract away concurrency. Section 6.4 implements a single subscription using refCount() on a ConnectableFlux.
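The bookkeeping described here (connect on the first subscriber, disconnect on the last, fan out in between) can be isolated from Twitter4J and Reactor in a small JDK-only sketch. The class RefCountedSourceSketch and its connect and disconnect hooks are hypothetical names chosen for this illustration; they stand in for twitterStream.sample() and twitterStream.shutdown().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

class RefCountedSourceSketch<T> {
    // Safe to iterate while subscribers come and go.
    private final Set<Consumer<T>> subscribers = new CopyOnWriteArraySet<>();
    private final Runnable connect;
    private final Runnable disconnect;

    RefCountedSourceSketch(Runnable connect, Runnable disconnect) {
        this.connect = connect;
        this.disconnect = disconnect;
    }

    // Registers a subscriber and returns a handle that removes it again when run.
    synchronized Runnable subscribe(Consumer<T> subscriber) {
        boolean first = subscribers.isEmpty();
        subscribers.add(subscriber);
        if (first) {
            connect.run(); // first subscriber opens the single upstream connection
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            disconnect.run(); // last subscriber closes it
        }
    }

    // Called from the upstream callback; fans one event out to every subscriber.
    void emit(T event) {
        subscribers.forEach(s -> s.accept(event));
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RefCountedSourceSketch<String> source = new RefCountedSourceSketch<String>(
                () -> log.add("connect"), () -> log.add("disconnect"));
        Runnable first = source.subscribe(e -> log.add("s1 " + e));
        Runnable second = source.subscribe(e -> log.add("s2 " + e));
        source.emit("tweet");
        first.run();
        second.run();
        return log;
    }

    public static void main(String[] args) {
        // prints [connect, s1 tweet, s2 tweet, disconnect]
        System.out.println(demo());
    }
}
```

Note that the emptiness check happens before the subscriber is added; checking after the add would never see an empty set, and the upstream would never be started.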

5. Hot and Cold Flux

Once you have a Flux instance, it is important to understand whether it is hot or cold. The APIs and semantics are the same, but the way a Flux is used depends on its type.

5.1. Cold type

A cold Flux is entirely lazy and starts publishing events only when someone is interested in it. If there are no observers, the Flux is just a static data structure. This also means that each subscriber receives its own copy of the stream, since events are generated lazily and are generally not cached in any form. A cold Flux, typically created with Flux.create(), does not start any logic at creation time but defers execution until someone actually subscribes. To some extent, a cold Flux depends on its Subscriber. Examples of cold Flux include, besides create(), Flux.just(), from(), and range(). Subscribing to a cold Flux often also triggers the side effects inside create(), such as querying a database or opening a connection.

5.2. Hot type

A hot Flux is different. By the time you obtain one, it may already be emitting events, regardless of whether there is any Subscriber. It pushes events downstream even if no one is listening, so events may be lost. Typically, a cold Flux is fully under your control, whereas a hot Flux is independent of the consumer. When a Subscriber appears, a hot Flux behaves like a wire tap, transparently publishing the events that flow through it. The appearance and disappearance of Subscribers does not change the behavior of the Flux, which is completely decoupled and independent.

Hot Flux usually occurs when the event source is completely out of control. Examples of this Flux include mouse movement, keyboard input, or button clicking.

When relying on event passing, the difference between Hot and Cold Flux becomes very important. Whether you subscribe now or a few hours later to cold type Flux, you get a complete and consistent set of events. But if Flux is of the hot type, then you cannot be sure that all events will be received. Some techniques to ensure that each subscriber receives all events are described later, such as the cache() operator, which technically buffers all events from hot Flux so that subsequent subscribers receive the same sequence of events. In theory, however, there is no limit to the amount of memory it consumes, so be very careful when caching hot type Flux.
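The difference can be shown with a small JDK-only sketch. It does not use Flux; coldRange and HotSource are made-up names that model only the subscription semantics described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class HotColdSketch {

    // Cold: nothing happens until someone subscribes, and every subscriber
    // triggers its own full run of the sequence.
    static void coldRange(int start, int count, Consumer<Integer> subscriber) {
        for (int i = start; i < start + count; i++) {
            subscriber.accept(i);
        }
    }

    // Hot: events are produced whether or not anyone is listening.
    static class HotSource {
        private final List<Consumer<Integer>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<Integer> listener) {
            listeners.add(listener);
        }

        void emit(int event) {
            listeners.forEach(l -> l.accept(event));
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // Both cold subscribers see the complete sequence, whenever they subscribe.
        coldRange(1, 2, i -> log.add("cold A " + i));
        coldRange(1, 2, i -> log.add("cold B " + i));

        HotSource hot = new HotSource();
        hot.emit(1);                                // nobody is listening: the event is lost
        hot.subscribe(i -> log.add("hot A " + i));
        hot.emit(2);
        hot.subscribe(i -> log.add("hot B " + i));  // late subscriber misses 1 and 2
        hot.emit(3);
        return log;
    }

    public static void main(String[] args) {
        // prints [cold A 1, cold A 2, cold B 1, cold B 2, hot A 2, hot A 3, hot B 3]
        System.out.println(demo());
    }
}
```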

6. ConnectableFlux

Sometimes, you may not want to tie processing to the subscription time of a single subscriber, but rather wait until several subscribers have arrived and only then trigger the subscription and data generation (somewhat like a CountDownLatch).

ConnectableFlux was born for this purpose. There are two common ways to return ConnectableFlux in the Flux API: publish and replay.

  1. publish tries to respect the demand of its various subscribers (that is, backpressure) by forwarding these requests to the source. If any subscriber has a pending demand of 0, publish pauses emission to all subscribers.
  2. replay buffers the data seen after the first subscription, up to configurable limits (in time and buffer size). It replays that data to subsequent subscribers.

ConnectableFlux offers several ways to manage subscriptions, including:

  • connect() can be called manually once enough subscribers have subscribed to the Flux. It triggers the subscription to the upstream source.

  • autoConnect(n) does the same as connect(), but automatically, once n subscriptions have been made.

  • refCount(n) not only connects automatically once n subscribers have subscribed, but also detects when subscribers cancel. If all subscribers cancel, the source is “disconnected”, and it is not “connected” again until new subscribers arrive.

  • refCount(int, Duration) adds a grace period: once the number of subscribers becomes too low, it waits for the given Duration before disconnecting from the source, and disconnects only if no new subscribers have arrived in the meantime.
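The connect() and autoConnect(n) behaviors can be modeled in a few lines of plain Java. This is a simplified, synchronous illustration and not ConnectableFlux itself: the class ConnectableSketch is hypothetical, a List stands in for the upstream source, and backpressure and cancellation are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConnectableSketch<T> {
    private final List<T> source; // stands in for the upstream sequence
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private final int autoConnectAt; // 0 means manual connect only

    ConnectableSketch(List<T> source, int autoConnectAt) {
        this.source = source;
        this.autoConnectAt = autoConnectAt;
    }

    // Subscribing alone does not start the source.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (autoConnectAt > 0 && subscribers.size() == autoConnectAt) {
            connect(); // autoConnect(n): the n-th subscriber triggers the connection
        }
    }

    // Runs the upstream once and multicasts every element to all subscribers.
    void connect() {
        for (T item : source) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        ConnectableSketch<Integer> manual = new ConnectableSketch<Integer>(List.of(1, 2), 0);
        manual.subscribe(i -> log.add("a" + i));
        manual.subscribe(i -> log.add("b" + i));
        log.add("connect");
        manual.connect(); // nothing was emitted before this call

        ConnectableSketch<Integer> auto = new ConnectableSketch<Integer>(List.of(1, 2), 2);
        auto.subscribe(i -> log.add("c" + i));
        log.add("second");
        auto.subscribe(i -> log.add("d" + i)); // second subscriber starts the source
        return log;
    }

    public static void main(String[] args) {
        // prints [connect, a1, b1, a2, b2, second, c1, d1, c2, d2]
        System.out.println(demo());
    }
}
```

Each element reaches every subscriber before the next element is emitted, so the values interleave across subscribers rather than each subscriber receiving its own full run.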

6.1. The connect example

    @Test
    public void connectTest() throws InterruptedException {
        Flux<String> source = Flux.range(1, 3)
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("subscribed to source"));

        ConnectableFlux<String> co = source.publish();

        co.subscribe(log::info);
        co.subscribe(log::info);

        log.info("done subscribing");
        TimeUnit.SECONDS.sleep(1);

        log.info("will now connect");
        co.connect();
    }

16:06:03.494 [main] INFO wangxw.flux.FluxTest - done subscribing
16:06:04.496 [main] INFO wangxw.flux.FluxTest - will now connect
16:06:04.498 [main] INFO wangxw.flux.FluxTest - subscribed to source
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3

Upstream emits data only after connect().

6.2. autoConnect(n) example

    @Test
    public void autoConnect() throws InterruptedException {
        Flux<String> source = Flux.range(1, 3)
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("subscribed to source"));

        Flux<String> co = source.publish().autoConnect(2);

        log.info("subscribed first");
        co.subscribe(log::info);

        TimeUnit.SECONDS.sleep(1);

        log.info("subscribing second");
        co.subscribe(log::info);
    }

17:18:09.468 [main] INFO wangxw.flux.FluxTest - subscribed first
17:18:10.475 [main] INFO wangxw.flux.FluxTest - subscribing second
17:18:10.486 [main] INFO wangxw.flux.FluxTest - subscribed to source
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3

When both subscribers have completed the subscription, upstream receives the subscription request and starts sending data.

6.3. refCount() example

    @Test
    public void refCountTest() throws InterruptedException {
        Flux<String> source = Flux.interval(Duration.ofMillis(500))
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("doOnSubscribe"))
                .doOnCancel(() -> log.info("doOnCancel"));

        Flux<String> flux = source.publish().refCount(2, Duration.ofSeconds(2));

        log.info("subscribed first");
        Disposable s1 = flux.subscribe(x -> log.info("s1:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed second");
        Disposable s2 = flux.subscribe(x -> log.info("s2:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed first disposable");
        s1.dispose();

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed second disposable");
        s2.dispose(); // all subscribers have now cancelled; the 2s grace period starts

        TimeUnit.SECONDS.sleep(1); // s3 subscribes within the 2s grace period
        log.info("subscribed third");
        Disposable s3 = flux.subscribe(x -> log.info("s3:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed third disposable");
        s3.dispose(); // all subscribers have cancelled again; disconnect follows after 2s

        TimeUnit.SECONDS.sleep(3); // longer than the 2s grace period, so upstream has disconnected
        log.info("subscribed fourth"); // s4 and s5 together trigger a new connect
        Disposable sub4 = flux.subscribe(l -> log.info("s4: " + l));
        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed fifth");
        Disposable sub5 = flux.subscribe(l -> log.info("s5: " + l));
        TimeUnit.SECONDS.sleep(2);
    }

17:29:23.044 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first
17:29:24.052 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second
17:29:24.067 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:0
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:0
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:1
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:1
17:29:25.076 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first disposable
17:29:25.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:2
17:29:26.094 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:3
17:29:26.094 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second disposable
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:4
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:5
17:29:27.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:6
17:29:28.075 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:7
17:29:28.102 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third disposable
17:29:30.103 [parallel-3] INFO wangxw.flux.ConnectableFluxTest - doOnCancel // Notice that doOnCancel is performed after 2s
17:29:31.103 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fourth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fifth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 0
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 0
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 1
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 1
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 2
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 2
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 3
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 3

In this example, refCount(2, Duration.ofSeconds(2)) connects to upstream only once at least two subscribers have subscribed. When all subscribers have cancelled, it disconnects from upstream unless a new subscriber arrives within two seconds.

The first two subscribers cancel one after the other, but the third subscribes in time (within 2 seconds), so upstream keeps emitting. The output shows that this is a hot Flux: s3 does not restart from 0 but continues from 4.

After the third subscriber cancels, the fourth does not subscribe within the grace period, so the upstream publisher is disconnected. The fourth subscriber alone is not enough to reconnect; only when the fifth subscribes do the two of them start a new round, beginning again from 0.
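
The connect and disconnect rule described above can be modelled in a few lines of plain Java. This is only a sketch of the counting logic, not Reactor's implementation: RefCountSketch and its methods are hypothetical names, and the two-second grace period is deliberately left out, so the model disconnects as soon as the last subscriber cancels.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-JDK model of refCount(minSubscribers); not Reactor code. */
class RefCountSketch {
    private final int minSubscribers;
    private int subscribers = 0;
    private boolean connected = false;
    /** Records every connect/disconnect so the behavior can be inspected. */
    final List<String> events = new ArrayList<>();

    RefCountSketch(int minSubscribers) {
        this.minSubscribers = minSubscribers;
    }

    /** A subscriber arrives; connect upstream once the threshold is reached. */
    void subscribe() {
        subscribers++;
        if (!connected && subscribers >= minSubscribers) {
            connected = true;
            events.add("connect");
        }
    }

    /** A subscriber cancels; disconnect when nobody is left (no grace period here). */
    void cancel() {
        subscribers--;
        if (connected && subscribers == 0) {
            connected = false;
            events.add("disconnect");
        }
    }

    public static void main(String[] args) {
        RefCountSketch rc = new RefCountSketch(2);
        rc.subscribe();  // 1 subscriber: below the threshold, nothing happens
        rc.subscribe();  // 2 subscribers: connect
        rc.cancel();
        rc.cancel();     // 0 subscribers: disconnect
        rc.subscribe();
        rc.subscribe();  // threshold reached again: a fresh connect
        System.out.println(rc.events); // [connect, disconnect, connect]
    }
}
```

With the grace period used in the example, the disconnect step would be postponed by two seconds and skipped entirely if a subscriber arrived in the meantime.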

6.4. Single subscription using refCount()

ConnectableFlux coordinates multiple subscribers and lets them share one underlying subscription. Recall the earlier LazyTwitterFlux, which created a single lazily established connection to the underlying resource: there, every Subscriber had to be tracked manually so that the connection could be opened when the first Subscriber appeared and closed when the last one left. ConnectableFlux is a special type of Flux that guarantees at most one Subscriber on the underlying source while still allowing multiple subscribers to share that resource.

A Subject is an imperative way to create a Flux, whereas ConnectableFlux shields the original upstream Flux and guarantees that at most one Subscriber reaches it. No matter how many Subscribers subscribe to the ConnectableFlux, it opens only one subscription to the Flux from which it was created.

    @Test
    public void refCounted() {
        Flux<Status> flux = Flux.<Status>create(emitter -> {
            log.info("Establishing connection");
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    emitter.next(status);
                }

                @Override
                public void onException(Exception e) {
                    emitter.error(e);
                }

                // Other callbacks
            });
            emitter.onDispose(() -> {
                log.info("Disconnecting");
                twitterStream.shutdown();
            });
        }).doOnSubscribe(s -> log.info("doOnSubscribe"))
                .doOnComplete(() -> log.info("doOnComplete"));

        Flux<Status> refCounted = flux.publish().refCount();


        Disposable s1 = refCounted.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
        Disposable s2 = refCounted.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));

        s1.dispose();
        s2.dispose();
    }

17:51:21.610 [main] INFO wangxw.flux.Callback2FluxTest - doOnSubscribe
17:51:21.614 [main] INFO wangxw.flux.Callback2FluxTest - Establishing connection
17:51:21.616 [main] INFO wangxw.flux.Callback2FluxTest - Disconnecting

The connection is not established until the first Subscriber actually subscribes. More importantly, the second Subscriber does not initiate a new connection; it never even touches the original Flux. publish().refCount() wraps the underlying Flux and intercepts all subscriptions.

Operators

defer

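Defers creation of the Flux until subscription time: the supplier passed to defer() is invoked once for each subscriber. In the example below, Flux.just() captures its value when the Flux is assembled, so s1 and s3 log the same timestamp, whereas the deferred Flux is rebuilt on every subscription, so s4 logs a later timestamp than s2.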

    @Test
    public void deferTest() throws InterruptedException {
        Flux<String> flux1 = Flux.just(PrintUtil.println(new Date()));

        Flux<String> flux2 = Flux.defer(() -> Flux.just(PrintUtil.println(new Date())));

        flux1.subscribe(x -> log.info("s1: " + x));
        flux2.subscribe(x -> log.info("s2: " + x));

        TimeUnit.SECONDS.sleep(3);

        flux1.subscribe(x -> log.info("s3: " + x));
        flux2.subscribe(x -> log.info("s4: " + x));
    }

15:25:24.629 [main] INFO wangxw.operator.OperatorTest - s1: 2021-09-06 15:25:024
15:25:24.630 [main] INFO wangxw.operator.OperatorTest - s2: 2021-09-06 15:25:024
15:25:27.633 [main] INFO wangxw.operator.OperatorTest - s3: 2021-09-06 15:25:024
15:25:27.634 [main] INFO wangxw.operator.OperatorTest - s4: 2021-09-06 15:25:027

delayElements

    @Test
    public void delayTest() throws InterruptedException {
        Flux.just("1", "2").delayElements(Duration.ofSeconds(2))
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

16:02:40.482 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:02:42.483 [parallel-2] INFO wangxw.operator.OperatorTest - 2

delayElements() delays the publication of each element. Without the sleep() at the end, the program would terminate immediately without printing anything, even though a subscription was made. This is because events are published asynchronously on a background thread, so the main thread must be kept alive long enough to see them.

map

    @Test
    public void mapTest() throws InterruptedException {
        Flux.just(1, 2, 3, 4)
                .map(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2 + "";
                })
                .log()
                .subscribe(log::info);
    }

16:07:24.005 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
16:07:24.007 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
16:07:25.007 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(2)
16:07:25.008 [main] INFO wangxw.operator.OperatorTest - 2
16:07:26.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(4)
16:07:26.009 [main] INFO wangxw.operator.OperatorTest - 4
16:07:27.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(6)
16:07:27.009 [main] INFO wangxw.operator.OperatorTest - 6
16:07:28.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(8)
16:07:28.009 [main] INFO wangxw.operator.OperatorTest - 8
16:07:28.012 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()

map() runs synchronously on the subscribing thread, so a slow mapping function blocks that thread (here, main).

flatMap

flatMap() is one of the most important operators in Rx. At first glance it looks similar to map(), but its transformation of each element returns another (nested) Flux. Since a Flux can represent an asynchronous operation, flatMap() can start an asynchronous computation (fork execution) for each upstream event and merge the results.

    @Test
    public void flatMapTest() throws InterruptedException {
        Function<Integer, Publisher<String>> mapper = i -> Flux.just(i * 2 + "").delayElements(Duration.ofSeconds(1));

        Flux.just(1, 2, 3, 4)
                .flatMap(mapper)
                .subscribe(log::info);
        
        TimeUnit.SECONDS.sleep(10);
    }

17:23:42.566 [parallel-2] INFO wangxw.operator.OperatorTest - 4
17:23:42.566 [parallel-1] INFO wangxw.operator.OperatorTest - 2
17:23:42.568 [parallel-3] INFO wangxw.operator.OperatorTest - 6
17:23:42.572 [parallel-4] INFO wangxw.operator.OperatorTest - 8

Essentially, flatMap() takes a main sequence (Flux) of values that occur over time (events) and replaces each event with an independent subsequence. These subsequences are unrelated to each other and to the main-sequence events that produced them. Instead of a single main sequence, you have a set of Fluxes, each of which operates independently and appears and disappears over time. Therefore, flatMap() gives no guarantee about the order in which the inner events reach downstream operators and subscribers.

    @Test
    public void flatMap3Test() throws InterruptedException {
        Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
                .flatMap(this::loadRecordFor)
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

    private Flux<String> loadRecordFor(DayOfWeek dow) {
        switch (dow) {
            case SUNDAY:
                return Flux.interval(Duration.ofMillis(90))
                        .take(5)
                        .map(i -> "Sun" + i);
            case MONDAY:
                return Flux.interval(Duration.ofMillis(65))
                        .take(5)
                        .map(i -> "Mon" + i);
            default:
                return Flux.empty();
        }
    }

17:31:38.893 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:31:38.918 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:31:38.957 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:31:39.007 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:31:39.023 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:31:39.087 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:31:39.096 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:31:39.152 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
17:31:39.188 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:31:39.277 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4

Controlling flatMap concurrency

Suppose you have a large list of users wrapped in a Flux. Each User has a loadProfile() method that returns a Flux<Profile> by making an HTTP request. Our goal is to fetch all user profiles as quickly as possible. flatMap() is designed for exactly this, allowing the computations for upstream values to run concurrently, as shown below:

    @Test
    public void flatMap4Test() {
        List<User> users = new ArrayList<>();
        Flux.fromIterable(users)
                .flatMap(User::loadProfile);

    }

    static class User {

        public Flux<Profile> loadProfile() {
            // Send an HTTP request
            return Flux.empty();
        }
    }

    static class Profile {
    }

At first glance, this seems like a good idea. The Flux is created from a fixed List using the fromIterable() operator, so when it is subscribed to, all users are emitted immediately. For each User, flatMap() calls loadProfile(), which returns a Flux<Profile>. flatMap() then transparently subscribes to each of these inner Fluxes and forwards all Profile events downstream. Subscribing to an inner Flux amounts to opening a new HTTP connection. So with 10,000 users, we suddenly have 10,000 concurrent HTTP requests. If all of these requests hit the same server, you can expect one or more of the following:

  • Rejected connections.
  • Long waits and timeouts.
  • A crashed server.
  • Hitting a rate limit or being blacklisted.
  • Increased overall latency.
  • Client-side problems, including too many open sockets, too many threads, and excessive memory consumption.

Increasing concurrency pays off to some extent, but if you try to run too many concurrent operations, you end up with a lot of context switches, high memory and CPU usage, and overall performance degradation.

Flux.fromIterable(users)
               .flatMap(User::loadProfile, 10);

flatMap() has a simple overload that limits the total number of concurrent subscriptions to inner streams. With a concurrency of 10, flatMap() calls loadProfile() for each of the first 10 users it receives, but it does not call loadProfile() for the 11th User from upstream until one of the running inner streams completes. The concurrency parameter therefore limits the number of background tasks that flatMap() can have in flight.
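
To make the effect of the concurrency parameter concrete, here is a plain-JDK sketch that caps the number of in-flight requests with a fixed-size thread pool. It is an analogy rather than Reactor code: BoundedConcurrencySketch, loadAll and the sleeping loadProfile stand-in are assumptions made for illustration. Unlike flatMap(), it also returns results in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical plain-JDK analogy for flatMap(f, concurrency); not Reactor code. */
class BoundedConcurrencySketch {
    /** Highest number of simulated requests ever observed in flight at once. */
    static final AtomicInteger maxInFlight = new AtomicInteger();
    private static final AtomicInteger inFlight = new AtomicInteger();

    /** Stand-in for User.loadProfile(): sleeps briefly instead of doing HTTP. */
    static String loadProfile(int userId) throws InterruptedException {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
            return "profile-" + userId;
        } finally {
            inFlight.decrementAndGet();
        }
    }

    /** Loads all profiles with at most `concurrency` requests running at once. */
    static List<String> loadAll(int users, int concurrency) {
        // The pool size is the concurrency limit: extra tasks wait in its queue.
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int id = 0; id < users; id++) {
                final int userId = id;
                Callable<String> task = () -> loadProfile(userId);
                pending.add(pool.submit(task));
            }
            List<String> profiles = new ArrayList<>();
            for (Future<String> f : pending) {
                profiles.add(f.get());
            }
            return profiles;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("profile loading failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> profiles = loadAll(50, 10);
        System.out.println(profiles.size() + " profiles, peak concurrency " + maxInFlight.get());
    }
}
```

The peak concurrency printed by main never exceeds the limit passed to loadAll, which is the same guarantee the concurrency argument gives for inner subscriptions.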

concatMap(f) is semantically equivalent to flatMap(f, 1), that is, flatMap() with a concurrency of 1.

Using flatMap to compute a Cartesian product

A Cartesian product can be generated from all the values in two streams. For example, one Flux might represent the rows of a chessboard (ranks, 1 to 8) and another its columns (files, a to h). The goal is to enumerate all 64 squares on the board. The resulting Flux publishes exactly 64 events: for file a it generates a1, a2… a8, then b1, b2, and so on, up to h7 and h8. This is another interesting use of flatMap(), where each column (file) generates all the squares in that column.

    @Test
    public void cartesianTest() {
        Flux<Integer> oneToEight = Flux.range(1, 8);
        Flux<String> ranks = oneToEight.map(Objects::toString);
        Flux<String> files = oneToEight.map(x -> 'a' + x - 1)
                .map(ascii -> (char) ascii.intValue())
                .map(ch -> Character.toString(ch));

        Flux<String> squares = files
                .flatMap(file -> ranks.map(rank -> file + rank));

        squares.subscribe(log::info);
    }

concatMap

concatMap() preserves the order of downstream events so that it exactly matches the order of upstream events.

    @Test
    public void concatMapTest() throws InterruptedException {
        Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
                .concatMap(this::loadRecordFor)
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

    private Flux<String> loadRecordFor(DayOfWeek dow) {
        switch (dow) {
            case SUNDAY:
                return Flux.interval(Duration.ofMillis(90))
                        .take(5)
                        .map(i -> "Sun" + i);
            case MONDAY:
                return Flux.interval(Duration.ofMillis(65))
                        .take(5)
                        .map(i -> "Mon" + i);
            default:
                return Flux.empty();
        }
    }

17:27:15.161 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:27:15.250 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:27:15.340 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:27:15.432 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:27:15.520 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
17:27:15.587 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:27:15.652 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:27:15.716 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:27:15.783 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:27:15.848 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4

When the first event (Sunday) arrives from upstream, concatMap() subscribes to the Flux returned by loadRecordFor() and passes all of its events downstream. When that inner stream completes, concatMap() waits for the next upstream event (Monday) and repeats the process. concatMap() does not introduce any concurrency, but it preserves the order of upstream events and avoids overlap.

flatMap() internally uses the merge() operator, which subscribes to all inner Fluxes at once and makes no distinction between them. This is why downstream events interleave. concatMap(), by contrast, can be thought of in terms of the concat() operator: concat() subscribes only to the first underlying Flux and subscribes to the second only once the first has completed.

merge

Merges streams, interleaving their elements in the order in which they arrive.

    @Test
    public void mergeTest() throws InterruptedException {
        Flux<String> flux1 = Flux.interval(Duration.ofMillis(300)).map(x -> "p1: " + x);
        Flux<String> flux2 = Flux.interval(Duration.ofMillis(500)).map(x -> "p2: " + x);

        Flux<String> mergeFlux = Flux.merge(flux1, flux2);
        mergeFlux.subscribe(log::info);

        TimeUnit.SECONDS.sleep(2);
    }

19:03:12.339 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 0
19:03:12.541 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 0
19:03:12.641 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 1
19:03:12.939 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 2
19:03:13.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 1
19:03:13.240 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 3
19:03:13.540 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 4
19:03:13.542 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 2
19:03:13.841 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 5
19:03:14.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 3

Note that any error in one of the underlying Fluxes is propagated immediately to the Subscriber. You can use the mergeDelayError() variant of merge() to postpone errors so that the error notification is not published until all other streams have completed. mergeDelayError() can even collect all exceptions, not just the first, and wrap them in a single composite exception (rx.exceptions.CompositeException in RxJava).

zip

Zipping refers to the operation of combining two (or more) streams, in which each event in one stream must be paired with its counterpart in the other streams. Downstream events are generated by combining the first event in each flow, then the second event, and so on.
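
The pairing rule can be illustrated with ordinary lists. The following sketch is not Reactor's zip: ZipSketch and its zip method are hypothetical, and they model only the pairing, not the asynchronous delivery. Like Flux.zip, it stops as soon as one input runs out, so unpaired elements are dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical list-based model of zipping; not Reactor code. */
class ZipSketch {
    /** Pairs the i-th element of each input; stops when either input runs out. */
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a", "b", "c");
        List<Integer> ranks = Arrays.asList(1, 2);
        // "c" has no counterpart in ranks, so it is never emitted.
        System.out.println(zip(files, ranks, (f, rk) -> f + rk)); // [a1, b2]
    }
}
```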

concat

Subscribe sequentially, wait for the first stream to complete, and then subscribe to the next one in turn.

    @Test
    public void concatTest() throws InterruptedException {
        Flux<String> flux1 = Flux.just("1", "2", "3").delayElements(Duration.ofSeconds(1));
        Flux<String> flux2 = Flux.just("4", "5", "6");

        Flux<String> flux = Flux.concat(flux1, flux2);
        flux.subscribe(log::info);

        TimeUnit.SECONDS.sleep(3);
    }

16:43:21.315 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:43:22.317 [parallel-2] INFO wangxw.operator.OperatorTest - 2
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 3
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 4
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 5
16:43:23.319 [parallel-3] INFO wangxw.operator.OperatorTest - 6

Error handling operators

The most common ways to handle an exception in a try-catch block are:

  1. Catch and return a static default value.
  2. Catch and dynamically compute a fallback value.
  3. Catch and execute a fallback method.
  4. Catch, wrap it into a BusinessException, and then rethrow it.
  5. Catch, log a specific error, and then rethrow.
  6. Use a finally block to clean up resources, or the Java 7 “try-with-resources” construct.

Static Fallback Value

The Reactor equivalent of “catch and return a static default value” is onErrorReturn. Consider the following imperative code:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

public String doSomethingDangerous(int i) {
    if (i == 10) {
        throw new BusinessException();
     }
     return i + "";
}

The equivalent in Reactor is:

Flux<String> flux = Flux.just(10)
      	.map(this::doSomethingDangerous)
      	.onErrorReturn("RECOVERED");
flux.subscribe(log::info, e -> log.error("error", e));

00:07:57.556 [main] INFO wangxw.operator.ErrorHandleOperatorTest - RECOVERED

You can also pass a Predicate to decide whether to recover from a given exception, as shown in the following example:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); 

The fallback value is returned only when the exception message is boom10; any other error is propagated.
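
The predicate variant can be mimicked in plain Java to make its behavior concrete. The sketch below is not Reactor code: FallbackSketch and recoverIf are hypothetical names, and doSomethingDangerous is assumed here to throw an exception whose message is boom10.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical imperative counterpart of onErrorReturn(predicate, value). */
class FallbackSketch {
    /** Assumed risky call: fails with the message "boom10" when i == 10. */
    static String doSomethingDangerous(int i) {
        if (i == 10) {
            throw new IllegalStateException("boom" + i);
        }
        return Integer.toString(i);
    }

    /** Returns the fallback only for matching errors; other errors propagate. */
    static <T> T recoverIf(Supplier<T> action, Predicate<Throwable> matches, T fallback) {
        try {
            return action.get();
        } catch (RuntimeException e) {
            if (matches.test(e)) {
                return fallback;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // Comparing from the literal side avoids a NullPointerException on a null message.
        Predicate<Throwable> isBoom10 = e -> "boom10".equals(e.getMessage());
        System.out.println(recoverIf(() -> doSomethingDangerous(10), isBoom10, "recovered10")); // recovered10
        System.out.println(recoverIf(() -> doSomethingDangerous(3), isBoom10, "recovered10"));  // 3
    }
}
```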

Fallback Method

The Reactor equivalent of “catch and execute a fallback method” is onErrorResume. For example, you might fetch a value from an external but unreliable service and fall back to a cache when the external service fails, as shown in the following imperative code:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

The equivalent in Reactor is:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) // call the external service for each key
        .onErrorResume(e -> getFromCache(k)) // fall back to the cache if the service fails
    );

Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based on either the exception type or a Predicate. Because it takes a Function, it also lets you switch to a different fallback sequence depending on the error encountered. The following example shows how to do this:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { // choose dynamically how to continue after an error
            if (error instanceof TimeoutException)
                return getFromCache(k); // on timeout, read from the local cache
            else if (error instanceof UnknownKeyException)
                return registerNewEntry(k, "DEFAULT"); // create a new entry if the key is unknown
            else
                return Flux.error(error); // rethrow in all other cases
        })
    );

Dynamic Fallback Value

Sometimes the fallback value must be computed from the exception itself. The imperative version looks like this:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}

The Reactor code is as follows:

erroringFlux.onErrorResume(error -> Mono.just( 
        MyWrapper.fromError(error) 
));

Catch and Rethrow

The imperative version looks like this:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

In the “Fallback Method” example, the last branch inside the flatMap hints at how to achieve the same thing reactively, as follows:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

However, there is a more straightforward way to achieve the same effect with onErrorMap:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side

The doOnError operator is for cases where you want the error to keep propagating but also want to react to it without modifying the sequence (for example, by logging it). This is equivalent to the “catch, log a specific error, and rethrow” pattern, as shown in the following example:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

doOnError, like all operators prefixed with doOn, is a side-effect operator: it lets you peek at the events of a sequence without modifying them.

Like the imperative example shown earlier, the following example still propagates an error, but makes sure that we at least log an external service failure:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) 
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); // log the failure
        })
    );

Using Resources and the Finally Block

  • Use finally blocks to clean up resources
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
  • Use the try-with-resource syntax
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

Both have Reactor equivalents: doFinally and using.

doFinally is for side effects that you want to run whenever the sequence terminates (with onComplete or onError) or is cancelled. It tells you which kind of termination triggered the side effect. The following example shows how to use doFinally:

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { 
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL) 
          statsCancel.increment();
    })
    .take(1); 

Retrying

Apply reactive programming to existing programs

From blocking to reactive

public class PersonDao {
    
    /** Blocking version. */
    public List<Person> listPeople() {
        return query("select * from people");
    }

    /** Reactive version. */
    public Flux<Person> rxListPeople() {
        return Flux.fromIterable(query("select * from people"));
    }

    private List<Person> query(String sql) {
        List<Person> people = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Person person = new Person();
            person.setId(i);
            people.add(person);
        }
        return people;
    }
}

We have changed from a blocking API to a reactive one. Depending on the size of the system, compatibility with existing code can be a major concern. Next, we combine reactive and blocking code using buffer and **blockLast**.

    @Test
    public void listPeopleTest() {
        // There are no side effects
        Flux<Person> peopleFlux = personDao.rxListPeople();
        Flux<List<Person>> listFlux = peopleFlux.buffer()
                .log();
        List<Person> people = listFlux.blockLast(Duration.ofSeconds(3));
        assert people != null;
        people.forEach(person -> log.info(person.toString()));
    }

11:21:50.598 [main] INFO reactor.Flux.Buffer.1 - onSubscribe(FluxBuffer.BufferExactSubscriber)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - request(unbounded)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - onNext([Person(id=0), Person(id=1), Person(id=2)])
11:21:50.603 [main] INFO reactor.Flux.Buffer.1 - cancel()
11:21:50.608 [main] INFO reactor.Flux.Buffer.1 - onComplete()
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=0)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=1)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=2)

blockLast() blocks until the onComplete signal arrives and returns the last emitted element. You might think that the code above merely wraps and unwraps a Flux to no particular purpose, but this is only the first step. The next transformation introduces lazy execution. The mere existence of a Flux does not imply any background task or side effect, unlike a Future, which almost always means that some concurrent operation is already running (a Future is only returned once the task has been submitted).

Embrace deferred execution

    public Flux<Person> rxListPeople() {
        return Flux.defer(() ->
                Flux.fromIterable(query("select * from people")));
    }

With defer(), the query does not run until someone subscribes.
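
The difference between the eager and the deferred version can be made visible by counting how often the query runs. This is a plain-JDK sketch in which a Supplier plays the role of the deferred Flux; LazyQuerySketch, queryCount and the string rows are hypothetical stand-ins for the DAO above, not part of it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical model of eager versus deferred execution; not Reactor code. */
class LazyQuerySketch {
    /** Counts how many times the simulated query has actually run. */
    static final AtomicInteger queryCount = new AtomicInteger();

    /** Stand-in for the DAO's query(): records the call and returns fixed rows. */
    static List<String> query(String sql) {
        queryCount.incrementAndGet();
        return Arrays.asList("person-0", "person-1", "person-2");
    }

    /** Eager: the query runs as soon as this method is called. */
    static List<String> listPeople() {
        return query("select * from people");
    }

    /** Lazy: only a recipe is returned; the query runs on each get(). */
    static Supplier<List<String>> lazyListPeople() {
        return () -> query("select * from people");
    }

    public static void main(String[] args) {
        Supplier<List<String>> people = lazyListPeople();
        System.out.println("queries after building the recipe: " + queryCount.get()); // 0
        people.get();
        people.get();
        System.out.println("queries after two get() calls: " + queryCount.get());     // 2
    }
}
```

As with a deferred Flux, each consumer of the recipe triggers its own query, so the work is repeated per get() rather than shared.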

From imperative to declarative concurrency

Explicit concurrency is not common in enterprise applications. In most cases, each request is handled by a single thread. The same thread does the following.

  1. Accept the TCP/IP connection
  2. Parse the HTTP request
  3. Call the Controller or Servlet
  4. Make a blocking call to the database
  5. Process the results
  6. Encode the response (as JSON, for example)
  7. Send the response to the client

If the back end makes several independent requests, such as database queries, this layered model hurts user-perceived latency because the requests run serially even though they could easily run concurrently. Scalability suffers as well. For example, Tomcat’s executor has 200 request-processing threads by default, which means that no more than 200 connections can be processed concurrently. During a sudden surge of traffic, incoming connections are queued and the server responds with higher latency. This cannot go on indefinitely, though: Tomcat will eventually start rejecting incoming traffic.
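
The queue-then-reject behaviour can be approximated with a plain ThreadPoolExecutor that has a fixed number of workers and a bounded queue. The sketch below is a scaled-down model under stated assumptions, not Tomcat's actual executor: SaturatedPoolSketch, countRejected and the sizes (2 workers, 2 queue slots, 6 requests) are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical scaled-down model of a saturated request-processing pool. */
class SaturatedPoolSketch {
    /** Submits `requests` blocking tasks and returns how many were rejected. */
    static int countRejected(int workers, int queueSize, int requests) {
        // With no explicit handler, the executor uses AbortPolicy and throws on overflow.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulates a blocking database call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all workers busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked "requests" finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 workers + 2 queue slots can hold 4 requests; the other 2 are rejected.
        System.out.println(countRejected(2, 2, 6)); // 2
    }
}
```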

**The traditional architecture of executing every step of request processing on a single thread also has some benefits, such as better cache locality and lower synchronization overhead.** Unfortunately, **in typical applications the overall latency is the sum of each layer’s latency, so a single faulty component can hurt overall latency.** In addition, many steps are sometimes independent of each other and could be executed concurrently, for example calling several external APIs or running several independent SQL queries. The following program, for example, makes no use of concurrency.

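import java.io.IOException;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TicketService {

    /**
     * Looks up a flight (simulated with a one-second delay).
     *
     * @param flightNo the flight number
     * @return the flight
     */
    public Flight lookupFlight(String flightNo) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        return new Flight(flightNo);
    }

    /**
     * Looks up a passenger (simulated with a one-second delay).
     *
     * @param id the passenger id
     * @return the passenger
     */
    public Passenger findPassenger(Long id) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Passenger(id);
    }

    /**
     * Books a ticket for the given flight and passenger.
     *
     * @param flight    the flight
     * @param passenger the passenger
     * @return the booked ticket
     */
    public Ticket bookTicket(Flight flight, Passenger passenger) {
        return new Ticket(flight, passenger);
    }

    /**
     * Sends the ticket by email (simulated with a one-second delay).
     *
     * @param ticket the ticket to send
     * @return true if the email was sent
     */
    public boolean sendEmail(Ticket ticket) throws IOException {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.info("send email {}", ticket);
        return true;
    }
}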

The client code is shown below.

    @Test
    public void blockBookTicket() throws IOException {
        Flight flight = ticketService.lookupFlight("LOT 783");
        Passenger passenger = ticketService.findPassenger(1L);
        Ticket ticket = ticketService.bookTicket(flight, passenger);
        ticketService.sendEmail(ticket);
    }

This is typical blocking code, similar to what many applications contain. From a latency perspective, however, the snippet breaks down into four steps. The first two are independent of each other, and only the third step (bookTicket()) needs the return values of lookupFlight() and findPassenger(). There is an obvious opportunity for concurrency here, but developers rarely take it because it requires juggling thread pools, futures, and callbacks. An Rx-style API makes this much easier, and as a first step you can simply wrap the legacy blocking code in a Mono, as shown below.

    public Mono<Flight> rxLookupFlight(String flightNo) {
        return Mono.defer(() ->
                Mono.just(lookupFlight(flightNo)));
    }

    public Mono<Passenger> rxFindPassenger(Long id) {
        return Mono.defer(() ->
                Mono.just(findPassenger(id)));
    }

Semantically, the rx-prefixed methods accomplish the same task in the same way; in other words, they still block by default. From the client's perspective, we have gained nothing so far except a more verbose API.

    @Test
    public void rxBookTicket() {
        // These are just placeholders; creating them has no side effects
        Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783");
        Mono<Passenger> passenger = ticketService.rxFindPassenger(1L);
        Mono<Ticket> ticket = flight.zipWith(passenger,
                (f, p) -> ticketService.bookTicket(f, p));
        // sendEmail() declares IOException, so it cannot be passed to
        // subscribe() as a plain method reference
        ticket.subscribe(t -> {
            try {
                ticketService.sendEmail(t);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

The traditional blocking program and the Reactor version work in exactly the same way. However, the code above is evaluated lazily: we get two placeholders for Flight and Passenger without any side effects. At this point, no database query or web service call has been made.

Pay attention to where subscribe() is called in the code above. Often, your business logic only composes publishers (Flux or Mono) and returns them to a framework or scaffolding layer; the actual subscription is made behind the scenes by a web framework or some glue code. Calling subscribe() yourself is not bad practice, but it is better to push the subscription as far out as possible.

To understand the flow of execution, it helps to read from the bottom up. We subscribed to ticket, so Reactor must transparently subscribe to flight and passenger. Only at this point does the real business logic run. Because both Monos are cold and no concurrency is involved, subscribing to flight triggers the blocking lookupFlight() method on the calling thread. When lookupFlight() completes, Reactor can subscribe to passenger; by then it has already received the Flight instance from the synchronous flight Mono. rxFindPassenger() in turn calls the blocking findPassenger() and receives a Passenger. From this join point the data flows downstream: the Flight and Passenger instances are combined by the supplied lambda expression (bookTicket), and the result is passed to ticket.subscribe().

That seems like a lot of machinery for something that runs essentially the same way as the blocking code we started with. However, we can now apply concurrency declaratively without changing any logic.

If the business method returned a Future (or a CompletableFuture, which is essentially the same), two decisions would already have been made for us.

  • The underlying call to lookupFlight() has already started, so there is no room for laziness. We do not block on the method, but the work is already under way.

  • We have no control over concurrency. The implementation of the method decides whether the Future task runs on a thread pool or on a new thread created for each request.
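Both points can be seen with nothing but the JDK. The sketch below is illustrative only: the class and method names are hypothetical, and CompletableFuture.supplyAsync() stands in for a business method that returns a Future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class EagerFutureDemo {
    // Returns true if the task body started running although nobody
    // ever asked the future for its result.
    static boolean startsWithoutConsumer() {
        CountDownLatch started = new CountDownLatch(1);
        // The executor is chosen by the method that creates the future
        // (here the default one used by supplyAsync), not by the caller.
        CompletableFuture<String> flight = CompletableFuture.supplyAsync(() -> {
            started.countDown(); // side effect: the lookup has begun
            return "LOT 783";
        });
        try {
            // Note: no get() or join() is called on the future before this check
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("work started eagerly: " + startsWithoutConsumer());
    }
}
```

Merely holding the future is enough for the work to run, which is the opposite of a deferred Mono, where nothing runs until subscription.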

Reactor gives users more control. A Flux is usually asynchronous already, but in some cases you may need to add concurrency to an existing one. For a synchronous Flux (or Mono), it is the consumer of the API, not its implementer, who is free to decide which threading mechanism to use. All of this is done with the subscribeOn() operator, as shown below.

        Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783")
                .subscribeOn(Schedulers.boundedElastic());
        Mono<Passenger> passenger = ticketService.rxFindPassenger(1L)
                .subscribeOn(Schedulers.boundedElastic())
                .timeout(Duration.ofSeconds(3)); // You can also declare a timeout

Because the API is Reactor-based, we can insert the subscribeOn() operator anywhere before subscribing and supply a so-called Scheduler instance, which lets the two methods run concurrently with little effort.
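Whether the two calls really overlap can be checked without measuring time. The following is a minimal sketch, assuming Reactor (reactor-core) is on the classpath; the class, the latch-based blockingCall() helper and the two-second timeout are hypothetical, and Mono.fromCallable() is used here as a shorter equivalent of the Mono.defer(() -> Mono.just(...)) wrapping shown earlier.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SubscribeOnDemo {
    // Blocking call that returns true only if the other call is in flight
    // at the same time (both must reach the latch within the timeout).
    static boolean blockingCall(CountDownLatch bothStarted) {
        bothStarted.countDown();
        try {
            return bothStarted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Zips two wrapped blocking calls; `concurrent` decides whether each
    // one is subscribed on its own boundedElastic worker.
    static boolean overlapped(boolean concurrent) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Mono<Boolean> flight = Mono.fromCallable(() -> blockingCall(bothStarted));
        Mono<Boolean> passenger = Mono.fromCallable(() -> blockingCall(bothStarted));
        if (concurrent) {
            flight = flight.subscribeOn(Schedulers.boundedElastic());
            passenger = passenger.subscribeOn(Schedulers.boundedElastic());
        }
        // True only if both calls saw each other running
        return flight.zipWith(passenger, (f, p) -> f && p)
                .block(Duration.ofSeconds(10));
    }

    public static void main(String[] args) {
        System.out.println("with subscribeOn:    " + overlapped(true));  // true
        System.out.println("without subscribeOn: " + overlapped(false)); // false
    }
}
```

Without subscribeOn() the first call runs to completion on the calling thread before the second one starts, so the latch times out; with it, the business logic is unchanged and only the scheduling differs.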

But bookTicket() is still a fly in the ointment: it returns a plain Ticket, so it is necessarily blocking. Although booking may be very fast, it is still worth declaring it the Reactor way, which makes the API easier to evolve.

    public Mono<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
        return Mono.defer(() ->
                Mono.just(bookTicket(flight, passenger)));
    }

However, zipWith() now returns a weird-looking Mono<Mono<Ticket>>. As a rule of thumb, whenever you see a double-wrapped type (e.g. Optional<Optional<...>>), a call to flatMap() is missing somewhere.

    Mono<Mono<Ticket>> ticket = flight
           .zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p));

We can use flatMap and pass it an identity function as follows:

    Mono<Ticket> ticket = flight
             .zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p))
             .flatMap(mono -> mono); // the identity function unwraps the inner Mono

It is tempting to conclude that subscribeOn() is the right tool for concurrency in Reactor. The operator does work, but try to avoid it (along with publishOn(), described below). In practice, a Flux usually comes from an asynchronous source, so no custom scheduling is needed at all. subscribeOn() is used here only to show how an existing application can be upgraded to adopt reactive principles selectively. In real code, Schedulers and subscribeOn() should be weapons of last resort.

Using flatMap()

Building on the example above, suppose we have to email a list of Tickets. Three points need attention:

  1. The list can be long.
  2. It takes milliseconds, even seconds, to send an email.
  3. If a delivery fails, the application must keep running, but eventually report which tickets could not be delivered.

The last requirement quickly rules out the simple tickets.forEach(this::sendEmail) approach, which throws on the first failure and stops delivering the remaining tickets.
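The problem with forEach() can be reproduced with a JDK-only sketch. Everything in it is hypothetical: tickets are plain integer ids, sendEmail() fails for even ids, and it throws an unchecked exception so that it fits into a lambda. The sketch stays sequential and does not show a flatMap() version; it only contrasts aborting on the first failure with attempting every ticket and collecting the failures.

```java
import java.util.ArrayList;
import java.util.List;

class ForEachFailureDemo {
    // Hypothetical sender: fails for every ticket whose id is even
    static void sendEmail(int ticketId) {
        if (ticketId % 2 == 0) {
            throw new IllegalStateException("cannot deliver ticket " + ticketId);
        }
    }

    // Returns the ids for which delivery was attempted before forEach gave up
    static List<Integer> attemptedWithForEach(List<Integer> tickets) {
        List<Integer> attempted = new ArrayList<>();
        try {
            tickets.forEach(id -> {
                attempted.add(id);
                sendEmail(id);
            });
        } catch (IllegalStateException e) {
            // the first failure aborts the whole iteration
        }
        return attempted;
    }

    // Attempts every ticket and returns the ids that failed
    static List<Integer> failuresWithTryCatch(List<Integer> tickets) {
        List<Integer> failed = new ArrayList<>();
        for (int id : tickets) {
            try {
                sendEmail(id);
            } catch (IllegalStateException e) {
                failed.add(id); // remember the failure and keep going
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Integer> tickets = List.of(1, 2, 3, 4);
        System.out.println(attemptedWithForEach(tickets));  // prints [1, 2]
        System.out.println(failuresWithTryCatch(tickets));  // prints [2, 4]
    }
}
```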

