Ha ha, the title is a little wild. But since you’re here, take a look: reactive programming is becoming more and more important as high concurrency strains performance.

By the way, this is a Java article.

Let’s cut to the chase.

Core components of reactive programming

Before we move on, I hope you know a little bit about the publisher/subscriber model.

Look directly at the picture:

Talk is cheap, show you the code!

import reactor.core.publisher.Flux;

public class Main {

    public static void main(String[] args) {
        // Emits the ten integers 0..9 to every subscriber
        Flux<Integer> flux = Flux.range(0, 10);
        flux.subscribe(i -> {
            System.out.println("run1: " + i);
        });
        flux.subscribe(i -> {
            System.out.println("run2: " + i);
        });
    }
}

Output:

run1: 0
run1: 1
run1: 2
run1: 3
run1: 4
run1: 5
run1: 6
run1: 7
run1: 8
run1: 9
run2: 0
run2: 1
run2: 2
run2: 3
run2: 4
run2: 5
run2: 6
run2: 7
run2: 8
run2: 9

Process finished with exit code 0

Flux

Flux is a multi-element producer, meaning that it can produce multiple elements to form a sequence of elements for use by subscribers.

Mono

The difference between Mono and Flux is one of quantity: a Mono can produce at most one element for subscribers to consume.

A common application of Mono is Mono<ServerResponse> as a return value for WebFlux. After all, there is only one Response object per request, so Mono is just right.
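To make the difference in quantity concrete, here is a minimal sketch. The class and method names are made up for illustration, and block() is used only to keep the demo short: a Flux is collected into a list, while a Mono yields either one value or none.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the original article
class FluxVsMonoDemo {

    // A Flux may emit many elements; collectList() gathers them into a Mono<List<T>>
    static List<String> manyElements() {
        return Flux.just("foo", "bar", "foobar").collectList().block();
    }

    // A Mono emits at most one element
    static Optional<String> oneElement() {
        return Mono.just("foo").blockOptional();
    }

    // An empty Mono completes without emitting anything
    static Optional<String> noElement() {
        return Mono.<String>empty().blockOptional();
    }

    public static void main(String[] args) {
        System.out.println(manyElements());
        System.out.println(oneElement());
        System.out.println(noElement());
    }
}
```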

Quickly create a Flux/Mono and subscribe to it

Here are some creation methods, as demonstrated in the official documentation.

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

subscribe() method (lambda form)

  • The subscribe() method accepts lambda expressions that act as the subscriber. It comes in several overloads.
  • The fourth parameter of subscribe() is a callback that receives the Subscription when the subscribe signal arrives, so the initial request can be made there; if it is omitted, everything is requested (Long.MAX_VALUE).
public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        }, p -> {
            // Initial request: only two elements are asked for
            p.request(2);
        });
    }
}

If the initial request is removed, the maximum value will be requested:

public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        // The fourth parameter of subscribe() makes the initial request when the subscribe signal arrives;
        // if it is omitted, everything is requested (Long.MAX_VALUE).
        // For the other subscribe() variants, see the source code or the documentation: https://projectreactor.io/docs/core/release/reference/#flux
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        });
    }
}

Output:

run
0
run
1
run
2
run
3
run
4
run
5
run
6
run
7
run
8
run
9
done

Process finished with exit code 0

Extend BaseSubscriber (non-lambda form)

  • This approach is an alternative to the lambda expressions.
  • Subscribers written this way come with some caveats, such as requesting at least once when the subscription starts. Otherwise, the subscriber never receives any elements.
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class FluxWithBaseSubscriber {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(new MySubscriber());
    }

    /**
     * Usually done by extending BaseSubscriber<T> and overriding
     * hookOnSubscribe() and hookOnNext().
     */
    private static class MySubscriber extends BaseSubscriber<Integer> {

        /** Called once, when the subscription starts. */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Here we go!");
            // Remember to request at least once, otherwise hookOnNext() will never run
            request(1);
        }

        /** Called each time a new value is read. */
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("Start reading...");
            System.out.println(value);
            // Indicate how many to read next
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("It's over.");
        }
    }
}

Output:

Here we go!
Start reading...
0
Start reading...
1
Start reading...
2
Start reading...
3
Start reading...
4
Start reading...
5
Start reading...
6
Start reading...
7
Start reading...
8
Start reading...
9

Process finished with exit code 0
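The caveat about requesting can be checked with a small sketch. The class below is hypothetical (not from the article): its subscriber requests a fixed amount in hookOnSubscribe() and never requests again, so it receives exactly that many elements and the sequence stalls.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class RequestOnceDemo {

    // Asks for a fixed number of elements up front and never asks again
    static class FixedDemandSubscriber extends BaseSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        private final long initialDemand;

        FixedDemandSubscriber(long initialDemand) {
            this.initialDemand = initialDemand;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            if (initialDemand > 0) {
                request(initialDemand);
            }
        }

        @Override
        protected void hookOnNext(Integer value) {
            received.add(value); // deliberately no further request()
        }

        @Override
        protected void hookOnComplete() {
            completed = true;
        }
    }

    static FixedDemandSubscriber run(long initialDemand) {
        FixedDemandSubscriber subscriber = new FixedDemandSubscriber(initialDemand);
        Flux.range(0, 10).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        System.out.println(run(0).received); // nothing was requested
        System.out.println(run(3).received); // only the requested elements arrive
    }
}
```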

Terminate a subscription: Disposable

  • Disposable is an interface that is returned when you subscribe; it contains methods for managing the subscription,
  • such as cancelling it.

Here we use multiple threads to simulate a fast producer and then cancel the subscription immediately (because the producer is so fast, the subscriber may still receive some elements before the cancellation takes effect).

Other helpers exist as well: Disposables.composite() returns a collection of Disposable, and calling its dispose() method calls dispose() on every Disposable in the collection.

import java.util.Random;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class FluxWithDisposable {

    public static void main(String[] args) {
        Disposable disposable = getDis();
        // The number of values printed usually differs from run to run: dispose() cancels the subscription,
        // but if the producer is fast enough, some values get through before the cancellation takes effect.
        disposable.dispose();
    }

    private static Disposable getDis() {
        class Add implements Runnable {

            private final FluxSink<Integer> fluxSink;

            public Add(FluxSink<Integer> fluxSink) {
                this.fluxSink = fluxSink;
            }

            @Override
            public synchronized void run() {
                fluxSink.next(new Random().nextInt());
            }
        }
        Flux<Integer> integerFlux = Flux.create(integerFluxSink -> {
            Add add = new Add(integerFluxSink);
            // Start eleven producer threads that each emit one random number
            for (int i = 0; i < 11; i++) {
                new Thread(add).start();
            }
        });
        return integerFlux.subscribe(System.out::println);
    }
}

Output:

The output varies from run to run: because the subscription is cancelled, how much gets printed depends on thread timing at that moment.
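The Disposables.composite() helper mentioned earlier can be sketched as follows. This is an illustrative example with a made-up class name; Flux.never() is used only because its subscriptions stay active until they are cancelled.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class CompositeDisposableDemo {

    // Returns {anyDisposedBefore, firstDisposedAfter, secondDisposedAfter}
    static boolean[] disposeAll() {
        // Flux.never() emits nothing and never completes, so both subscriptions stay active
        Disposable first = Flux.never().subscribe();
        Disposable second = Flux.never().subscribe();

        Disposable.Composite composite = Disposables.composite();
        composite.add(first);
        composite.add(second);

        boolean anyDisposedBefore = first.isDisposed() || second.isDisposed();
        composite.dispose(); // disposes every Disposable in the collection
        return new boolean[] {anyDisposedBefore, first.isDisposed(), second.isDisposed()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(disposeAll()));
    }
}
```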

Adjust the publisher’s publishing rate

  • To ease the pressure on subscribers, a subscriber can reshape the rate at which the publisher publishes by propagating backpressure upstream. The most typical approach is the following: set your own request rate by extending BaseSubscriber. hookOnSubscribe() must request at least once, or your publisher may stall.
public class FluxWithLimitRate1 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        integerFlux.subscribe(new MySubscriber());
    }

    private static class MySubscriber extends BaseSubscriber<Integer> {

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Here we go!");
            // Remember to request at least once, otherwise hookOnNext() will never run
            request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("Start reading...");
            System.out.println(value);
            // Indicate how many to read next
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("It's over!");
        }
    }
}
  • Or use the limitRate() instance method, which returns a rate-limited Flux. Some upstream operators change the request amounts coming from downstream subscribers, and some take a prefetch integer as input and fetch more elements than downstream requested in order to process their own internal sequences. These prefetching operators generally default to a prefetch of 32, with an optimization: each time 75% of the prefetched amount has been consumed, another 75% is requested from upstream. This is called the “replenishing optimization.”
public class FluxWithLimitRate2 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        // Finally, let's look at some of the prefetch-related methods that Flux provides.
        // Each call returns a new Flux; it only takes effect once that Flux is subscribed to.
        // limitRate(n): the number of elements to prefetch
        integerFlux.limitRate(10);
        // limitRate(highTide, lowTide): highTide is the prefetch amount,
        // lowTide is the amount requested by the replenishing optimization.
        integerFlux.limitRate(10, 15);
        // limitRate(2) would fetch only two elements at a time.
        // limitRequest(n) caps the total downstream demand at n: if downstream requests more than n,
        // only n elements are delivered, otherwise the actual number is delivered.
        // The sequence is then considered complete and an onComplete signal is sent downstream.
        integerFlux.limitRequest(5).subscribe(System.out::println);
        // This prints only 5 elements.
    }
}
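One way to watch limitRate() at work is to record the request amounts that reach the source. The sketch below is an assumption-laden illustration, not the author's code: the class name is made up, and hide() is added on the assumption that it keeps operator fusion from bypassing the request signals.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class LimitRateDemo {

    // Records every request amount that reaches the source when limitRate(10) sits in between
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        Flux.range(0, 100)
                .hide()                            // assumption: disables fusion so requests are observable
                .doOnRequest(n -> requests.add(n)) // sees what limitRate() asks of the source
                .limitRate(10)
                .subscribe();                      // the subscriber itself requests Long.MAX_VALUE
        return requests;
    }

    public static void main(String[] args) {
        // The unbounded downstream demand is split into small batches
        System.out.println(upstreamRequests());
    }
}
```

Even though the final subscriber asks for everything at once, the source should only ever see batches of at most ten.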

Creating a sequence programmatically

Static synchronous method: generate()

Now it’s time to generate a Flux programmatically. Start with the generate() method, which is synchronous. This means elements are emitted one by one: the generator callback may call sink.next() at most once per invocation, so each round produces at most one element.

Or, to put it another way, the sequence of elements it generates depends on how the code is written.

public class FluxWithGenerate {

    public static void main(String[] args) {
        // This is one of its variants: the first argument supplies the initial state, and the second is a
        // generator that writes data to the sink. Its inputs are the state (often an integer that tracks
        // progress) and the sink.
        // For other variants, see the source code.
        Flux.generate(() -> 0, (state, sink) -> {
            sink.next(state + "asdf");
            // Add a call to sink.complete() to terminate the sequence; otherwise it is infinite
            // and the code after this subscribe() is never reached.
            return state + 1;
        }).subscribe(System.out::println);
        // A third argument to generate(), if given, is called when the sequence ends and consumes the state.
        Flux.generate(AtomicInteger::new, (state, sink) -> {
            sink.next(state.getAndIncrement() + "qwer");
            return state;
        }).subscribe(System.out::println);
        // The generate() workflow looks like: next() -> next() -> next() -> ...
    }
}
  • As the code above shows, each invocation of the generator receives the value returned by the previous invocation, i.e. state = the return value of the previous iteration.

  • But this state is a brand-new value each time (+1 always yields a new one), so is there a way to keep the same reference across iterations and just update its value? The answer is an atomic type, which is the second approach above.
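For a sequence that actually ends, the generator can call sink.complete() once a condition is met. Here is a minimal sketch in the style of the Reactor reference guide; the class and method names are invented for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class GenerateDemo {

    static List<String> timesTable() {
        Flux<String> flux = Flux.generate(
                () -> 0, // initial state
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3 * state);
                    if (state == 10) {
                        sink.complete(); // terminates the sequence
                    }
                    return state + 1; // becomes the state of the next round
                });
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        timesTable().forEach(System.out::println);
    }
}
```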

Static asynchronous multithreaded method: create()

With synchronous generation covered, next comes asynchronous, multithreaded generation. Let’s welcome create()!

  • The create() method exposes a FluxSink object through which we can emit the desired sequence. In addition, it can be driven from callbacks that fire on multiple threads.

  • Another feature of create() is that it makes it easy to bridge other APIs, such as listener-based ones, to the reactive world. Note that although it can be used asynchronously from multiple threads, create() does not parallelize your code or make it asynchronous by itself. How to understand that? The lambda passed to create() still runs on a single thread and can block. If you block inside that lambda, subscribers may not get data even when they request it, because the sequence is blocked and no new elements can be produced.

  • By default, the subscriber runs on the same thread as the create() lambda. Blocking that thread prevents the subscriber from running.

  • This problem can be solved with a Scheduler, which is covered later.

public class FluxWithCreate {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> stringTestListener) {
                this.testListener = stringTestListener;
            }

            @Override
            public TestListener<String> get() {
                return testListener;
            }
        };
        // Bridge the listener API to a Flux: every chunk is pushed into the sink
        Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() {
            @Override
            public void onChunk(List<String> chunk) {
                for (String s : chunk) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        System.out.println("It's now 2020/10/22 22:58; I'm so sleepy");
        TestListener<String> testListener = testProcessor.get();
        Runnable1<String> runnable1 = newProducer("-run1");
        Runnable1<String> runnable2 = newProducer("-run2");
        Runnable1<String> runnable3 = newProducer("-run3");
        runnable1.set(testListener);
        runnable2.set(testListener);
        runnable3.set(testListener);
        // With create(), "asynchronous" and "multithreaded" mean that sink.next() may be called
        // from multiple threads. Compare this with push() below.
        new Thread(runnable1).start();
        new Thread(runnable2).start();
        new Thread(runnable3).start();
        Thread.sleep(1000);
        testListener.onComplete();
        // Another variant of create() takes an extra parameter to control backpressure; see the source code.
    }

    // Builds a task that sends ten strings with the given suffix to the listener as one chunk
    private static Runnable1<String> newProducer(String suffix) {
        return new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++i) {
                    list.add(i + suffix);
                }
                testListener.onChunk(list);
            }
        };
    }

    public interface TestListener<T> {

        void onChunk(List<T> chunk);

        void onComplete();
    }

    public interface TestProcessor<T> {

        void register(TestListener<T> tTestListener);

        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
        void set(TestListener<T> testListener);
    }
}

Static asynchronous single-threaded method: push()

Having covered synchronous generation and asynchronous multithreaded generation, next is asynchronous single-threaded generation: push().

As for how push compares with create, my personal understanding is as follows:

  • create() allows next() to be called from multiple threads. It only takes care of emitting the elements, and their order depends on… well, thread timing, since it is multithreaded after all;

  • push() allows only one thread to produce elements, so the sequence is ordered. Asynchronous here means the elements can be produced on another thread, not necessarily the current one.

  • By the way, both push and create support the onCancel() and onDispose() callbacks. Generally speaking, onCancel only responds to cancel, while onDispose responds to error, cancel, complete, etc.

public class FluxWithPush {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public TestListener<String> get() {
                return this.testListener;
            }
        };
        Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() {
            @Override
            public void onChunk(List<String> list) {
                for (String s : list) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        Runnable1<String> runnable = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++i) {
                    list.add(UUID.randomUUID().toString());
                }
                testListener.onChunk(list);
            }
        };
        TestListener<String> testListener = testProcessor.get();
        runnable.set(testListener);
        // Only this single thread produces elements
        new Thread(runnable).start();
        Thread.sleep(15);
        testListener.onComplete();
    }

    public interface TestListener<T> {
        void onChunk(List<T> list);
        void onComplete();
    }

    public interface TestProcessor<T> {
        void register(TestListener<T> testListener);
        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
        void set(TestListener<T> testListener);
    }
}

Like create, push supports backpressure handling, but I didn’t write it out. The demos I tried all requested Long.MAX_VALUE directly. Backpressure control is done by calling the sink.onRequest(LongConsumer) method. That is the principle; if you want to explore it, please do so yourself. I spent a whole afternoon on it without getting it to work.
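For readers who want a starting point, here is one possible sketch of sink.onRequest(LongConsumer), using create() rather than push(). It is an illustration under assumptions, not the author's demo: the class name and the counter-based producer are made up, and the subscriber requests three elements once so that the demand stays easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class OnRequestDemo {

    static class Result {
        final List<Long> requests = new ArrayList<>();
        final List<Integer> received = new ArrayList<>();
    }

    static Result run() {
        Result result = new Result();
        AtomicInteger counter = new AtomicInteger();

        Flux<Integer> flux = Flux.create(sink ->
                // Invoked with the amount the subscriber asked for
                sink.onRequest(n -> {
                    result.requests.add(n);
                    for (long i = 0; i < n; i++) {
                        sink.next(counter.getAndIncrement()); // produce only what was requested
                    }
                }));

        flux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // ask for three elements and nothing more
            }

            @Override
            protected void hookOnNext(Integer value) {
                result.received.add(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Result result = run();
        System.out.println("requests: " + result.requests);
        System.out.println("received: " + result.received);
    }
}
```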

Instance method: handle()

Among the instance methods of Flux, handle() works like a combination of filter and map.

public class FluxWithHandle {

    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.push(stringFluxSink -> {
            for (int i = 0; i < 10; ++i) {
                stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5));
            }
        });
        // Get all strings that contain 'a'
        Flux<String> flux = stringFlux.handle((str, sink) -> {
            String s = f(str);
            if (s != null) {
                sink.next(s);
            }
        });
        flux.subscribe(System.out::println);
    }

    private static String f(String str) {
        return str.contains("a") ? str : null;
    }
}
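To see why handle() resembles filter plus map, the sketch below writes the same transformation both ways and gets the same result. The class name and the "even" labelling are made up for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original article
class HandleDemo {

    // handle(): emit a mapped value only for the elements we want to keep
    static List<String> withHandle() {
        return Flux.range(1, 6)
                .<String>handle((i, sink) -> {
                    if (i % 2 == 0) {
                        sink.next("even " + i);
                    }
                    // not calling the sink drops the element
                })
                .collectList()
                .block();
    }

    // The same logic expressed as two separate operators
    static List<String> withFilterAndMap() {
        return Flux.range(1, 6)
                .filter(i -> i % 2 == 0)
                .map(i -> "even " + i)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withHandle());
        System.out.println(withFilterAndMap());
    }
}
```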

Threading and scheduling

Schedulers static methods

In general, reactive frameworks such as Reactor are concurrency-agnostic: they do not enforce a concurrency model. P.S. create() allows the producer to be concurrent; it is not concurrent by itself. It is therefore up to the developer to decide how concurrency is used.

Also, each operator usually runs on the same thread as the previous one. Operators don’t have their own threads. The topmost operator runs on the thread where subscribe() was called. For example, Flux.create(…).handle(…).subscribe(…) all runs on the main thread.

In a reactive framework, a Scheduler determines where and when operations run, similar to an ExecutorService but with somewhat more functionality. If you want some concurrency, consider the static methods provided by Schedulers. Let’s see what’s available:

Schedulers.immediate(): the Runnable task is submitted directly on the current thread and executed immediately.

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        // Schedulers.immediate(): submit the Runnable task on the current thread and execute it immediately.
        System.out.println("Current thread: " + Thread.currentThread().getName());
        System.out.println("zxcv");
        Schedulers.immediate().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("qwer");
        });
        System.out.println("asdf");
        // Make sure any asynchronous task has time to print
        Thread.sleep(1000);
    }
}

immediate() works by running the Runnable right at the call site. It’s no different from writing the code inline.

Schedulers.newSingle(): ensures that each call gets a new dedicated thread.

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        // If you want every call to use a new thread, use Schedulers.newSingle(), which creates a new dedicated thread each time.
        Schedulers.single().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("bnmp");
        });
        Schedulers.single().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("ghjk");
        });
        Schedulers.newSingle("Thread 1").schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("1234");
        });
        Schedulers.newSingle("Thread 1").schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("5678");
        });
        Schedulers.newSingle("Thread 2").schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("0100");
        });
        Thread.sleep(1000);
    }
}

Schedulers.single() runs the work on a dedicated thread, but remember that every caller of this method shares that one thread.

Schedulers.elastic(): an elastic, unbounded thread pool.

Unbounded generally means unmanageable, as it can hide backpressure problems and lead to too many threads being created. So there is an alternative.

Schedulers.boundedElastic(): a bounded, reusable thread pool

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("1478");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("2589");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("0363");
        });
        Thread.sleep(1000);
    }
}

Schedulers.boundedElastic() is a better choice because it creates worker pools as needed and reuses idle ones; workers that stay idle for longer than a certain time are disposed of.

It also has a capacity limit, by default 10 times the number of CPU cores, which is the maximum size of its backing thread pool. Up to 100,000 tasks submitted after the cap is reached are queued and rescheduled when a thread becomes available. For delayed tasks, the delay starts counting when a thread becomes available.

Thus Schedulers.boundedElastic() is a good choice for blocking I/O, because it can give each blocking operation its own thread. But remember, too many threads can strain the system.
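A common way to apply this is to wrap the blocking call in Mono.fromCallable() and move it with subscribeOn(). The following is a minimal sketch: blockingRead() is a made-up stand-in for real blocking I/O, and the final block() is there only so the demo can print a result.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original article
class BlockingCallDemo {

    // Stand-in for a blocking call such as file or database access
    static String blockingRead() throws InterruptedException {
        Thread.sleep(50);
        return Thread.currentThread().getName(); // report which thread did the work
    }

    static String readOnBoundedElastic() {
        return Mono.fromCallable(BlockingCallDemo::blockingRead)
                .subscribeOn(Schedulers.boundedElastic()) // run the callable on a boundedElastic thread
                .block();
    }

    public static void main(String[] args) {
        System.out.println("blocking call ran on: " + readOnBoundedElastic());
    }
}
```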

Schedulers.parallel(): provides system-level parallelism

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.parallel().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("6541");
        });
        Schedulers.parallel().schedule(() -> {
            System.out.println("The current thread is: " + Thread.currentThread().getName());
            System.out.println("9874");
        });
        Thread.sleep(1000);
    }
}

Finally, Schedulers.parallel() provides parallelism by creating as many worker threads as there are CPU cores.

Other thread operations

As an aside, you can also create a Scheduler from an existing ExecutorService. Of course, the various newXXX methods of Schedulers work, too.

It’s important to note that boundedElastic() works with traditional blocking code, but single() and parallel() do not: calling a blocking Reactor API on their threads throws an exception. A custom scheduler can get the same behavior by using a ThreadFactory that creates threads implementing the NonBlocking marker interface.
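The difference can be observed with a short sketch. The class and helper names are invented here, and the inner Mono.delay(...).block() merely stands in for any blocking Reactor call made from a scheduler thread.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original article
class NonBlockingThreadDemo {

    // Returns true if block() was allowed on a thread of the given scheduler
    static boolean canBlockOn(Scheduler scheduler) {
        try {
            Mono.fromCallable(() -> Mono.delay(Duration.ofMillis(10)).block()) // blocks on a scheduler thread
                    .subscribeOn(scheduler)
                    .block(); // waits on the calling thread for the outcome
            return true;
        } catch (IllegalStateException e) {
            return false; // Reactor refuses to block on a non-blocking thread
        }
    }

    public static void main(String[] args) {
        System.out.println("parallel:       " + canBlockOn(Schedulers.parallel()));
        System.out.println("single:         " + canBlockOn(Schedulers.single()));
        System.out.println("boundedElastic: " + canBlockOn(Schedulers.boundedElastic()));
    }
}
```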

Some Flux methods use a default Scheduler. For example, Flux.interval() uses Schedulers.parallel() by default. You can change this default by passing a different Scheduler.

In a reactive chain, there are two ways to switch the execution context: the publishOn() method, whose position in the chain matters, and the subscribeOn() method. In Reactor, you can chain as many operators as you want, in any form, but nothing happens until you subscribe: only the subscribe call activates all the objects in the chain. Only then is the request propagated up to the publisher, which starts producing the source sequence.

Switching the execution context in the subscription chain

publishOn()

publishOn() is added in the middle of the operator chain just like any other operator, and it affects the execution context of all operators below it. Here’s an example:

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        // Create a parallel scheduler with four worker threads
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                // This map runs on thread T (see below)
                .map(i -> 10 + i)
                // The execution context switches to the parallel scheduler here
                .publishOn(s)
                // This map runs on a parallel-scheduler thread, because operators after publishOn() run in the new execution context.
                .map(i -> "value " + i);
        // Suppose the new thread is named T; it has to be started for the subscription to happen
        new Thread(() -> flux.subscribe(System.out::println)).start();
        Thread.sleep(1000);
        // Dispose of the scheduler once it is no longer needed
        s.dispose();
    }
}
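To check which thread each map runs on, the thread names can be recorded on both sides of publishOn(). This is an illustrative sketch: the class name and the "demo-parallel" scheduler name are made up, and block() is used only to collect the result.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original article
class PublishOnDemo {

    // Each element becomes "<thread of the first map> -> <thread of the second map>"
    static List<String> threadsAroundPublishOn() {
        Scheduler s = Schedulers.newParallel("demo-parallel", 2);
        try {
            return Flux.range(1, 2)
                    .map(i -> Thread.currentThread().getName()) // above publishOn(): subscribing thread
                    .publishOn(s)
                    .map(above -> above + " -> " + Thread.currentThread().getName()) // below: scheduler thread
                    .collectList()
                    .block();
        } finally {
            s.dispose(); // release the scheduler's threads
        }
    }

    public static void main(String[] args) {
        threadsAroundPublishOn().forEach(System.out::println);
    }
}
```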

subscribeOn()

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        // Again, create a parallel scheduler with four worker threads
        Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> fluxflux = Flux
                .range(1, 2)
                // This map already runs on ss
                .map(i -> 10 + i)
                // The switch is declared here, but it applies to the entire chain
                .subscribeOn(ss)
                // This map also runs on ss
                .map(i -> "value " + i);
        // This is an anonymous thread TT; it has to be started for the subscription to happen
        new Thread(() -> fluxflux.subscribe(System.out::println)).start();
        Thread.sleep(1000);
        // Dispose of the scheduler once it is no longer needed
        ss.dispose();
    }
}

The subscribeOn() method switches the entire subscription chain to the new execution context from the moment of subscription. Wherever subscribeOn() is placed in the chain, it affects the source emission from the start; if a publishOn() follows, publishOn() makes a further switch for the operators after it.
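The same thread-name trick shows the contrast with publishOn(): with subscribeOn(), the operators on both sides run on the scheduler's thread. Again a sketch with made-up names ("demo-sub" is an arbitrary scheduler name).

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original article
class SubscribeOnDemo {

    // Each element becomes "<thread of the first map> -> <thread of the second map>"
    static List<String> threadsAroundSubscribeOn() {
        Scheduler s = Schedulers.newParallel("demo-sub", 2);
        try {
            return Flux.range(1, 2)
                    .map(i -> Thread.currentThread().getName()) // above subscribeOn()
                    .subscribeOn(s)
                    .map(above -> above + " -> " + Thread.currentThread().getName()) // below subscribeOn()
                    .collectList()
                    .block();
        } finally {
            s.dispose(); // release the scheduler's threads
        }
    }

    public static void main(String[] args) {
        threadsAroundSubscribeOn().forEach(System.out::println);
    }
}
```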