Introduction

Exception handling is an important part of reactive programming, and of programming in general. This article walks through how Reactor deals with exceptions.

Reactor’s general approach to exceptions

To take an example, we create a Flux in which we generate an exception and see what happens:

Flux<String> flux2 = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i));
flux2.subscribe(System.out::println);

This fails with an ErrorCallbackNotImplemented exception:

100 / 1 = 100
100 / 2 = 50

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero

So how do you handle this exception?

There are two ways to do this. The first way is to specify the onError method when we subscribe, as we saw in the previous article:

Flux<String> flux2 = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i));

flux2.subscribe(System.out::println,
        error -> System.err.println("Error: " + error));

It’s the same code as before, but this time we subscribe with an onError handler. Look at the result:

100 / 1 = 100
100 / 2 = 50
Error: java.lang.ArithmeticException: / by zero

You can see that the exception has been caught and handled appropriately.

Besides handling the error in subscribe, we can handle it in the publisher chain itself by attaching an error-handling operator. This is the second approach:

Flux<String> flux = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i))
        .onErrorReturn("Divided by zero :(");
flux.subscribe(System.out::println);

In the example above, onErrorReturn is attached while the Flux is being built. The output is:

100 / 1 = 100
100 / 2 = 50
Divided by zero :(

Note that for a Flux or Mono, any error is a terminal event: even if you use an error-handling operator, the original sequence does not continue.

What the operator does is convert the onError signal into the start of a new sequence, which replaces the sequence generated upstream.
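To make the terminal nature of errors concrete, here is a minimal sketch (the class name TerminalErrorDemo is made up for illustration) that puts the failing element in the middle of the source. It assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalErrorDemo {
    // Collects every element the subscriber would receive, in order.
    static List<String> run() {
        return Flux.just(1, 0, 2)
                .map(i -> "100 / " + i + " = " + (100 / i))
                // The fallback value replaces the rest of the failed sequence.
                .onErrorReturn("Divided by zero :(")
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The list contains "100 / 1 = 100" followed by "Divided by zero :(". The element 2 is never mapped, because the upstream sequence ended when the division by zero occurred.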

Detailed explanation of various exception handling methods

In ordinary code, how do we handle exceptions? The first thing that comes to mind is try/catch. The onError handler passed to subscribe in Reactor is the reactive counterpart of try/catch:

Flux<String> flux2 = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i));

flux2.subscribe(System.out::println,
        error -> System.err.println("Error: " + error));

Again, we handled the exception in the onError method.

If converted to regular code, it would look like this:

    public void normalErrorHandle() {
        try {
            Arrays.asList(1, 2, 0).stream()
                    .map(i -> "100 / " + i + " = " + (100 / i))
                    .forEach(System.out::println);
        } catch (Exception e) {
            System.err.println("Error: " + e);
        }
    }

In addition to this basic exception handling method, Reactor provides a number of different exception handling methods, which are described below.

Static Fallback Value

A static fallback value means that when an exception occurs, the sequence falls back to a fixed default value. The onErrorReturn operator shown earlier does exactly this.

Flux<String> flux = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i))
        .onErrorReturn("Divided by zero :(");

onErrorReturn also has an overload that takes a Predicate, which decides whether a given exception should trigger the fallback:

public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) 
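As a rough illustration of the Predicate overload, the sketch below (class and method names are hypothetical) applies the same predicate to a source that fails with a matching exception and to one that fails with a non-matching exception. It assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PredicateFallbackDemo {
    // The ArithmeticException matches the predicate, so the fallback is emitted.
    static List<String> matching() {
        return Flux.just(1, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorReturn(e -> e instanceof ArithmeticException, "Divided by zero :(")
                .collectList()
                .block();
    }

    // The IllegalStateException does not match, so the error passes through untouched.
    static String nonMatching() {
        try {
            Flux.<String>error(new IllegalStateException("boom"))
                    .onErrorReturn(e -> e instanceof ArithmeticException, "Divided by zero :(")
                    .blockLast();
            return "no error";
        } catch (IllegalStateException e) {
            return "propagated: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(matching());
        System.out.println(nonMatching());
    }
}
```

Here matching() yields the first result followed by the fallback string, while nonMatching() returns "propagated: boom" because the predicate rejected the exception.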

Fallback Method

Besides a fallback value, Reactor also supports a fallback method. If you want to switch to another source or call another method after catching an exception, use a fallback method.

The fallback method is expressed with onErrorResume, which takes a function from the error to a new Publisher.

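    public void useFallbackMethod() {
        Flux<String> flux = Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                // onErrorResume must return a Publisher; this fallback Flux is an illustrative choice
                .onErrorResume(e -> Flux.just("Fallback after: " + e.getMessage()));
        flux.subscribe(System.out::println);
    }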

Dynamic Fallback Value

A dynamic fallback value is computed from the exception that was thrown, so different errors can fall back to different values:

    public void useDynamicFallback() {
        Flux<String> flux = Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorResume(error -> Mono.just(
                        MyWrapper.fromError(error)));
    }

    public static class MyWrapper {
        public static String fromError(Throwable error) {
            return "That is a new Error";
        }
    }

Catch and Rethrow

Similarly, we can rethrow after catching an exception:

Flux<String> flux = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i))
        .onErrorResume(error -> Flux.error(
                new RuntimeException("oops, ArithmeticException!", error)));

Flux<String> flux2 = Flux.just(1, 2, 0)
        .map(i -> "100 / " + i + " = " + (100 / i))
        .onErrorMap(error -> new RuntimeException("oops, ArithmeticException!", error));

There are two ways to do this. The first builds a new failing Flux with Flux.error inside onErrorResume; the second translates the exception directly with onErrorMap.
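The sketch below suggests that both forms deliver the same wrapped exception to the subscriber. The class name, helper methods and the choice of IllegalStateException as the wrapper are illustrative assumptions; it relies on blockLast() rethrowing a RuntimeException as-is.

```java
import reactor.core.publisher.Flux;

public class RethrowDemo {
    static Flux<String> source() {
        return Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i));
    }

    // Blocks until the flux terminates and returns the exception it failed with, or null.
    static Throwable failureOf(Flux<String> flux) {
        try {
            flux.blockLast();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    // Rethrow by switching to a Flux that fails immediately.
    static Throwable viaResume() {
        return failureOf(source().onErrorResume(
                e -> Flux.error(new IllegalStateException("wrapped", e))));
    }

    // Rethrow by mapping the exception in place.
    static Throwable viaMap() {
        return failureOf(source().onErrorMap(
                e -> new IllegalStateException("wrapped", e)));
    }

    public static void main(String[] args) {
        System.out.println(viaResume() + " caused by " + viaResume().getCause());
        System.out.println(viaMap() + " caused by " + viaMap().getCause());
    }
}
```

In both cases the subscriber sees the wrapper, with the original ArithmeticException preserved as its cause. onErrorMap is the shorter form when all you need is to translate the exception type.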

Log or React on the Side

Sometimes you just want to log the exception without changing the reactive sequence. In that case, use doOnError, which observes the error and lets it continue downstream.

    public void useDoOnError() {
        Flux<String> flux = Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .doOnError(error -> System.out.println("we got the error: " + error));
    }
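A small sketch can show that doOnError is only a side effect: the error is observed and then still reaches the subscriber. The class name and the event strings are invented for illustration, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnErrorDemo {
    // Records, in order, what the side-effect hook and the subscriber each observe.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .doOnError(e -> events.add("logged: " + e.getMessage()))
                .subscribe(
                        value -> events.add("value: " + value),
                        e -> events.add("subscriber got: " + e.getMessage()));
        // Flux.just emits on the subscribing thread, so all events are recorded by now.
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded events are the two values, then "logged: / by zero", then "subscriber got: / by zero". The sequence still terminates with the error; doOnError does not recover from it.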

Finally Block

If our code uses a resource, we generally need to release it in a finally block, or use the try-with-resources statement introduced in JDK 7.

For example, here’s how to use finally:

Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}

Here’s how to use try-with-resources:

try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

Reactor has two counterparts to these constructs.

The first is the doFinally method:

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> {
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL)
          statsCancel.increment();
    })
    .take(1);

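In the example above, doFinally plays the role of a finally block: its callback runs when the sequence terminates, whether by completion, error or cancellation, and receives the SignalType that ended it.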
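To see which SignalType the doFinally callback receives, here is a minimal sketch covering the completion and error cases. The class and helper names are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class DoFinallyDemo {
    // Runs the flux to termination and reports which signal doFinally observed.
    static SignalType terminationOf(Flux<String> flux) {
        AtomicReference<SignalType> seen = new AtomicReference<>();
        flux.doFinally(seen::set)
                // Recover downstream so blockLast() does not throw in the error case.
                .onErrorReturn("fallback")
                .blockLast();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(terminationOf(Flux.just("foo", "bar")));
        System.out.println(terminationOf(Flux.error(new IllegalStateException("boom"))));
    }
}
```

The first call reports ON_COMPLETE and the second ON_ERROR, so a single callback covers both paths, much like a finally block.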

The second is the using method. Here is its signature:


    public static <T, D> Flux<T> using(
            Callable<? extends D> resourceSupplier,
            Function<? super D, ? extends Publisher<? extends T>> sourceSupplier,
            Consumer<? super D> resourceCleanup)

As you can see, using takes three arguments. resourceSupplier is called on each subscription to produce the resource object.

sourceSupplier is a factory that receives the resource produced by resourceSupplier and returns a Publisher built from it.

resourceCleanup releases the resource once the sequence terminates or is cancelled.

So how do we use it?

Here’s an example:

    public void useUsing() {
        AtomicBoolean isDisposed = new AtomicBoolean();
        Disposable disposableInstance = new Disposable() {
            @Override
            public void dispose() {
                isDisposed.set(true);
            }

            @Override
            public String toString() {
                return "DISPOSABLE";
            }
        };

        Flux<String> flux = Flux.using(
                () -> disposableInstance,
                disposable -> Flux.just(disposable.toString()),
                Disposable::dispose);
    }

In the example above, we create a Disposable as the resource, build a Flux from it, and finally release the resource by calling Disposable::dispose.
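The cleanup only happens once the Flux is actually subscribed to and terminates. The following sketch (the class name UsingDemo and the returned flag array are made up for illustration) checks the disposed flag before and after a blocking subscription.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class UsingDemo {
    // Returns {disposed before subscribing, disposed after the sequence terminated}.
    static boolean[] run() {
        AtomicBoolean isDisposed = new AtomicBoolean();
        Disposable resource = new Disposable() {
            @Override
            public void dispose() {
                isDisposed.set(true);
            }

            @Override
            public String toString() {
                return "DISPOSABLE";
            }
        };

        Flux<String> flux = Flux.using(
                () -> resource,
                r -> Flux.just(r.toString()),
                Disposable::dispose);

        boolean before = isDisposed.get(); // nothing has subscribed yet
        flux.blockLast();                  // subscribe and wait for completion
        boolean after = isDisposed.get();  // cleanup has run by now
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] flags = run();
        System.out.println("before: " + flags[0] + ", after: " + flags[1]);
    }
}
```

The flag is false after assembling the Flux and true once blockLast() returns, which mirrors how try-with-resources closes the resource when the block exits.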

Retrying

Sometimes we encounter an exception and may need to retry it several times. Reactor provides a retry method. Here is an example:

    public void testRetry() {
        Flux.interval(Duration.ofMillis(250))
                .map(input -> {
                    if (input < 3) {
                        return "tick " + input;
                    }
                    throw new RuntimeException("boom");
                })
                .retry(1)
                .elapsed()
                .subscribe(System.out::println, System.err::println);

        try {
            // Keep the calling thread alive long enough for both attempts to run.
            Thread.sleep(2100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Take a look at the output:

[264,tick 0]
[255,tick 1]
[241,tick 2]
[506,tick 0]
[252,tick 1]
[253,tick 2]
java.lang.RuntimeException: boom

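retry resubscribes to the upstream source when an error occurs, so a new sequence starts from the beginning.

elapsed pairs each value with the time, in milliseconds, since the previous value was emitted (for the first value, since subscription).

As the output shows, the error from the first attempt is not reported. Only when the single retry has been used up does the exception reach the subscriber.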
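Because the interval-based example depends on timing, a deterministic variant may be easier to reason about. This sketch (the class name and the attempt counter are illustrative assumptions) uses Flux.defer so that each resubscription builds a fresh source and can be numbered.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class RetryDemo {
    static List<String> run() {
        AtomicInteger attempts = new AtomicInteger();
        return Flux.defer(() -> {
                    // defer runs this supplier once per subscription, so it counts attempts.
                    int attempt = attempts.incrementAndGet();
                    return Flux.just(1, 2, 0)
                            .map(i -> "attempt " + attempt + ": 100 / " + i + " = " + (100 / i));
                })
                .retry(1)                 // one extra subscription after the first failure
                .onErrorReturn("gave up") // the second failure is no longer retried
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The result holds five entries: two values from attempt 1, the same two values from attempt 2, then "gave up". Values emitted before the failure are replayed on the retry, which is worth keeping in mind when the source has side effects.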

The examples for this article are in learn-Reactive.

Author: Flydean program stuff

Link to this article: www.flydean.com/reactor-han…

Source: Flydean’s blog
