sequence
This paper mainly studies variable transfer of REACTOR asynchronous thread
The problem of threadlocal
Using ThreadLocal to pass context variables is very convenient in the traditional request/reply synchronization mode, eliminating the need to add common variables to each method parameter, such as the current logged-in user. However, the business method may use async or be executed asynchronously in another thread pool, in which case threadLocal becomes ineffective.
In this case, the solution is to adopt propagation mode, that is, to propagate the variable between synchronous threads and asynchronous threads.
TaskDecorator
Spring, for example, provides TaskDecorators that you can implement to control which variables are propagated. Such as:
class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Right now: Web thread context !
// (Grab the current thread MDC data)
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
try {
// Right now: @Async thread context !
// (Restore the Web thread context's MDC data) MDC.setContextMap(contextMap); runnable.run(); } finally { MDC.clear(); }}; }}Copy the code
Here note the word “clear” in finally
Configure the taskDecorator
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
returnexecutor; }}Copy the code
See Spring 4.3: Using a TaskDecorator to Copy MDC data to @async Threads for a complete example
Reactor Context
Spring5 introduces webflux, which is based on a reactor. How does a reactor propagate context variables? Context objects are officially provided to replace threadLocal.
Its characteristics are as follows:
- Map-like KV operations, such as PUT (Object key, Object value),putAll(Context), hasKey(Object key)
- Immutable, which means that put does not overwrite the same key
- Provide getOrDefault, getOrEmpty methods
- Context is bound to each Subscriber on the scope chain
- Access by subscriberContext(Context)
- The Context is going to be bottom-up
The instance
Setting and reading
@Test
public void testSubscriberContext(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
}
Copy the code
Here you set the message value to World from the bottom subscriberContext and then access it through the flatMap subscriberContext.
From the bottom up
@Test
public void testContextSequence(){
String key = "message";
Mono<String> r = Mono.just("Hello"CTX -> CTX. Put (key,"World"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.getOrDefault(key, "Stranger")));
StepVerifier.create(r)
.expectNext("Hello Stranger")
.verifyComplete();
}
Copy the code
Because the subscriberContext setting for this example is too high, the mono.subscriberContext () in the flatMap is not applied.
immutable
@Test
public void testContextImmutable(){
String key = "message";
Mono<String> r = Mono.subscriberContext()
.map( ctx -> ctx.put(key, "Hello"FlatMap (CTX -> mono.subscriberContext ()).map(CTX -> ctx.getordefault (key,"Default"));
StepVerifier.create(r)
.expectNext("Default")
.verifyComplete();
}
Copy the code
SubscriberContext always returns a new one
Multiple sequential SubscriberContexts
@Test
public void testReadOrder(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor")
.verifyComplete();
}
Copy the code
Operator will only read the context nearest to it
Between flatMap subscriberContext
@Test
public void testContextBetweenFlatMap(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor World")
.verifyComplete();
}
Copy the code
FlatMap reads the nearest context
The subscriberContext flatMap
@Test
public void testContextInFlatMap(){
String key = "message";
Mono<String> r =
Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key))
)
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + "" + ctx.get(key))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
)
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World Reactor")
.verifyComplete();
}
Copy the code
Here, the first flatMap cannot read the context inside the second flatMap
summary
Reactor implements a synchronization threadLocal function by providing a Context, which is very powerful and worth considering.
doc
- TaskDecorator
- Using a TaskDecorator to copy MDC data to @async Threads
- HOW TO PASS CONTEXT IN STANDARD WAY – WITHOUT THREADLOCAL
- Spring Security Context Propagation with @Async
- How do I access the RequestContextHolder in an async thread
- Context Aware Java Executor and Spring’s @Async
- 8.8.1. The Context API