sequence

This paper mainly studies the Schedulers of Reactive Streams

background

By default, both Mono and Flux run on the main thread, and sometimes block the main thread. Schedulers can be programmed to run on other threads.

Raw output

The output is as follows when publishOn and subscribeOn are not used

11:26:10. 668. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 11:26:11, 097 [main] INFO com. Example. The demo. SchedulerTest - defer thread: [the main] 11:26:11. [the main] INFO 116 com. Example. Demo. SchedulerTest - filter thread: [the main] 11:26:11. [the main] 116 INFO Com. Example. Demo. SchedulerTest - filter thread: [the main] 11:26:11. The 116 [main] INFO com. Example. Demo. SchedulerTest - The subscribe thread: [the main], data: 2 11:26:11. 116 [main] INFO com. Example. Demo. SchedulerTest - filter thread: [the main] 11:26:11. [the main] INFO 116 com. Example. Demo. SchedulerTest - filter thread: [the main] 11:26:11. [the main] 117 INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :4Copy the code

publishOn(Configure a thread for subscriber)

	@Test
    public void testPublisherThread(){
        Scheduler pubScheduler = Schedulers.newSingle("pub-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }) .filter(e -> { LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(pubScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
    }
Copy the code

The output

11:31:23. 691. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 11:31:23, 871 [main] INFO com. Example. The demo. SchedulerTest - defer thread: [the main] 11:31:23. [the main] INFO 880 com. Example. Demo. SchedulerTest - filter thread: [the main] 11:31:23. [the main] 881 INFO Com. Example. Demo. SchedulerTest - filter thread: [the main] 11:31:23. The 881 [main] INFO com. Example. Demo. SchedulerTest - filter Thread: [the main] 11:31:23 881 [main] INFO com. Example. The demo. SchedulerTest - filter thread: [the main] 11:31:23. 881 INFO com [publisher - thread - 1]. The example. The demo. SchedulerTest - subscribe thread: [publisher - thread - 1), data: 2 11:31:23. 881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4Copy the code

As you can see, the publishOn configuration changes the subscribe thread

subscribeOn(Configure threads for Publisher)

	@Test
    public void testSubscriberThread() throws InterruptedException {
        Scheduler subScheduler = Schedulers.newSingle("sub-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }) .filter(e -> { LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .subscribeOn(subScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }
Copy the code

The output is as follows:

11:31:58. 294. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory[the subscriber - Using Slf4j logging framework 11:31:58. 528 - thread - 1] INFO com. Example. Demo. SchedulerTest - defer Thread: [the subscriber - thread - 1] 11:31:58. 532 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:31:58. 532 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:31:58. 532 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - subscribe Thread: [the subscriber - thread - 1), data: 2 11:31:58. 533 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:31:58. 533 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:31:58. 533 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - subscribe thread:[subscriber-thread-1],data :4Copy the code

It can be found that after subscribeOn is configured, all items are run in this thread, including defer, filter and subscribe

PublishOn and subscribeOn

	@Test
    public void testPublisherAndSubscriberThread() throws InterruptedException {
        Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
        Scheduler subScheduler = Schedulers.newSingle("subscriber-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }) .filter(e -> { LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(pubScheduler)
                .subscribeOn(subScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }
Copy the code

The output

11:33:00. 964. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory[the subscriber - Using Slf4j logging framework 11:33:01. 125 - thread - 1] INFO com. Example. Demo. SchedulerTest - defer Thread: [the subscriber - thread - 1] 11:33:01. 134 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:33:01. 135 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:33:01. 135 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter Thread: [the subscriber - thread - 1] 11:33:01. 135 INFO com [subscriber - thread - 1]. The example. The demo. SchedulerTest - filter 11:33:01 thread: [the subscriber - thread - 1]. The 135 [publisher - thread - 2] INFO com. Example. Demo. SchedulerTest - subscribe Thread: [publisher - thread - 2), data: 2 11:33:01. [publisher - thread - 2] 135 INFO com. Example. Demo. SchedulerTest - subscribe thread:[publisher-thread-2],data :4Copy the code

With the call configured, you can see that subscriber is running on the publishOn configuration thread, and defer, filter, and others are running on the subscribeOn configuration thread

PublishOn and filter

	@Test
    public void testFilterThread(){
        Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }).publishon (pubScheduler) //NOTE that this is before filter. Filter (e -> {logger.info ())."filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
    }
Copy the code

The output

13:19:01. 606. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 13:19:01, 754 [main] INFO com. Example. The demo. SchedulerTest - defer thread: [the main] [publisher 13:19:01. 766 - thread - 1] INFO com. Example. Demo. SchedulerTest - filter thread: [publisher - thread - 1] 13:19:01. 766 INFO com [publisher - thread - 1]. The example. The demo. SchedulerTest - filter thread: [publisher - thread - 1] 13:19:01. 766 INFO com [publisher - thread - 1]. The example. The demo. SchedulerTest - subscribe thread: [publisher - thread - 1), data: 2 13:19:01. 766 INFO com [publisher - thread - 1]. The example. The demo. SchedulerTest - filter thread: [publisher - thread - 1] 13:19:01. 766 INFO com [publisher - thread - 1]. The example. The demo. SchedulerTest - filter thread: [publisher - thread - 1] 13:19:01. 767 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4Copy the code

If publishOn is placed before the filter thread, we can see that the filter thread becomes publisher thread. After publishOn, the filter or map thread will use publishOn configuration. Previously, you used the main thread or the subscribeOn configured thread

SubscribeOn and filter

There is no difference between putting subscribeOn before filter, because when publishOn is not configured, subscribeOn applies to all, including filter

window scheduler

You can also set a thread pool for the Window method

	@Test
    public void testWindowScheduler() throws InterruptedException {
        Scheduler windowScheduler = Schedulers.newSingle("window-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }).delayElements(duration.ofmillis (200)) // Subscribe thread.windowTimeout(1, duration.ofmillis (100), windowScheduler) .onErrorReturn(Flux.<Integer>just(-1)) .flatMap(e -> {return e.map(item -> item*10);
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }
Copy the code

The output

14:15:28. 523. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 14:15:28, 701 [main] INFO com. Example. The demo. SchedulerTest - defer thread: [the main] 14:15:28. 961 INFO com [the parallel - 1]. The example. The demo. SchedulerTest - subscribe thread: [] the parallel - 1, data: 10 14:15:29. 167 INFO com [window - thread - 1]. The example. The demo. SchedulerTest - subscribe thread: window - thread - [1], the data: 20 14:15:29. 370 INFO com [window - thread - 1]. The example. The demo. SchedulerTest - subscribe thread: window - thread - [1], the data: 30 14:15:29. 573 [parallel-4] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-4],data :40Copy the code

Note that the delayElements method creates a parallel thread timeout() for subscriber by default, and the skip() and other methods also create threads by default

scheduleGroup

Schedulers. NewSingle is used previously in publishOn and subscribeOn. Groups composed of multiple threads can also be used, for example

Scheduler parallelGroup = Schedulers.newParallel("parallel-group"And 8);Copy the code

You can also use the elastic type, which is more suitable for IO type operations

	/**
	 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
	 * the thread pools, reusing them once the Workers have been shut down.
	 * <p>
	 * The maximum number of created thread pools is unbounded.
	 * <p>
	 * The default time-to-live for unused thread pools is 60 seconds, use the appropriate
	 * factory to push a different value.
	 * <p>
	 * This scheduler is not restartable.
	 *
	 * @param name Thread prefix
	 *
	 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
	 * ExecutorService-based workers and is suited for parallel work
	 */
	public static Scheduler newElastic(String name) {
		return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);
	}
Copy the code

The instance

	@Test
    public void testElasticGroup() throws InterruptedException {
        Scheduler elastic = Schedulers.newElastic("elastic-group");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            returnFlux. The range (1, 4); }) .filter(e -> { LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(elastic)
                .map(e -> {
                    LOGGER.info("map thread:[{}]",Thread.currentThread().getName());
                    return e * 10;
                })
                .subscribeOn(elastic)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }
Copy the code

The output

13:58:37. 356. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 13:58:37. 514 [elastic - group - 2] INFO com. Example. Demo. SchedulerTest - defer Thread: [elastic - group - 2] 13:58:37. [elastic - group - 2] 520 INFO com. Example. Demo. SchedulerTest - filter Thread: [elastic - group - 2] 13:58:37. [elastic - group - 2] 520 INFO com. Example. Demo. SchedulerTest - filter Thread: [elastic - group - 2] 13:58:37. [elastic - group - 2] 520 INFO com. Example. Demo. SchedulerTest - filter Thread: [elastic - group - 2] 13:58:37. [elastic - group - 2] 520 INFO com. Example. Demo. SchedulerTest - filter Thread: [elastic - group - 2] 13:58:37, 520 [elastic - group - 3] INFO com. Example. Demo. SchedulerTest - map Thread: [elastic - group - 3] 13:58:37, 520 [elastic - group - 3] INFO com. Example. Demo. SchedulerTest - subscribe Thread: [elastic - group - 3], data: 20 13:58:37, 521 [elastic - group - 3] INFO com. Example. Demo. SchedulerTest - map Thread: [elastic - group - 3] 13:58:37, 521 [elastic - group - 3] INFO com. Example. Demo. SchedulerTest - subscribe thread:[elastic-group-3],data :40Copy the code

summary

  • Naming this publishOn and subscribeOn method name is a little more obscure, and a little more straightforward equivalent to subscriberThreadPools and publisherThreadPools.
  • PublishOn and Operations locations

Filter or map after publishOn will use the publishOn configuration thread; Previously, you used the main thread or the subscribeOn configured thread

  • subscribeOn

If publishOn is not configured and subscribeOn is configured only, all are applied

  • Method built-in threads delayElements(),timeout(),skip() built-in use of additional threads

doc

  • schedulers