Introduction to the

Today we are going to introduce the multithreading model and the timer model in Reactor. Reactor has been introduced before. It is actually an extension of the observer model.

So the Reactor is essentially multithreaded independent. You can use it with multithreading or not with multithreading.

Today I’ll show you how to use the multithreading and timer model in Reactor.

Thread multithreading

Let’s take a look at the previous example of Flux creation:

        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + "=" + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);
Copy the code

As you can see, both Flux Generator and subscriber are actually running in the same thread.

If we want the subscribe to occur in a new thread, we need to start a new thread and subscribe inside the thread.

        Mono<String> mono = Mono.just("hello ");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();
Copy the code

In the example above, Mono is created in the main Thread, while SUBSCRIBE occurs in the newly started Thread.

Schedule the timer

In many cases, our Publisher needs to periodically call methods to produce elements. Reactor provides a new Schedule class that is responsible for the generation and management of scheduled tasks.

Scheduler is an interface:

public interface Scheduler extends Disposable 
Copy the code

It defines some methods that must be implemented in timers:

For example, immediately executed:

Disposable schedule(Runnable task);
Copy the code

Delayed execution:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)
Copy the code

And regularly performed:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Copy the code

Schedule has a utility class called Schedules, which provides multiple methods to create Scheduler, essentially encapsulating the ExecutorService and ScheduledExecutorService. Create Schedule as Supplier.

In a nutshell, Schedule is an encapsulation of the ExecutorService.

Schedulers tools

The Schedulers utility class provides a number of useful utility classes, which we explore in detail:

Schedulers. Immediate () :

The submitted Runnable will be executed immediately in the current thread.

Schedulers. Single () :

Use the same thread to perform all tasks.

Schedulers. BoundedElastic () :

Create a reusable thread pool that will be reclaimed if the threads in the pool are not used for a long time. BoundedElastic has a maximum number of threads, typically CPU cores x 10. If there are no worker threads currently available, the submitted task will be queued to wait.

Schedulers. The parallel () :

Create a fixed number of worker threads that correspond to the number of CPU cores.

Schedulers. FromExecutorService (ExecutorService) :

Create a Scheduler from an existing thread pool.

Schedulers. NewXXX:

Schedulers provides a number of new starting methods to create various Schedulers.

Looking at a specific use of Schedulers, we can specify specific Schedulers to produce elements:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Copy the code

PublishOn and subscribeOn

PublishOn and subscribeOn are mainly used to switch the execution context of Scheduler.

One conclusion is that in a chain call, publishOn can switch Scheduler, but subscribeOn does not.

This is because the true publish-subscribe relationship is established only when subscriber starts to subscribe.

Let’s take a look at the use of these two methods:

publishOn

PublishOn can publish during a chain call:

    @Test
    public void usePublishOn(a) throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler".4);
        final Flux<String> flux = Flux
                .range(1.2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }
Copy the code

Above we created a scheduler named parallel-Scheduler.

A Flux is then created, which performs a map operation, switches the execution context to parallel-Scheduler, and finally performs a map operation right.

Finally, we take a new thread to subscribe.

Take a look at the output:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1.5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1.5,main]
Copy the code

As you can see, the name of the main Thread is Thread. The Subscriber thread name is ThreadA.

So before publishOn, the thread used by map is ThreadA. After publishOn, the thread used by map switches to the parallel-Scheduler thread pool.

subscribeOn

SubscribeOn is the execution context used to switch Subscriber. No matter which part subscribeOn appears in the call chain, it will eventually be applied to the whole call chain.

Let’s look at an example:

    @Test
    public void useSubscribeOn(a) throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler".4);
        final Flux<String> flux = Flux
                .range(1.2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }
Copy the code

Similarly, in the above example, we use two maps and then use a subscribeOn in both maps to switch the subscribe execution context.

Take a look at the output:

value 11:Thread[parallel-scheduler-1.5,main]:Thread[parallel-scheduler-1.5,main]
value 12:Thread[parallel-scheduler-1.5,main]:Thread[parallel-scheduler-1.5,main]
Copy the code

As you can see, no matter which map is used, the parallel Scheduler is switched.

The example of this article is learn-Reactive

Author: Flydean program stuff

Link to this article: www.flydean.com/reactor-thr…

Source: Flydean’s blog

Welcome to pay attention to my public number: “procedures those things” the most popular interpretation, the most profound dry goods, the most concise tutorial, many you do not know the small skills you find!