Preface
This article looks at the Schedulers that Reactor provides for Reactive Streams.
Background
By default, both Mono and Flux run on the thread that subscribes to them (the main thread in the tests below), so a slow step can block that thread. Schedulers let you move the work onto other threads.
Raw output
The output is as follows when publishOn and subscribeOn are not used:
11:26:10.668 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:26:11.097 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :2
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.117 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :4
publishOn (configures the thread for the subscriber)
@Test
public void testPublisherThread(){
    // Named "publisher-thread" so that it matches the thread names in the output below
    Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .filter(e -> {
            LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
            return e % 2 == 0;
        })
        .publishOn(pubScheduler)
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
}
The output
11:31:23.691 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:23.871 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:31:23.880 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4
As you can see, publishOn changes the thread on which the subscriber runs, while defer and filter still run on the main thread.
subscribeOn (configures the thread for the publisher)
@Test
public void testSubscriberThread() throws InterruptedException {
    // Named "subscriber-thread" so that it matches the thread names in the output below
    Scheduler subScheduler = Schedulers.newSingle("subscriber-thread");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .filter(e -> {
            LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
            return e % 2 == 0;
        })
        .subscribeOn(subScheduler)
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
    // Keep the test alive long enough for the scheduler thread to finish
    Thread.sleep(10*1000);
}
The output is as follows:
11:31:58.294 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:58.528 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :2
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :4
As you can see, once subscribeOn is configured, every step runs on that thread, including defer, filter and subscribe.
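One way to picture why every step moves together: subscribeOn changes the thread on which the subscription starts, and the source then emits from there. The following is a minimal JDK-only sketch of that idea, not Reactor's implementation. The class name SubscribeOnSketch and the trace format are hypothetical, chosen only for illustration. The whole loop (source, filter and consumer) is submitted to a single-thread executor, so every step reports the same thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubscribeOnSketch {

    /** Runs source, filter and consumer on one executor thread and returns a trace of the steps. */
    public static List<String> run() {
        ExecutorService sub = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "subscriber-thread-1"));
        try {
            // Submitting the whole pipeline moves every step to the executor thread.
            return sub.submit(() -> {
                List<String> trace = new ArrayList<>();
                String thread = Thread.currentThread().getName();
                trace.add("defer on " + thread);
                for (int i = 1; i <= 4; i++) {
                    trace.add("filter " + i + " on " + thread);
                    if (i % 2 == 0) {
                        trace.add("subscribe " + i + " on " + thread);
                    }
                }
                return trace;
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sub.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The caller only waits for the result here so that the trace can be returned; in the test above, Thread.sleep plays that role.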
publishOn and subscribeOn
@Test
public void testPublisherAndSubscriberThread() throws InterruptedException {
    Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
    Scheduler subScheduler = Schedulers.newSingle("subscriber-thread");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .filter(e -> {
            LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
            return e % 2 == 0;
        })
        .publishOn(pubScheduler)
        .subscribeOn(subScheduler)
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
    Thread.sleep(10*1000);
}
The output
11:33:00.964 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:33:01.125 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:33:01.134 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :2
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :4
With both configured, the subscriber runs on the thread configured by publishOn, while defer, filter and the other upstream steps run on the thread configured by subscribeOn.
publishOn and filter
@Test
public void testFilterThread(){
    Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .publishOn(pubScheduler) // NOTE: this is placed before filter
        .filter(e -> {
            LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
            return e % 2 == 0;
        })
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
}
The output
13:19:01.606 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:19:01.754 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.767 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4
If publishOn is placed before filter, the filter runs on the publisher thread. In general, operators such as filter or map that come after publishOn run on the thread configured by publishOn; operators before it run on the main thread or on the thread configured by subscribeOn.
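The effect of position can be reproduced with plain JDK executors. This is a minimal sketch under stated assumptions, not Reactor's implementation: the class HopPositionSketch and its helpers hop, filter and where are hypothetical names, and hop stands in for publishOn by forwarding each item to the next stage on a single-thread executor. A flag chooses whether the hop sits before or after the filter.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

public class HopPositionSketch {

    /** Sends items 1..4 through a filter and a subscriber; the flag picks where the hop sits. */
    public static List<String> run(boolean hopBeforeFilter) {
        List<String> trace = new CopyOnWriteArrayList<>();
        Thread caller = Thread.currentThread();
        ExecutorService pub = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "publisher-thread-1"));

        IntConsumer subscriber = item ->
                trace.add("subscribe " + item + " on " + where(caller));
        IntConsumer chain = hopBeforeFilter
                ? hop(pub, filter(trace, caller, subscriber))   // hop, then filter, then subscriber
                : filter(trace, caller, hop(pub, subscriber));  // filter, then hop, then subscriber

        for (int i = 1; i <= 4; i++) {
            chain.accept(i);
        }
        pub.shutdown();
        try {
            pub.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace;
    }

    // Stand-in for publishOn: everything downstream of the hop runs on the executor thread.
    private static IntConsumer hop(ExecutorService executor, IntConsumer next) {
        return item -> executor.execute(() -> next.accept(item));
    }

    // Logs the thread it runs on and forwards even items only.
    private static IntConsumer filter(List<String> trace, Thread caller, IntConsumer next) {
        return item -> {
            trace.add("filter " + item + " on " + where(caller));
            if (item % 2 == 0) {
                next.accept(item);
            }
        };
    }

    private static String where(Thread caller) {
        Thread current = Thread.currentThread();
        return current == caller ? "caller" : current.getName();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With the hop first, every filter and subscribe entry is recorded on publisher-thread-1. With the hop last, the filter entries are recorded on the calling thread and only the subscribe entries move.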
subscribeOn and filter
It makes no difference whether subscribeOn is placed before or after filter: when publishOn is not configured, subscribeOn applies to the whole chain, including filter.
Window scheduler
You can also pass a scheduler to the window methods:
@Test
public void testWindowScheduler() throws InterruptedException {
    Scheduler windowScheduler = Schedulers.newSingle("window-thread");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .delayElements(Duration.ofMillis(200)) // by default, moves the subscriber onto a parallel thread
        .windowTimeout(1, Duration.ofMillis(100), windowScheduler)
        .onErrorReturn(Flux.<Integer>just(-1))
        .flatMap(e -> {
            return e.map(item -> item*10);
        })
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
    Thread.sleep(10*1000);
}
The output
14:15:28.523 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:15:28.701 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
14:15:28.961 [parallel-1] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-1],data :10
14:15:29.167 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :20
14:15:29.370 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :30
14:15:29.573 [parallel-4] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-4],data :40
Note that delayElements by default delivers items to the subscriber on a thread of the parallel scheduler. Time-based methods such as timeout() and skip() also use an additional thread by default.
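A time-based step has to wait somewhere without blocking the caller, which is why delayed items surface on a timer thread rather than on the thread that subscribed. The following JDK-only sketch illustrates the idea and is not Reactor's implementation: the class name DelaySketch, the thread name timer-1 and the per-item delay scheme are assumptions made for the example. Each item is scheduled on a ScheduledExecutorService, and the consumer then runs on that timer thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelaySketch {

    /** Delivers items 1..4 to a consumer, item i after i * delayMillis, and returns the trace. */
    public static List<String> run(long delayMillis) {
        List<String> trace = new CopyOnWriteArrayList<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "timer-1"));
        for (int i = 1; i <= 4; i++) {
            final int item = i;
            // The caller returns immediately; the consumer runs later on the timer thread.
            timer.schedule(
                    () -> trace.add("subscribe " + item + " on " + Thread.currentThread().getName()),
                    i * delayMillis, TimeUnit.MILLISECONDS);
        }
        // With the default policy, delayed tasks that are already scheduled still run after shutdown().
        timer.shutdown();
        try {
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace;
    }

    public static void main(String[] args) {
        run(200).forEach(System.out::println);
    }
}
```

Every entry in the trace names timer-1, in the same way that the delayed items in the log above are reported on parallel threads.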
Scheduler groups
Schedulers.newSingle was used above for publishOn and subscribeOn. Schedulers backed by a group of threads can also be used, for example:
Scheduler parallelGroup = Schedulers.newParallel("parallel-group", 8);
You can also use the elastic type, which is better suited to I/O-bound operations. Note that newElastic is deprecated as of Reactor 3.4 in favor of the bounded elastic scheduler.
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* The default time-to-live for unused thread pools is 60 seconds, use the appropriate
* factory to push a different value.
* <p>
* This scheduler is not restartable.
*
* @param name Thread prefix
*
* @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newElastic(String name) {
return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);
}
Example
@Test
public void testElasticGroup() throws InterruptedException {
    Scheduler elastic = Schedulers.newElastic("elastic-group");
    Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1, 4);
        })
        .filter(e -> {
            LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
            return e % 2 == 0;
        })
        .publishOn(elastic)
        .map(e -> {
            LOGGER.info("map thread:[{}]",Thread.currentThread().getName());
            return e * 10;
        })
        .subscribeOn(elastic)
        .subscribe(e -> {
            LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
        });
    Thread.sleep(10*1000);
}
The output
13:58:37.356 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:58:37.514 [elastic-group-2] INFO com.example.demo.SchedulerTest - defer thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :20
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :40
Summary
- The names publishOn and subscribeOn are somewhat obscure; more direct equivalents would be subscriberThreadPools and publisherThreadPools.
- publishOn and operator position
Operators such as filter or map placed after publishOn run on the thread configured by publishOn; operators before it run on the main thread or on the thread configured by subscribeOn.
- subscribeOn
If only subscribeOn is configured and publishOn is not, it applies to the whole chain.
- Methods with built-in threads
delayElements(), timeout(), skip() and similar time-based methods use additional threads internally.