While reactive programming has many benefits for improving performance, it also comes with some drawbacks, such as increased code complexity and high API intrusion (the equivalent of relying on a JDK). I personally believe that reactive programming is not suitable for business development, especially the development of complex business systems, which may be the reason why reactive programming is still lukewarm from its launch to now. Of course, this is not to persuade people to go from starting to giving up. Reactive programming is suitable for middleware with high performance requirements, or for low-level systems that are disconnected from business, such as gateways and message push services.

  • Reactive Streamsspecification
  • ReactorHow to do thatReactive StreamsSpecification of the
  • I’m going to go through reactive programmingContextThe mystery of realization
  • The use of delegate patterns andBaseSubscriber
  • forspring-data-r2dbcRealize dynamic switching of multiple data sources

The Reactive Programming Reactor fully implements the Reactive Streams specification, Reactive Streams defines the Reactive programming specification, and if you go to Github and check it out, you’ll only see these four interfaces: Publisher, Subscriber, Subscription, Processor.

Before we look at these interfaces, we need to understand what reactive programming is.

The viewpoint of this article is only understood from the author’s personal point of view, and the correctness is related to the author’s level. Readers are welcome to leave comments if they have doubts in the process of reading this article.

In the past, blocking programming used to block I/O operations such as remote calls, which blocked the current thread to wait for the response from the interface, and then consumed the response after receiving it. The thread will remain idle during the waiting process, but in reactive programming, the current thread will do something else until it receives the response and then publishes it to subscribers to continue consuming the response. Of course, making a network request is outside the Reactor’s responsibility.

Reactive programming, in which publishers consume data by publishing it to subscribers.

Reactive flow refers to a raw data that has undergone multiple operations or transformations and is eventually consumed by subscribers. Each operation or transformation is also a publish subscription of data, and these publish subscriptions are sequentially combined to form a reactive flow.

Reactive Streamsspecification

  • Publisher: publisher;
  • Subscriber: subscriber;
  • Subscription: subscription, used to connect publishers and subscribers
  • Processor: processor, both subscriber and publisher;

Because these interfaces are only specifications defined by Reactive Streams, detailed implementation needs to be understood in conjunction with the Implementation library of the Reactive Streams specification, so we will briefly familiarize ourselves with these interfaces and then describe how the Reactor implements them.

Publisher (Publisher)

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
Copy the code
  • subscribeSubscriber A subscriber;

If you’ve worked with WebFlux or a Reactor library, you’ll be familiar with the SUBSCRIBE method. Although you don’t know how it works, you know that only Mono/Flux’s SUBSCRIBE method triggers the entire stream execution.

Even if we haven’t been exposed to WebFlux or Reactor, we’ve probably all been exposed to streams provided by Java8. A Java8 Stream will only trigger the execution of the Stream if it encounters a termination operation, and reactive programming Publisher’s subscribe method is the equivalent of a termination operation for a Java8 Stream.

Until the Publisher#subscribe method is called, everything is just an execution plan that we define, and the execution plan dictates the execution of the entire stream.

Subscriber (Subscriber)

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete(a);
}
Copy the code

The Reactive Streams specification defines four types of events that subscribers can subscribe to:

  • onSubscribe: often byPublisherCalled by the publisher to which this subscriber subscribedPublisher#subscribeMethod when called;
  • onNext: often bySubscriptionThe call,Subscription#requestCalled when data is requested;
  • onError: often byonNextMethod call, whenonNextCalled when a data consumption exception is caught;
  • onComplete: often bySubscriptionCalled when all data has been consumed normally and no errors have caused the subscription to terminate;

Subscription

public interface Subscription {
    void request(long n);
    void cancel(a);
}
Copy the code

Subscription, which is like a scenario class.

  • request: The subscriber calls this method to request a specified amount of data, and calls the subscriber’s when the data is requestedonNextMethod to pass data to the subscriber, usually inSubscribertheonSubscribeMethod is called in;
  • cancel: This method is usually called by the subscriber to unsubscriberequestThat no longer generates data and no longer triggers subscribersonNext;

How does Reactor implement the Reactive Streams specification

A simple Mono use example is shown below.

public class MonoStu{
    public static void main(String[] args){
        / / (1)
        Mono.just(1)
        / / (2).subscribe(System.out::println); }}Copy the code

We will take a step-by-step look at this example to see how Reactor implements the Reactive Streams specification.

Reading this article does not require readers to browse the source code, of course, if you can combine the source code together to see the effect is better. Mono source code in the reactor.core. Publisher package of the reactor-core library.

public abstract class Mono<T> 
       implements Publisher<T> {}Copy the code

Mono is an abstract class that implements the Publisher interface of the Reactive Streams specification and extends the Publisher’s operations to provide streaming programming (Reactive flow).

Let’s start with the Mono#just static method:

public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
		return onAssembly(newMonoJust<>(data)); }}Copy the code

Reading the source code for the first time and not knowing the Reactor base, I do not recommend that you bother with the onAssembly method. It is also a learning method. Learn the backbone first and then worry about the details, so we will ignore the onAssembly method.

Ignore the Mono#just static method after onAssembly:

public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
		return newMonoJust<>(data); }}Copy the code

The just method returns a MonoJust object, which inherits from Mono. Similar to the Builder pattern we used, this is returned every time a method is called until the build method is called. Only Reactor’s Mono#just returns a new Mono object.

In keeping with the principle of reading the source code first to master the trunk, we have removed the other interfaces implemented by MonoJust and are concerned only with the methods it inherits from the Mono implementation. The simplified source code for MonoJust is as follows.

final class MonoJust<T> extends Mono<T> {

	final T value;

	MonoJust(T value) {
		this.value = value;
	}
    // Focus your attention here
	@Override
	public void subscribe(CoreSubscriber<? super T> actual) { actual.onSubscribe(Operators.scalarSubscription(actual, value)); }}Copy the code

Since Mono is also a Publisher and implements the Publisher interface, we focus on the Subscribe method implemented by MonoJust.

MonoJust#subscribe is a CoreSubscriber input, which indicates that Mono has implemented the subscribe method of Publisher interface.

public abstract class Mono<T> implements Publisher<T> {
    @Override
	public final void subscribe(Subscriber<? super T> actual) {
        // Originally onLastAssembly(this),
        // We ignore onLastAssembly and use this
		this.subscribe(Operators.toCoreSubscriber(actual));
	}
    // Subclass MonoJust implementation
	public abstract void subscribe(CoreSubscriber<? super T> actual);
}
Copy the code

Now we just know that CoreSubscriber should be a subclass of Subscriber, and that’s enough for now.

Subscribe (system.out ::println) and then go back to the MonoJust#subscribe method.

Mono provides multiple overloading of subscribe methods. No matter which overloaded method we use, we end up calling Mono’s Subscribe method from the Publisher interface, which in turn calls the SUBSCRIBE method from the Mono subclass.

In this case, we call SUBSCRIBE and pass in a lambda, corresponding to the Accept method that implements the Consumer interface, which is ultimately wrapped as LambdaMonoSubscriber, as shown below.

public abstract class Mono<T> implements Publisher<T> {
    public final Disposable subscribe(Consumer<? super T> consumer) {
        / / create LambdaMonoSubscriber
		return subscribeWith(new LambdaMonoSubscriber<>(consumer, null.null.null)); }}Copy the code

Focus on the Consumer, new LambdaMonoSubscriber() and subscribeWith methods and don’t get distracted because the logic is a bit convoluted and you won’t understand it.

The subscribeWith method source code is as follows:

public abstract class Mono<T> implements Publisher<T> {
    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
        Subscribe Mono implements the Publisher interface
		subscribe(subscriber);
        // To get started, don't worry about the return value of Disposable, which is used to unsubscribe
		// return subscriber;}}Copy the code

SubscribeWith calls Mono to subscribe the subscribe method of the Publisher interface, so LambdaMonoSubscriber must be a Subscriber.

At this point, we should be concerned about how the publisher Mono passes data to the LambdaMonoSubscriber, for which we need to go back to the MonoJust#subscribe method.

final class MonoJust<T> extends Mono<T> {
	final T value;
    
	MonoJust(T value) {
	    this.value = value;
	}
    
    // Actual is LambdaMonoSubscriber
	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
        / / create a SubscriptionSubscription subscription = Operators.scalarSubscription(actual, value); actual.onSubscribe(subscription); }}Copy the code

Here the subscribe parameter actual is the LambdaMonoSubscriber object, and the publisher MonoJust directly calls the subscriber’s onSubscribe method in the SUBSCRIBE method.

In the Reactive Streams specification, the Subscriber onSubscribe method requires a Subscription, So the MonoJust#subscribe method needs to wrap the real Subscriber LambdaMonoSubscriber and data value into a Subscription, which is also a Subscriber.

LambdaMonoSubscriber calls the Subscription request for data in the onSubscribe method.

The source code of LambdaMonoSubscriber is as follows. We ignore the empty parameter passed in by the LambdaMonoSubscriber constructor to simplify the code of LambdaMonoSubscriber and make it easy to read.

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    
     // Consumer is passed by the case: system.out ::println
	final Consumer<? super T>  consumer;
    // Don't worry about why volatile now
	volatile Subscription subscription;

	LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer) {
		this.consumer = consumer;
	}

    / / s = Operators of MonoJust scalarSubscription (actual value) return values
	@Override
	public final void onSubscribe(Subscription s) {
	    this.subscription = s;
        // Request datas.request(Long.MAX_VALUE); }}Copy the code

Since we are working with Mono, the onSubscribe method requests all data s.request(long.max_value).

Before the analysis of this method, need to know Operators. ScalarSubscription returned by the Subscription is a scalarSubscription (synchronous Subscription implementation).

static final class ScalarSubscription<T> {
 
        // here is LambdaMonoSubscriber
		final CoreSubscriber<? super T> actual;
        / / data
		final T value;
		
		ScalarSubscription(CoreSubscriber<? super T> actual, T value) {
			this.value = Objects.requireNonNull(value, "value");
			this.actual = Objects.requireNonNull(actual, "actual");
		}
		
		@Override
		public void request(long n) {
		    Subscriber<? superT> a = actual; a.onNext(value); a.onComplete(); }}Copy the code

For simplicity, I’ve stripped the ScalarSubscription code down a lot to make it easier to understand.

Both actual and data value are passed in the constructor. In this case, actual is LambdaMonoSubscriber, and value is 1 passed in by the Mono#just call.

Scalarsubscript # Request is called when LambdaMonoSubscriber#onSubscribe is called, In the request method, the onNext method of actual (LambdaMonoSubscriber) is directly called to pass the data value, and after the execution of onNext method, The onComplete method of actual (LambdaMonoSubscriber) is called.

Here the actual subscriber is LambdaMonoSubscriber, and the source code of LambdaMonoSubscriber#onNext method after pruning by the author is as follows:

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    @Override
    public final void onNext(T x) {
        if(consumer ! =null) {
            try {
                Lambda ==> system. out::println
                consumer.accept(x);
            }catch(Throwable t) { Operators.onErrorDropped(t, Context.empty()); }}}}Copy the code

The consumer called in onNext is the lambda expression we passed, which prints out the subscribed data.

This case study does not see an asynchronous implementation because we looked at the simplest scenarios that do not require asynchrony.

The execution process of this case is summarized as follows:

  • 1, callMono#justTo create aMonoJustPublisher, and the parameter is passedvalueWill be the data that the publisher needs to publish;
  • 2, callMonoJust#subscribe(Consumber lambda)Pass a consumer’s consumption data, and the consumer is packaged as a subscriberLambdaMonoSubscriber;
  • 3,MonoJust#subscribe(CoreSubscriber)Called in this methodOperators.scalarSubscription(actual, value)createScalarSubscriptin.

And the onSubscribe method of LambdaMonoSubscriber is called;

  • 4,LambdaMonoSubscriber#onSubscribeCalled in this methodScalarSubscription#requestRequest data;
  • 5,ScalarSubscription#requestCall the real subscriberLambdaMonoSubscribertheonNextMethod and pass data inonNextCalled after the method completes executiononCompleteMethods;
  • 6,LambdaMonoSubscriber#onNextIn the call caselambdaExpressions consume data.

Now let’s enrich the case:

public class MonoStu{
    public static void main(String[] args){
        / / (1)
        Mono.just(1)
        / / (2)
            .map(String::valueOf)
        / / (3).subscribe(System.out::println); }}Copy the code

We added the Map (String::valueOf) operation, which plans to convert the data to a String when subscribing to the data passed by MonoJust, and then pass the converted data to the real subscriber subscription.

We already know that Mono. Just returns a MonoJust, so what does map return?

Mono#map source code:

public abstract class Mono<T> implements Publisher<T> {
    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
		return new MonoMap<>(this, mapper); }}Copy the code

As you can see, map returns a MonoMap:

final class MonoMap<T.R> extends MonoOperator<T.R> {

	final Function<? super T, ? extends R> mapper;
	
	MonoMap(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
		super(source);
		this.mapper = mapper;
	}

	@Override
	public void subscribe(CoreSubscriber<? super R> actual) {
		source.subscribe(newFluxMap.MapSubscriber<>(actual, mapper)); }}Copy the code

There is a new abstract class, MonoOperator, which we will use a lot in the future, such as R2DBC dynamic data source switching and Sentinel’s support for WebFlux.

MonoOperator converts a Mono(source) to a new Mono(in this case MonoMap) and then calls the SOURCE subscribe method when MonoMap’s subscribe method is called, thus concatenating the two publisher Monos.

Since the logic is quite round, let’s sort out the logic first:

  • 1. Sequential operation

First, we create MonoJust and then MonoMap, so we call MonoMap’s subscribe method.

  • Subscribe in reverse order

After (1), MonoMap’s subscribe method is executed, and MonoMap calls MonoJust’s subscribe.

/ / source is MonoJust
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
Copy the code
  • 3. Consume data sequentially

MonoJust’s onSubscribe method is called after (2), so MonoMap’s onSubscribe method is called after MonoMap’s onNext method.

So, in this case, the argument passed by MonoMap’s SUBSCRIBE method is the real subscriber system.out ::println, and the source is MonoJust. MonoJust’s SUBSCRIBE method is first called when monomp #subscribe is called and the true subscriber is encapsulated as fluxmap. MapSubscriber is passed to MonoJust. Make MonoJust think fluxmap. MapSubscriber is the true subscriber, and when the onSubscribe method of Fluxmap. MapSubscriber is called, it calls the onSubscribe method of the true subscriber. It’s actually a commissioned design pattern.

At this point, we are done with the more complex examples. In fact, many operations are implemented through MonoOperator, which is why Context passing is possible, so we will look at the implementation of the Context.

Demystify the Context implementation of reactive programming

As summarized in the previous section, multiple operations (publish-subscribe) can be combined into a stream to perform the following operations: sequential operation, reverse order subscription, sequential consumption of data. Imagine passing a Context in a stream.

  • Reverse order subscription: assume somewhere in the middle of the streamMonocreateContextbysubscribeThe method is passed upContext;
  • Sequential consumption: in creationContexttheMonoBefore theMonoYou can use thisContextAnd hereMonoAfter theMonoYou can’t get thatContext;

Let’s draw a picture to understand:

A simple example of using Context:

public class ContextUseMain{
  private static void testMono(a) {
        // MonoMap
        Mono<Integer> mono = Mono.just(1)
                .subscriberContext(context -> {
                    // Get XXXX
                    System.out.println(context.get("xxxx").toString());
                    return context;
                })
                .map(x -> x * x);
        // MonoSubscriberContext
        mono = mono.subscriberContext(context -> context.put("xxxx", System.currentTimeMillis()));
        // MonoMap
        mono = mono.map(x -> (x + 1) * 2)
                 .subscriberContext(context -> {
                     // a null pointer is reported
                    System.out.println(context.get("xxxx").toString());
                    return context;
                });
        // Start subscribingmono.subscribe(System.out::println); }}Copy the code

Mono. SubscriberContext () : mono. SubscriberContext () : mono.

public abstract class Mono<T> implements Publisher<T> {
    public final Mono<T> subscriberContext(Function<Context, Context> doOnContext) {
		return new MonoSubscriberContext<>(this, doOnContext); }}Copy the code

The MonoSubscriberContext class inherits MonoOperator from the following source code:

final class MonoSubscriberContext<T> extends MonoOperator<T.T> implements Fuseable {

	final Function<Context, Context> doOnContext;

	MonoSubscriberContext(Mono<? extends T> source,
			Function<Context, Context> doOnContext) {
		super(source);
		this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
	}

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		// (1) Get the previous subscriber's Context
        Context c = actual.currentContext();
		try {
            // (2) return a new Context by putting key-value to Context
			c = doOnContext.apply(c);
		}
		catch (Throwable t) {
			Operators.error(actual, Operators.onOperatorError(t, actual.currentContext()));
			return;
		}
        // (3) ContextStartSubscriber Subscription
        Subscription  subscription = new FluxContextStart.ContextStartSubscriber<>(actual, c);
        // Call the previous Mono subscribe methodsource.subscribe(subscription); }}Copy the code
  • (1) : call subscriber’scurrentContextGet the subscriber’sContextIs returned if it does not existContext.empty()A: One is emptyContext);

CoreSubscriber provides the API to get the Context:

public interface CoreSubscriber<T> extends Subscriber<T> {
	default Context currentContext(a){
		returnContext.empty(); }}Copy the code
  • (2) call Function to get a new Context. If you put a key-value into the currentContext, the new Context will be created.

  • (3) : FluxContextStart ContextStartSubscriber is Subscription and Subscriber. Passed in the method of constructing the Context, so, if in FluxContextStart ContextStartSubscriber rewrite currentContext method, can get to the Context;

FluxContextStart. ContextStartSubscriber source (cut) as follows:

static final class ContextStartSubscriber<T> implements ConditionalSubscriber<T>, InnerOperator<T.T>,  QueueSubscription<T> {

    final CoreSubscriber<? super T>        actual;
    final Context                          context;

    ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
        this.actual = actual;
        this.context = context;
    }

    @Override
	public Context currentContext(a) {
        return this.context; }}Copy the code

Context.empty() creates Context0:

public interface Context {
    static Context empty(a) {
		returnContext0.INSTANCE; }}Copy the code

Context0’s PUT method:

final class Context0 implements Context {
	@Override
	public Context put(Object key, Object value) {
		return newContext1(key, value); }}Copy the code

As you can see, Context0#put creates Context1.

Context1:

class Context1 implements Context.Map.Entry<Object.Object> {

	final Object key;
	final Object value;

	Context1(Object key, Object value) {
		this.key = key;
		this.value = value; }}Copy the code

If I continue to call Context1#put, I’m creating Context2.

Context2:

final class Context2 implements Context {

	final Object key1;
	final Object value1;
	final Object key2;
	final Object value2;
	Context2(Object key1, Object value1, Object key2, Object value2) {
		this.key1 = key1;
		this.value1 = value1;
		this.key2 = key2;
		this.value2 = value2; }}Copy the code

Calling Context2#put creates Context3, and so on.

Each put returns a new Context. The purpose is to avoid multi-threaded locking and to achieve data isolation: subsequent operations (Mono) cannot retrieve data from previous operations (Mono) put.

The use of delegate patterns andBaseSubscriber

Before introducing BaseSubscriber, let’s learn about a new API: Transform. Mono/Flux transform method allows a Mono/Flux into a MonoOperator/FluxOperator, entrust to subscribe to the MonoOperator/FluxOperator;

Here’s a simple example:

public class BaseSubscriberUseMain{
    public static void main(String[] args){ Mono<? > mono = createMono();// transform transforms the original mono into the new monomono = mono.transform((Function<Mono<? >, Publisher<? >>) m ->new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) { source.subscribe(actual); }}); mono.subscribe(); }}Copy the code

This allows us to replace the subscriber passed by the MonoOperator subscribe method argument with the subscriber we implement as follows:

public class BaseSubscriberUseMain{
    public static void main(String[] args){ Mono<? > mono = createMono();// transform transforms the original mono into the new monomono = mono.transform((Function<Mono<? >, Publisher<? >>) m ->new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) {
                source.subscribe(newActualSubscriberDelegater(actual)); }}); mono.subscribe(); }}Copy the code

ActualSubscriberDelegater inheritance BaseSubscriber, realize the hook method:

public class ActualSubscriberDelegater<T> extends BaseSubscriber<T> {

    /**
     * 真实的订阅者
     */
    private CoreSubscriber<? super T> actual;

    public ActualSubscriberDelegater(CoreSubscriber<? super T> actual) {
        super(a);this.actual = actual;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        actual.onSubscribe(subscription);
    }

    @Override
    protected void hookOnNext(T value) {
        // Get the subscriber's Context
        Long time = (Long) actual.currentContext().get("xxxx");
        actual.onNext(value);
    }

    @Override
    protected void hookOnComplete(a) {
        actual.onComplete();
    }

    @Override
    protected void hookOnError(Throwable throwable) { actual.onError(throwable); }}Copy the code

Look ActualSubscriberDelegater agent real subscriber all behavior, but this is not the proxy pattern, but the delegate pattern.

  • hookOnSubscribe: The method is in the parent classonSubscribeCalled when a method is called;
  • hookOnNext: The method is in the parent classonNextCalled when a method is called;
  • hookOnComplete: The method is in the parent classonCompleteCalled when a method is called;
  • hookOnError: The method is in the parent classonErrorCalled when a method is called;

As a result, ActualSubscriberDelegater can be used to accomplish debugging, diary operations such as printing.

Alibaba Sentinel also combines transform API with BaseSubscriber to use adaptive reaction Reactor database. We can also use this to calculate the execution time of an interface.

forspring-data-r2dbcRealize dynamic switching of multiple data sources

Hotkit-r2dbc is the author’s personal open source project, encapsulating the spring-data-R2DBC multi-data source dynamic switch implementation.

  • Making a link:https://github.com/wujiuye/hotkit-r2dbc

Hotkit-r2dbc dynamic data source switching for hotKit-R2DBC is very simple, you don’t need to use hotkit-R2DBC, you can also implement dynamic data source switching for Spring-data-R2DBC.

Spring-data-r2dbc provides connection factory routing classes: AbstractRoutingConnectionFactory, we only need to inherit AbstractRoutingConnectionFactory and realize its determineCurrentLookupKey method, Return the correct data source key when the method is called.

/** * ConnectionFactory Route **@author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

    private final static String DB_KEY = "HOTKIT-R2DBC-DB";

    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {      
        / /...
        setTargetConnectionFactories(connectionFactoryMap);
        //....
    }
    
    @Override
    protected Mono<Object> determineCurrentLookupKey(a) {
        return Mono.subscriberContext().handle((context, sink) -> {
            if(context.hasKey(DB_KEY)) { sink.next(context.get(DB_KEY)); }}); }}Copy the code

Of course, this doesn’t work, you need to provide a method for the aspect to get the Context and write to the data source, so the complete connection factory router would look like this:

/** * ConnectionFactory Route **@author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

    private final static String DB_KEY = "HOTKIT-R2DBC-DB";

    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {      
        / /...
        setTargetConnectionFactories(connectionFactoryMap);
        //....
    }
 
    // Write to the data source
    public static <T> Mono<T> putDataSource(Mono<T> mono, String dataSource) {
        return mono.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }
    
    // Write to the data source
    public static <T> Flux<T> putDataSource(Flux<T> flux, String dataSource) {
        return flux.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }

    @Override
    protected Mono<Object> determineCurrentLookupKey(a) {
       return Mono.subscriberContext().handle((context, sink) -> {
            if(context.hasKey(DB_KEY)) { sink.next(context.get(DB_KEY)); }}); }}Copy the code

The implementation of the section class is as follows:

@Component
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DynamicDataSourceAop {

    @Pointcut(value = "@annotation(com.wujiuye.hotkit.r2dbc.annotation.R2dbcDataBase)")
    public void point(a) {}@Around(value = "point()")
    public Object aroudAop(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        R2dbcDataBase dataSource = method.getAnnotation(R2dbcDataBase.class);
        // The method returns a value of type Mono
        if (method.getReturnType() == Mono.class) {
            returnHotkitR2dbcRoutingConnectionFactory.putDataSource((Mono<? >) pjp.proceed(), dataSource.value()); }// The method returns a value of type Flux
        else {
            returnHotkitR2dbcRoutingConnectionFactory.putDataSource((Flux<? >) pjp.proceed(), dataSource.value()); }}}Copy the code

After the target method is executed and returns Mono or Flux, the section calls the subscriberContext method that returns Mono or Flux to convert the returned value Mono to a MonoSubscriberContext. Or turn the return value Flux into a FluxContextStart.

Using Mono as an example, write the data source to the Context in the callback Function of the MonoSubscriberContext#subscribe method. When Mono is finally subscribed, The MonoSubscriberContext passes the Context to the subscriber.

Pay attention to the order of the data source and transaction facets to avoid invalid transactions.

Use a test case to illustrate when to switch data sources:

public class RoutingTest extends SupporSpringBootTest {

    @Resource
    private DatabaseClient client;
    
    @Test
    public void test(a) throws InterruptedException {
         Mono<Void> operation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
                .bind("id"."joe")
                .bind("name"."Joe")
                .bind("age".34)
                .fetch()
                .rowsUpdated()
                .then();
        // Switch data sourcesMono<Void> dbOperation = HotkitR2dbcRoutingConnectionFactory.putDataSource(operation,MasterSlaveMode.Slave); dbOperation.subscribe(); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }}Copy the code

The publisher operation in real before executing Sql must be from the connection factory router connection factory for publishers, namely the connection factory router determineCurrentLookupKey is invoked.

So, after operation turns to dbOperation, subscribing to dbOperation writes the Context to the subscribers of dbOperation and puts the data source, the Context is passed in the stream, Eventually operation of subscribers can call determineCurrentLookupKey method from the subscriber’s Context access to the data source, this is the implementation principle of multiple source switching.

End

This article covers only Mono’s synchronous subscription process, because synchronous subscriptions are easier to understand and Mono is easier to introduce than Flux.

I recommend that you read the source code for Flux or the asynchronous subscription code to understand the Reactor database.