While reactive programming can improve performance, it also has drawbacks, such as increased code complexity and a highly intrusive API (adopting it is comparable to depending on another JDK). I personally believe that reactive programming is not suitable for business development, especially for complex business systems, which may be why it has remained lukewarm since its launch. Of course, this is not meant to talk anyone into giving up before they have started. Reactive programming suits middleware with high performance requirements and low-level systems that are decoupled from business logic, such as gateways and message push services.
- The Reactive Streams specification
- How Reactor implements the Reactive Streams specification
- Demystifying the Context implementation of reactive programming
- The delegate pattern and BaseSubscriber
- Dynamic switching of multiple data sources for spring-data-r2dbc
Reactor fully implements the Reactive Streams specification. Reactive Streams defines the specification for reactive programming, and if you look at its GitHub repository you will find only four interfaces: Publisher, Subscriber, Subscription, and Processor.
Before we look at these interfaces, we need to understand what reactive programming is.
This article reflects only the author’s personal understanding, and its correctness depends on the author’s level. Readers are welcome to leave a comment if anything is unclear.
In traditional blocking programming, an I/O operation such as a remote call blocks the current thread while it waits for the response, and the response is consumed only after it arrives. The thread stays idle for the whole wait. In reactive programming, the current thread does other work in the meantime; once the response arrives, it is published to the subscribers, which continue by consuming it. Of course, making the network request itself is outside Reactor’s responsibility.
In reactive programming, publishers publish data to subscribers, and the subscribers consume it.
A reactive stream is a piece of raw data that passes through multiple operations or transformations and is eventually consumed by a subscriber. Each operation or transformation is itself a publish-subscribe step, and these steps, combined in sequence, form the reactive stream.
The Reactive Streams specification
- Publisher: the publisher;
- Subscriber: the subscriber;
- Subscription: the subscription, used to connect publishers and subscribers;
- Processor: the processor, which is both a subscriber and a publisher.
Because these interfaces are only a specification defined by Reactive Streams, the details are best understood together with a library that implements the specification. We will therefore go through the interfaces briefly and then describe how Reactor implements them.
Publisher
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
- subscribe: called by a Subscriber to subscribe to this publisher.
If you have worked with WebFlux or the Reactor library, you will be familiar with the subscribe method. Even without knowing how it works, you know that only calling the subscribe method of a Mono/Flux triggers execution of the whole stream.
Even if you have never used WebFlux or Reactor, you have probably used the streams provided by Java 8. A Java 8 Stream executes only when it reaches a terminal operation, and the subscribe method of a reactive Publisher is the equivalent of that terminal operation.
Until the Publisher#subscribe method is called, everything is just an execution plan that we define, and the execution plan dictates the execution of the entire stream.
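The analogy with Java 8 streams can be checked with the JDK alone. The following is a minimal sketch, not Reactor code: the class name LazyPlanDemo and its log list are illustrative assumptions. It builds a pipeline, records that nothing has run yet, and only then invokes a terminal operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPlanDemo {
    // Returns the order in which things happened, so the laziness is observable.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Intermediate operations only describe the plan; nothing executes here.
        Stream<String> plan = Stream.of(1, 2)
                .peek(i -> log.add("visit " + i))
                .map(String::valueOf);
        log.add("plan built");
        // The terminal operation plays the role of Publisher#subscribe.
        List<String> result = plan.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "plan built" entry is logged before any "visit" entry, which is the same ordering a Mono shows between assembling operators and calling subscribe.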
Subscriber
public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}
The Reactive Streams specification defines four kinds of events that a subscriber can receive:
- onSubscribe: usually called by the Publisher, when the subscriber subscribes through Publisher#subscribe;
- onNext: usually called by the Subscription, when data is requested through Subscription#request;
- onError: usually called from the onNext method, when an exception raised while consuming data is caught;
- onComplete: usually called by the Subscription, when all data has been consumed normally and no error has terminated the subscription.
Subscription
public interface Subscription {
    void request(long n);
    void cancel();
}
A Subscription is like a scenario class that ties a publisher and a subscriber together.
- request: the subscriber calls this method to request a given amount of data; when data is requested, the subscriber’s onNext method is called to pass the data to it. This method is usually called in the Subscriber’s onSubscribe method;
- cancel: usually called by the subscriber to cancel the subscription; after that, request no longer produces data and no longer triggers the subscriber’s onNext.
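The request and cancel contract described above can be sketched without any library. The sketch below rests on stated assumptions: it uses the JDK’s java.util.concurrent.Flow interfaces (Java 9+), which mirror the four Reactive Streams interfaces, and RangePublisher is a hypothetical synchronous publisher written for this illustration, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestCancelDemo {
    /** Hypothetical publisher that emits 1..count synchronously while demand remains. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int next = 1;
                boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, stopping early once cancelled or exhausted.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        new RangePublisher(5).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("onSubscribe");
                s.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext " + item);
                if (item == 2) {
                    log.add("cancel");
                    subscription.cancel(); // no further onNext and no onComplete
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Although the publisher holds five items and the demand is unbounded, the log stops after the second item because cancel marks the subscription as done.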
How does Reactor implement the Reactive Streams specification
A simple Mono use example is shown below.
import reactor.core.publisher.Mono;

public class MonoStu {
    public static void main(String[] args) {
        // (1)
        Mono.just(1)
            // (2)
            .subscribe(System.out::println);
    }
}
We will take a step-by-step look at this example to see how Reactor implements the Reactive Streams specification.
Reading this article does not require browsing the source code, although following along in the source works better. The Mono source code is in the reactor.core.publisher package of the reactor-core library.
public abstract class Mono<T> implements Publisher<T> {
}
Mono is an abstract class that implements the Publisher interface of the Reactive Streams specification and extends the Publisher’s operations to provide streaming programming (Reactive flow).
Let’s start with the Mono#just static method:
public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
        return onAssembly(new MonoJust<>(data));
    }
}
If you are reading the source code for the first time and do not yet know the Reactor basics, I recommend not bothering with the onAssembly method. This is also a way of learning: grasp the trunk first and worry about the details later. So we will ignore the onAssembly method.
The Mono#just static method with onAssembly ignored:
public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
        return new MonoJust<>(data);
    }
}
The just method returns a MonoJust object, which inherits from Mono. This resembles the builder pattern, where every method call returns this until build is called; the difference is that Reactor’s Mono#just returns a new Mono object.
Following the principle of mastering the trunk first when reading source code, we strip out the other interfaces implemented by MonoJust and look only at the methods it implements from Mono. The simplified source code of MonoJust is as follows.
final class MonoJust<T> extends Mono<T> {
    final T value;
    MonoJust(T value) {
        this.value = value;
    }
    // Focus your attention here
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        actual.onSubscribe(Operators.scalarSubscription(actual, value));
    }
}
Since Mono is also a Publisher and implements the Publisher interface, we focus on the subscribe method implemented by MonoJust.
The parameter of MonoJust#subscribe is a CoreSubscriber, which indicates that Mono itself has already implemented the subscribe method of the Publisher interface.
public abstract class Mono<T> implements Publisher<T> {
    @Override
    public final void subscribe(Subscriber<? super T> actual) {
        // Originally onLastAssembly(this);
        // we ignore onLastAssembly and use this
        this.subscribe(Operators.toCoreSubscriber(actual));
    }
    // Implemented by the subclass MonoJust
    public abstract void subscribe(CoreSubscriber<? super T> actual);
}
Now we just know that CoreSubscriber should be a subclass of Subscriber, and that’s enough for now.
Let’s first look at subscribe(System.out::println), and then go back to the MonoJust#subscribe method.
Mono provides multiple overloads of the subscribe method. Whichever overload we use, we end up calling the subscribe method that Mono implements from the Publisher interface, which in turn calls the subscribe method implemented by the Mono subclass.
In this example, we call subscribe and pass in a lambda that implements the accept method of the Consumer interface; it is ultimately wrapped in a LambdaMonoSubscriber, as shown below.
public abstract class Mono<T> implements Publisher<T> {
    public final Disposable subscribe(Consumer<? super T> consumer) {
        // Create a LambdaMonoSubscriber
        return subscribeWith(new LambdaMonoSubscriber<>(consumer, null, null, null));
    }
}
Focus on the Consumer, new LambdaMonoSubscriber(), and the subscribeWith method. The logic is a bit convoluted, so stay focused or it will be hard to follow.
The subscribeWith method source code is as follows:
public abstract class Mono<T> implements Publisher<T> {
    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
        // Call the subscribe method that Mono implements from the Publisher interface
        subscribe(subscriber);
        // For now, don't worry about the returned Disposable, which is used to unsubscribe
        return subscriber;
    }
}
subscribeWith calls the subscribe method that Mono implements from the Publisher interface, so LambdaMonoSubscriber must be a Subscriber.
At this point, we should be concerned about how the publisher Mono passes data to the LambdaMonoSubscriber, for which we need to go back to the MonoJust#subscribe method.
final class MonoJust<T> extends Mono<T> {
    final T value;
    MonoJust(T value) {
        this.value = value;
    }
    // actual is the LambdaMonoSubscriber
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // Create a Subscription
        Subscription subscription = Operators.scalarSubscription(actual, value);
        actual.onSubscribe(subscription);
    }
}
Here the subscribe parameter actual is the LambdaMonoSubscriber object, and the publisher MonoJust directly calls the subscriber’s onSubscribe method in its subscribe method.
In the Reactive Streams specification, the Subscriber’s onSubscribe method requires a Subscription, so the MonoJust#subscribe method wraps the real subscriber (LambdaMonoSubscriber) and the data value into a Subscription.
LambdaMonoSubscriber calls Subscription#request in its onSubscribe method to request data.
The source code of LambdaMonoSubscriber is as follows. To keep it easy to read, we ignore the null arguments passed to the LambdaMonoSubscriber constructor.
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    // The consumer passed in by the example: System.out::println
    final Consumer<? super T> consumer;
    // Don't worry about why this is volatile for now
    volatile Subscription subscription;
    LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer) {
        this.consumer = consumer;
    }
    // s is the return value of Operators.scalarSubscription(actual, value) in MonoJust
    @Override
    public final void onSubscribe(Subscription s) {
        this.subscription = s;
        // Request data
        s.request(Long.MAX_VALUE);
    }
}
Since we are working with Mono, the onSubscribe method requests all data with s.request(Long.MAX_VALUE).
Before analyzing the request method, we need to know that the Subscription returned by Operators.scalarSubscription is a ScalarSubscription (a synchronous Subscription implementation).
static final class ScalarSubscription<T> {
    // Here this is the LambdaMonoSubscriber
    final CoreSubscriber<? super T> actual;
    // The data
    final T value;
    ScalarSubscription(CoreSubscriber<? super T> actual, T value) {
        this.value = Objects.requireNonNull(value, "value");
        this.actual = Objects.requireNonNull(actual, "actual");
    }
    @Override
    public void request(long n) {
        Subscriber<? super T> a = actual;
        a.onNext(value);
        a.onComplete();
    }
}
For simplicity, I’ve stripped the ScalarSubscription code down a lot to make it easier to understand.
Both actual and data value are passed in the constructor. In this case, actual is LambdaMonoSubscriber, and value is 1 passed in by the Mono#just call.
ScalarSubscription#request is called when LambdaMonoSubscriber#onSubscribe is called. In the request method, the onNext method of actual (LambdaMonoSubscriber) is called directly to pass the data value, and after onNext returns, the onComplete method of actual (LambdaMonoSubscriber) is called.
Here the actual subscriber is LambdaMonoSubscriber, and the source code of LambdaMonoSubscriber#onNext method after pruning by the author is as follows:
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    @Override
    public final void onNext(T x) {
        if (consumer != null) {
            try {
                // The lambda from the example: System.out::println
                consumer.accept(x);
            } catch (Throwable t) {
                Operators.onErrorDropped(t, Context.empty());
            }
        }
    }
}
The consumer called in onNext is the lambda expression we passed, which prints out the subscribed data.
No asynchronous behavior shows up in this example because we looked at the simplest scenario, which does not require it.
The execution process of this case is summarized as follows:
- 1. Call Mono#just to create a MonoJust publisher; the value passed in is the data the publisher will publish;
- 2. Call MonoJust#subscribe(Consumer lambda), passing a consumer for the data; the consumer is wrapped as a subscriber, LambdaMonoSubscriber;
- 3. MonoJust#subscribe(CoreSubscriber): this method calls Operators.scalarSubscription(actual, value) to create a ScalarSubscription and calls the onSubscribe method of LambdaMonoSubscriber;
- 4. LambdaMonoSubscriber#onSubscribe: this method calls ScalarSubscription#request to request data;
- 5. ScalarSubscription#request: calls the onNext method of the real subscriber LambdaMonoSubscriber to pass the data, and calls the onComplete method after onNext returns;
- 6. LambdaMonoSubscriber#onNext: calls the lambda expression from the example to consume the data.
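The six steps above can be replayed with the JDK only. This is a simplified sketch under stated assumptions, not Reactor’s source: it uses java.util.concurrent.Flow (Java 9+) in place of the Reactive Streams interfaces, and Just, ScalarSubscription and LambdaSubscriber are hypothetical stand-ins for MonoJust, Operators.ScalarSubscription and LambdaMonoSubscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class JustFlowDemo {
    /** Stand-in for MonoJust: a publisher holding a single value. */
    static final class Just<T> implements Flow.Publisher<T> {
        final T value;

        Just(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> actual) {
            // Step 3: wrap subscriber and value in a subscription, then hand it over.
            actual.onSubscribe(new ScalarSubscription<T>(actual, value));
        }
    }

    /** Stand-in for ScalarSubscription: emits the value on the first request. */
    static final class ScalarSubscription<T> implements Flow.Subscription {
        final Flow.Subscriber<? super T> actual;
        final T value;
        boolean done;

        ScalarSubscription(Flow.Subscriber<? super T> actual, T value) {
            this.actual = actual;
            this.value = value;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            done = true;
            actual.onNext(value); // Step 5: pass the data...
            actual.onComplete();  // ...then signal completion.
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Stand-in for LambdaMonoSubscriber: adapts a Consumer to a Subscriber. */
    static final class LambdaSubscriber<T> implements Flow.Subscriber<T> {
        final Consumer<? super T> consumer;

        LambdaSubscriber(Consumer<? super T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // Step 4: request data
        }

        @Override
        public void onNext(T item) {
            consumer.accept(item); // Step 6: run the lambda
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        // Steps 1 and 2: create the publisher, then subscribe with a wrapped consumer.
        new Just<Integer>(1).subscribe(new LambdaSubscriber<Integer>(received::add));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Everything happens on the calling thread inside the subscribe call, which matches the synchronous behavior of the Mono.just example.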
Now let’s enrich the case:
public class MonoStu {
    public static void main(String[] args) {
        // (1)
        Mono.just(1)
            // (2)
            .map(String::valueOf)
            // (3)
            .subscribe(System.out::println);
    }
}
We added the map(String::valueOf) operation, which plans to convert the data emitted by MonoJust to a String and then pass the converted data to the real subscriber.
We already know that Mono.just returns a MonoJust, so what does map return?
Mono#map source code:
public abstract class Mono<T> implements Publisher<T> {
    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
        return new MonoMap<>(this, mapper);
    }
}
As you can see, map returns a MonoMap:
final class MonoMap<T, R> extends MonoOperator<T, R> {
    final Function<? super T, ? extends R> mapper;
    MonoMap(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
        super(source);
        this.mapper = mapper;
    }
    @Override
    public void subscribe(CoreSubscriber<? super R> actual) {
        source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
    }
}
There is a new abstract class, MonoOperator, which we will use a lot in the future, such as R2DBC dynamic data source switching and Sentinel’s support for WebFlux.
MonoOperator converts one Mono (source) into a new Mono (in this case MonoMap). When MonoMap’s subscribe method is called, it calls the subscribe method of source, thereby chaining the two publishers together.
Since the logic is rather convoluted, let’s sort it out first:
- 1. Sequential operation
We create MonoJust first and then MonoMap, so the subscribe method we call is MonoMap’s.
- 2. Subscribe in reverse order
After (1), MonoMap’s subscribe method is executed, and MonoMap calls MonoJust’s subscribe.
// source is MonoJust
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
- 3. Consume data sequentially
After (2), MonoJust’s subscribe method is called, so the onSubscribe method of the subscriber created by MonoMap is called first and then its onNext method, and each call is passed on to the real subscriber.
So, in this example, the argument passed to MonoMap’s subscribe method is the real subscriber (wrapping System.out::println), and source is MonoJust. When MonoMap#subscribe is called, it first calls MonoJust’s subscribe method, wrapping the real subscriber in a FluxMap.MapSubscriber and passing that to MonoJust, so that MonoJust treats the FluxMap.MapSubscriber as the real subscriber. When the onSubscribe method of the FluxMap.MapSubscriber is called, it calls the onSubscribe method of the real subscriber. This is the delegate design pattern.
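The delegation between the operator and its source can be sketched in plain Java. The code below is an illustration under assumptions, not Reactor’s implementation: it uses java.util.concurrent.Flow (Java 9+), and the just and map helpers and the MapSubscriber class are hypothetical simplifications of MonoJust, MonoMap and FluxMap.MapSubscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class MapDelegateDemo {
    /** Source publisher emitting one value, in the spirit of MonoJust. */
    static <T> Flow.Publisher<T> just(T value) {
        return actual -> actual.onSubscribe(new Flow.Subscription() {
            boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                actual.onNext(value);
                actual.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Operator in the spirit of MonoMap: subscribing to it subscribes to its source. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source,
                                        Function<? super T, ? extends R> mapper) {
        return actual -> source.subscribe(new MapSubscriber<T, R>(actual, mapper));
    }

    /** Delegate that wraps the real subscriber and transforms each item on the way down. */
    static final class MapSubscriber<T, R> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;

        MapSubscriber(Flow.Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            actual.onNext(mapper.apply(item));
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Integer> source = just(1);
        Flow.Publisher<String> mapped = map(source, i -> "value-" + i);
        mapped.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                log.add("onNext " + item);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The source never sees the real subscriber; it only talks to the MapSubscriber, which forwards each signal after applying the mapper.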
At this point, we are done with the more complex examples. In fact, many operations are implemented through MonoOperator, which is why Context passing is possible, so we will look at the implementation of the Context.
Demystify the Context implementation of reactive programming
As summarized in the previous section, multiple operations (publish-subscribe steps) combine into a stream that follows three rules: operations are assembled in order, subscription happens in reverse order, and data is consumed in order. Now imagine passing a Context through such a stream.
- Reverse-order subscription: suppose a Mono somewhere in the middle of the stream creates a Context and passes it upward through the subscribe method;
- Sequential consumption: the Monos before the Mono that created the Context can use this Context, while the Monos after it cannot get it.
A simple example of using Context:
public class ContextUseMain {
    private static void testMono() {
        // MonoMap
        Mono<Integer> mono = Mono.just(1)
                .subscriberContext(context -> {
                    // Gets the value of xxxx
                    System.out.println(context.get("xxxx").toString());
                    return context;
                })
                .map(x -> x * x);
        // MonoSubscriberContext
        mono = mono.subscriberContext(context -> context.put("xxxx", System.currentTimeMillis()));
        // MonoMap
        mono = mono.map(x -> (x + 1) * 2)
                .subscriberContext(context -> {
                    // Fails: xxxx is not visible here, so Context#get throws NoSuchElementException
                    System.out.println(context.get("xxxx").toString());
                    return context;
                });
        // Start subscribing
        mono.subscribe(System.out::println);
    }
}
The source code of Mono#subscriberContext is as follows (in Reactor 3.4 and later this method is deprecated in favor of contextWrite):
public abstract class Mono<T> implements Publisher<T> {
    public final Mono<T> subscriberContext(Function<Context, Context> doOnContext) {
        return new MonoSubscriberContext<>(this, doOnContext);
    }
}
The MonoSubscriberContext class extends MonoOperator; its source code is as follows:
final class MonoSubscriberContext<T> extends MonoOperator<T, T> implements Fuseable {
    final Function<Context, Context> doOnContext;
    MonoSubscriberContext(Mono<? extends T> source,
            Function<Context, Context> doOnContext) {
        super(source);
        this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
    }
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // (1) Get the Context of the subscriber passed in (actual)
        Context c = actual.currentContext();
        try {
            // (2) Apply the function; putting a key-value pair returns a new Context
            c = doOnContext.apply(c);
        }
        catch (Throwable t) {
            Operators.error(actual, Operators.onOperatorError(t, actual.currentContext()));
            return;
        }
        // (3) ContextStartSubscriber is both a Subscription and a Subscriber
        FluxContextStart.ContextStartSubscriber<T> subscriber = new FluxContextStart.ContextStartSubscriber<>(actual, c);
        // Call the subscribe method of the previous Mono
        source.subscribe(subscriber);
    }
}
- (1): call the subscriber’s currentContext method to get the subscriber’s Context; if none exists, Context.empty() (an empty Context) is returned;
CoreSubscriber provides the API to get the Context:
public interface CoreSubscriber<T> extends Subscriber<T> {
    default Context currentContext() {
        return Context.empty();
    }
}
- (2): call the Function to get a new Context; putting a key-value pair into the current Context creates a new Context;
- (3): FluxContextStart.ContextStartSubscriber is both a Subscription and a Subscriber. The Context is passed in through its constructor, so by overriding the currentContext method, ContextStartSubscriber makes that Context available to whoever calls it.
The (trimmed) source code of FluxContextStart.ContextStartSubscriber is as follows:
static final class ContextStartSubscriber<T> implements ConditionalSubscriber<T>, InnerOperator<T, T>, QueueSubscription<T> {
    final CoreSubscriber<? super T> actual;
    final Context context;
    ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
        this.actual = actual;
        this.context = context;
    }
    @Override
    public Context currentContext() {
        return this.context;
    }
}
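To see why only the operators upstream of the writing step observe the key, the propagation can be replayed with a small JDK-only model. Everything here is an assumption made for illustration: CtxSubscriber and CtxPublisher are hypothetical interfaces standing in for CoreSubscriber and Mono, a plain Map stands in for Reactor’s Context, and probe and write are invented helper operators.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;

public class ContextFlowDemo {
    /** Subscriber that can expose a context, in the spirit of CoreSubscriber. */
    interface CtxSubscriber<T> extends Flow.Subscriber<T> {
        default Map<String, Object> currentContext() {
            return Map.of();
        }
    }

    /** Publisher that only accepts context-aware subscribers. */
    interface CtxPublisher<T> {
        void subscribe(CtxSubscriber<? super T> actual);
    }

    static <T> CtxPublisher<T> just(T value) {
        return actual -> actual.onSubscribe(new Flow.Subscription() {
            boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                actual.onNext(value);
                actual.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Logs, at subscribe time, whether the key is visible from this position. */
    static <T> CtxPublisher<T> probe(CtxPublisher<T> source, String label, List<String> log) {
        return actual -> {
            log.add(label + " sees xxxx: " + actual.currentContext().containsKey("xxxx"));
            source.subscribe(actual);
        };
    }

    /** Adds a key at subscribe time and exposes the enlarged context upstream only. */
    static <T> CtxPublisher<T> write(CtxPublisher<T> source, String key, Object value) {
        return actual -> {
            Map<String, Object> enlarged = new HashMap<>(actual.currentContext());
            enlarged.put(key, value);
            source.subscribe(new CtxSubscriber<T>() {
                @Override public Map<String, Object> currentContext() { return enlarged; }
                @Override public void onSubscribe(Flow.Subscription s) { actual.onSubscribe(s); }
                @Override public void onNext(T item) { actual.onNext(item); }
                @Override public void onError(Throwable t) { actual.onError(t); }
                @Override public void onComplete() { actual.onComplete(); }
            });
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CtxPublisher<Integer> source = just(1);
        CtxPublisher<Integer> upstream = probe(source, "upstream", log);
        CtxPublisher<Integer> written = write(upstream, "xxxx", 42L);
        CtxPublisher<Integer> downstream = probe(written, "downstream", log);
        downstream.subscribe(new CtxSubscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { log.add("onNext " + item); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because subscription runs from the last operator back to the source, the downstream probe is consulted before the key is written and the upstream probe after it.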
Context.empty() returns Context0:
public interface Context {
    static Context empty() {
        return Context0.INSTANCE;
    }
}
Context0’s put method:
final class Context0 implements Context {
    @Override
    public Context put(Object key, Object value) {
        return new Context1(key, value);
    }
}
As you can see, Context0#put creates Context1.
Context1:
class Context1 implements Context, Map.Entry<Object, Object> {
    final Object key;
    final Object value;
    Context1(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}
If I continue to call Context1#put, I’m creating Context2.
Context2:
final class Context2 implements Context {
    final Object key1;
    final Object value1;
    final Object key2;
    final Object value2;
    Context2(Object key1, Object value1, Object key2, Object value2) {
        this.key1 = key1;
        this.value1 = value1;
        this.key2 = key2;
        this.value2 = value2;
    }
}
Calling Context2#put creates Context3, and so on.
Each put returns a new Context. The purpose is to avoid locking across threads and to isolate data: subsequent operations (Mono) cannot retrieve the data put by previous operations (Mono).
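The copy-on-put idea can be shown in a few lines. This is a hedged sketch, not Reactor’s Context: SimpleContext is a hypothetical class backed by an unmodifiable map, whereas Reactor uses the specialized Context0, Context1, Context2 classes shown above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableContextDemo {
    /** Hypothetical copy-on-put context: instances never change after creation. */
    static final class SimpleContext {
        private final Map<String, Object> entries;

        private SimpleContext(Map<String, Object> entries) {
            this.entries = entries;
        }

        static SimpleContext empty() {
            return new SimpleContext(Collections.emptyMap());
        }

        /** Returns a new context containing the extra entry; this instance is untouched. */
        SimpleContext put(String key, Object value) {
            Map<String, Object> copy = new HashMap<>(entries);
            copy.put(key, value);
            return new SimpleContext(Collections.unmodifiableMap(copy));
        }

        boolean hasKey(String key) {
            return entries.containsKey(key);
        }
    }

    /** Returns {visible in the old context, visible in the new context}. */
    static boolean[] run() {
        SimpleContext before = SimpleContext.empty();
        SimpleContext after = before.put("xxxx", 1L);
        return new boolean[] { before.hasKey("xxxx"), after.hasKey("xxxx") };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("before: " + r[0] + ", after: " + r[1]);
    }
}
```

Since no instance is ever mutated, readers holding the old context need no lock and never observe keys added later.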
The delegate pattern and BaseSubscriber
Before introducing BaseSubscriber, let’s learn a new API: transform. The transform method of Mono/Flux lets us turn a Mono/Flux into a MonoOperator/FluxOperator, so that subscription is delegated to that MonoOperator/FluxOperator.
Here’s a simple example:
public class BaseSubscriberUseMain {
    public static void main(String[] args) {
        Mono<?> mono = createMono();
        // transform turns the original mono into a new mono
        mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) {
                source.subscribe(actual);
            }
        });
        mono.subscribe();
    }
}
This lets us replace the subscriber passed as the argument of MonoOperator’s subscribe method with a subscriber we implement ourselves, as follows:
public class BaseSubscriberUseMain {
    public static void main(String[] args) {
        Mono<?> mono = createMono();
        // transform turns the original mono into a new mono
        mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) {
                source.subscribe(new ActualSubscriberDelegater(actual));
            }
        });
        mono.subscribe();
    }
}
ActualSubscriberDelegater extends BaseSubscriber and implements its hook methods:
public class ActualSubscriberDelegater<T> extends BaseSubscriber<T> {
    /**
     * The real subscriber
     */
    private CoreSubscriber<? super T> actual;
    public ActualSubscriberDelegater(CoreSubscriber<? super T> actual) {
        super();
        this.actual = actual;
    }
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        actual.onSubscribe(subscription);
    }
    @Override
    protected void hookOnNext(T value) {
        // Get the value from the subscriber's Context
        Long time = (Long) actual.currentContext().get("xxxx");
        actual.onNext(value);
    }
    @Override
    protected void hookOnComplete() {
        actual.onComplete();
    }
    @Override
    protected void hookOnError(Throwable throwable) {
        actual.onError(throwable);
    }
}
ActualSubscriberDelegater appears to proxy all the behavior of the real subscriber, but this is the delegate pattern rather than the proxy pattern.
- hookOnSubscribe: called when the parent class’s onSubscribe method is called;
- hookOnNext: called when the parent class’s onNext method is called;
- hookOnComplete: called when the parent class’s onComplete method is called;
- hookOnError: called when the parent class’s onError method is called.
As a result, ActualSubscriberDelegater can be used for tasks such as debugging and log printing.
Alibaba Sentinel also combines the transform API with BaseSubscriber to adapt to the reactive Reactor library. We can also use this approach to measure the execution time of an interface.
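As an illustration of the timing idea, here is a minimal JDK-only sketch. It is not how Sentinel or Reactor implement it: it relies on java.util.concurrent.Flow (Java 9+), and TimingSubscriber, timed and just are hypothetical names. With Reactor, the same delegate would extend BaseSubscriber and be installed through transform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

public class TimingDelegateDemo {
    /** Delegate that forwards every signal and reports the time from onSubscribe to termination. */
    static final class TimingSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> actual;
        final LongConsumer onElapsedNanos;
        long startNanos;

        TimingSubscriber(Flow.Subscriber<? super T> actual, LongConsumer onElapsedNanos) {
            this.actual = actual;
            this.onElapsedNanos = onElapsedNanos;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            startNanos = System.nanoTime();
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            actual.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            onElapsedNanos.accept(System.nanoTime() - startNanos);
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            onElapsedNanos.accept(System.nanoTime() - startNanos);
            actual.onComplete();
        }
    }

    /** Tiny synchronous source so the sketch runs without Reactor. */
    static <T> Flow.Publisher<T> just(T value) {
        return actual -> actual.onSubscribe(new Flow.Subscription() {
            boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                actual.onNext(value);
                actual.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Plays the role of transform: the returned publisher subscribes with the delegate. */
    static <T> Flow.Publisher<T> timed(Flow.Publisher<T> source, LongConsumer onElapsedNanos) {
        return actual -> source.subscribe(new TimingSubscriber<T>(actual, onElapsedNanos));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Integer> source = just(1);
        timed(source, nanos -> log.add(nanos >= 0 ? "timed" : "clock went backwards"))
                .subscribe(new Flow.Subscriber<Integer>() {
                    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                    @Override public void onNext(Integer item) { log.add("onNext " + item); }
                    @Override public void onError(Throwable t) { log.add("onError"); }
                    @Override public void onComplete() { log.add("onComplete"); }
                });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The elapsed time is reported just before the terminal signal is forwarded, so the real subscriber’s behavior is unchanged.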
Dynamic switching of multiple data sources for spring-data-r2dbc
hotkit-r2dbc is the author’s personal open source project, which wraps spring-data-r2dbc with dynamic switching between multiple data sources.
GitHub link: https://github.com/wujiuye/hotkit-r2dbc
The way hotkit-r2dbc implements dynamic data source switching is very simple; you can implement dynamic data source switching for spring-data-r2dbc yourself without using hotkit-r2dbc.
spring-data-r2dbc provides a connection factory routing class, AbstractRoutingConnectionFactory. We only need to extend AbstractRoutingConnectionFactory and implement its determineCurrentLookupKey method, returning the correct data source key when the method is called.
/**
 * ConnectionFactory routing
 *
 * @author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
    private final static String DB_KEY = "HOTKIT-R2DBC-DB";
    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {
        // ...
        setTargetConnectionFactories(connectionFactoryMap);
        // ....
    }
    @Override
    protected Mono<Object> determineCurrentLookupKey() {
        return Mono.subscriberContext().handle((context, sink) -> {
            if (context.hasKey(DB_KEY)) {
                sink.next(context.get(DB_KEY));
            }
        });
    }
}
Of course, this alone is not enough: the aspect needs a method that gets the Context and writes the data source into it, so the complete connection factory router looks like this:
/**
 * ConnectionFactory routing
 *
 * @author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
    private final static String DB_KEY = "HOTKIT-R2DBC-DB";
    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {
        // ...
        setTargetConnectionFactories(connectionFactoryMap);
        // ....
    }
    // Write the data source into the Context of a Mono
    public static <T> Mono<T> putDataSource(Mono<T> mono, String dataSource) {
        return mono.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }
    // Write the data source into the Context of a Flux
    public static <T> Flux<T> putDataSource(Flux<T> flux, String dataSource) {
        return flux.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }
    @Override
    protected Mono<Object> determineCurrentLookupKey() {
        return Mono.subscriberContext().handle((context, sink) -> {
            if (context.hasKey(DB_KEY)) {
                sink.next(context.get(DB_KEY));
            }
        });
    }
}
The implementation of the aspect class is as follows:
@Component
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DynamicDataSourceAop {
    @Pointcut(value = "@annotation(com.wujiuye.hotkit.r2dbc.annotation.R2dbcDataBase)")
    public void point() {
    }
    @Around(value = "point()")
    public Object aroudAop(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        R2dbcDataBase dataSource = method.getAnnotation(R2dbcDataBase.class);
        // The method returns a Mono
        if (method.getReturnType() == Mono.class) {
            return HotkitR2dbcRoutingConnectionFactory.putDataSource((Mono<?>) pjp.proceed(), dataSource.value());
        }
        // The method returns a Flux
        else {
            return HotkitR2dbcRoutingConnectionFactory.putDataSource((Flux<?>) pjp.proceed(), dataSource.value());
        }
    }
}
After the target method executes and returns a Mono or Flux, the aspect calls the subscriberContext method of the returned Mono or Flux, which turns the returned Mono into a MonoSubscriberContext, or the returned Flux into a FluxContextStart.
Taking Mono as an example, the data source is written into the Context by the callback Function invoked in the MonoSubscriberContext#subscribe method. When the Mono is finally subscribed, the MonoSubscriberContext passes the Context on through the subscriber.
Pay attention to the order of the data source aspect and the transaction aspect, to avoid transactions not taking effect.
Use a test case to illustrate when to switch data sources:
public class RoutingTest extends SupporSpringBootTest {
    @Resource
    private DatabaseClient client;
    @Test
    public void test() throws InterruptedException {
        Mono<Void> operation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
                .bind("id", "joe")
                .bind("name", "Joe")
                .bind("age", 34)
                .fetch()
                .rowsUpdated()
                .then();
        // Switch data sources
        Mono<Void> dbOperation = HotkitR2dbcRoutingConnectionFactory.putDataSource(operation, MasterSlaveMode.Slave);
        dbOperation.subscribe();
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}
Before the publisher operation actually executes the SQL, it must obtain a connection factory from the connection factory router, which means the router’s determineCurrentLookupKey method is invoked.
So, after operation is turned into dbOperation, subscribing to dbOperation puts the data source into the Context and exposes that Context to the subscribers upstream of dbOperation. The Context is passed along the stream, and eventually the determineCurrentLookupKey method, called on behalf of the subscriber of operation, can read the data source from the subscriber’s Context. This is the principle behind multi-data-source switching.
End
This article covers only Mono’s synchronous subscription process, because synchronous subscriptions are easier to understand and Mono is easier to introduce than Flux.
I recommend reading the source code of Flux, or of the asynchronous subscription path, to understand the Reactor library more deeply.