Preface

This is a translation. I actually finished reading the original article a long time ago, but my understanding of reactive programming was not deep enough, and I did not feel qualified to translate it. The original article was also somewhat controversial, so I had little motivation. Today two senior members of my discussion group had a heated debate about reactive programming, so I took the opportunity to translate the article while the debate was still fresh in my mind. Here is the point of contention:

Is reactive programming useless in a stand-alone environment?

The conclusion: there is no conclusion, and I think the question deserves some healthy skepticism. We also talked about RSocket, the “new” protocol that was recently promoted at SpringOne (its repository is on GitHub). I put “new” in quotes because RSocket's commit log shows that it has been around for three years. If you are interested, look through it yourself; it is one of the hot technologies of the last two or three months of this year.

There is a strange thing about the Java community: reactive programming terms and frameworks such as RxJava, Reactor, and WebFlux are always in a state where people want to learn them and find them novel, yet understand little about them and rarely use them. I discussed this in my previous post, “Reactive Programming a Technology, Expressed separately”, and today's group discussion deepened my thinking. Many of my friends in the Java community (myself included) know reactive programming only at the level of terminology and basic principles and are not deep users. Perhaps it really does depend on the circle you move in. According to Shi Chong, people in Scala circles are well ahead of us when it comes to reactive programming.

It really has been a long time since my last post, and I owe you an apology. From now on I will certainly not update as often as I used to, back when I was very hard-working (or so I say). Part of the reason is that I now write articles on the company intranet, which I cannot share publicly. Another part is that I am still learning our internal frameworks, which are not easy to distill into articles. The biggest reason is that I have needed to “study” lately (read: slack off and play games). Well, that is enough rambling. Here is the translation itself.

Introduction

One question you may have about reactive programming is this: we already have Stream, CompletableFuture, and Optional in Java 8, so why do we need RxJava and Reactor?

The answer is simple. If you are dealing with simple problems, you really do not need the support of those third-party libraries. As more complex problems arise, however, you end up with a pile of ugly code that grows ever more complicated and harder to maintain. RxJava and Reactor offer many convenient features that solve your current problems and also cover requirements you can foresee. This article distills eight criteria from the reactive programming model to help us understand how the standard Java features differ from these libraries:

  1. Composable
  2. Lazy
  3. Reusable
  4. Asynchronous
  5. Cacheable
  6. Push or Pull (push-pull model)
  7. Backpressure (the Chinese rendering of this term was chosen at Shi Chong's suggestion)
  8. Operator Fusion

We will compare these features with the following classes:

  1. CompletableFuture (Java 8)
  2. Stream (Java 8)
  3. Optional (Java 8)
  4. Observable (RxJava 1)
  5. Observable (RxJava 2)
  6. Flowable (RxJava 2)
  7. Flux (Reactor Core)

Let’s get started

1. Composable

All of these classes are composable, which makes it easy to think in a functional style; that is why we like them.

CompletableFuture – Its many .then*() methods let us build a pipeline that passes along no value, a single value, or an exception.

Stream – Provides many chainable operations, allowing multiple values to pass from one stage to the next.

Optional – Provides the intermediate operations .map(), .flatMap(), and .filter().

Observable, Flowable, Flux – same as Stream
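
To make the comparison concrete, here is a minimal sketch of the same chaining style using only the three JDK classes. The class name ComposableDemo and its helper methods are made up for illustration, and the code assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class ComposableDemo {

    // CompletableFuture: chain stages with the then*() family.
    static int futureLength(String text) {
        return CompletableFuture.supplyAsync(() -> text)
                .thenApply(String::trim)
                .thenApply(String::length)
                .join(); // block only to read the result in this demo
    }

    // Stream: chain intermediate operations over many values.
    static List<Integer> evenSquares(int... numbers) {
        return Arrays.stream(numbers)
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .boxed()
                .collect(Collectors.toList());
    }

    // Optional: the same style for zero or one value.
    static String firstWordUpper(String text) {
        return Optional.ofNullable(text)
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(s -> s.split("\\s+")[0].toUpperCase())
                .orElse("");
    }

    public static void main(String[] args) {
        System.out.println(futureLength("  reactive  "));
        System.out.println(evenSquares(1, 2, 3, 4));
        System.out.println(firstWordUpper(" hello world "));
    }
}
```

In all three cases the pipeline is described as a chain of small functions rather than as nested control flow.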

2. Lazy

CompletableFuture – Not lazy. It is essentially just a container for an asynchronous result. Such an object represents work that has already been started by the time the CompletableFuture exists. It knows nothing about that work; it only cares about the result. There is therefore no way to go back upstream and run the whole pipeline from top to bottom. The next stage starts when the result is set on the CompletableFuture.

Stream – All intermediate operations are lazy. Terminal operations trigger the actual computation.

Optional – Not lazy; every operation executes immediately.

Observable, Flowable, Flux – Lazy; nothing executes until there is a subscriber.
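
The difference between a lazy Stream and an eager Optional can be observed by recording the order in which things happen. This is a small sketch with a hypothetical LazyDemo class, assuming Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

final class LazyDemo {

    // Returns the events recorded while building and then running each pipeline.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stream: map() is only registered here; nothing runs yet.
        Stream<Integer> stream = Stream.of(1, 2).map(n -> {
            events.add("stream map " + n);
            return n * 10;
        });
        events.add("stream built");
        stream.forEach(n -> { }); // the terminal operation triggers the work

        // Optional: map() runs immediately, before the next statement.
        Optional<Integer> optional = Optional.of(3).map(n -> {
            events.add("optional map " + n);
            return n * 10;
        });
        events.add("optional built");

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

For the Stream, "stream built" is recorded before either mapping runs; for the Optional, the mapping is recorded before "optional built".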

3. Reusable

CompletableFuture – Reusable; it is just a wrapper around the actual value. Note, however, that this wrapper is mutable: the .obtrude*() methods overwrite its contents. If you are sure that nobody will ever call them, it is safe to reuse.

Stream – Not reusable. The Javadoc states:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.

(In other words, a stream can be operated on only once. If the implementation detects that the stream is being reused, it may throw an IllegalStateException. Because some stream operations return their receiver rather than a new stream object, reuse cannot be detected in every case.)

Optional – Fully reusable because it is immutable and all operations are performed immediately.

Observable, Flowable, Flux – Reusable by design. Whenever a subscriber subscribes, every stage runs again from the starting point.
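
The two JDK behaviours described above can be checked with a short sketch. ReuseDemo is a hypothetical name; the IllegalStateException shown is what the reference JDK implementation does in this simple case, although the Javadoc only says an implementation may throw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

final class ReuseDemo {

    // A stream may be consumed once; a second terminal operation fails.
    static boolean streamRejectsReuse() {
        Stream<String> words = Stream.of("a", "b", "c");
        words.count(); // first (and only permitted) use
        try {
            words.count();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // A completed future can be read repeatedly, but obtrudeValue() overwrites it.
    static int[] futureBeforeAndAfterObtrude() {
        CompletableFuture<Integer> future = CompletableFuture.completedFuture(1);
        int first = future.join();
        int derived = future.thenApply(n -> n + 1).join(); // reuse: new stage, same source
        future.obtrudeValue(100);                           // mutates the shared wrapper
        int afterObtrude = future.join();
        return new int[] {first, derived, afterObtrude};
    }

    public static void main(String[] args) {
        System.out.println(streamRejectsReuse());
        System.out.println(java.util.Arrays.toString(futureBeforeAndAfterObtrude()));
    }
}
```

The last value in the array shows why .obtrude*() makes sharing a CompletableFuture risky: every holder of the reference sees the overwritten result.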

4. Asynchronous

CompletableFuture – The whole point of this class is to chain multiple operations asynchronously. A CompletableFuture represents work that is associated with an Executor. If no Executor is specified explicitly, the common ForkJoinPool is used by default; you can obtain that pool with ForkJoinPool.commonPool(). Its default parallelism is derived from the number of hardware threads the system reports (typically the number of CPU cores, doubled if the cores support hyper-threading); in current JDKs it is one less than Runtime.getRuntime().availableProcessors(). You can also set the parallelism of the common pool with a JVM parameter:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=?

Alternatively, supply your own Executor when you create the CompletableFuture.
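
As a rough illustration of the two options, the sketch below runs one task on the default pool and one on a dedicated executor. The class name ExecutorDemo and the thread name "demo-worker" are invented for this example; the code assumes Java 8 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

final class ExecutorDemo {

    // Runs a task on a dedicated executor and returns the name of the thread that ran it.
    static String threadNameOnCustomExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "demo-worker"); // hypothetical name
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    // Without an executor argument the task normally runs on ForkJoinPool.commonPool().
    static String threadNameOnDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("default: " + threadNameOnDefaultPool());
        System.out.println("custom:  " + threadNameOnCustomExecutor());
    }
}
```

A dedicated executor is usually the safer choice for blocking work, since the common pool is shared by the whole JVM.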

Stream – Asynchronous pipelines are not supported, but a parallel stream can be created with methods such as stream.parallel().

Optional – Not supported, it’s just a container.

Observable, Flowable, Flux – Designed specifically for building asynchronous systems, but synchronous by default. subscribeOn and observeOn let you control which thread performs the subscription and which thread receives the notifications (that is, the thread that calls the observer's onNext / onError / onCompleted methods).

The subscribeOn method lets you decide which Scheduler executes Observable.create. Even if you never call create yourself, an equivalent is invoked internally. For example:

// `log` and `readFile` are helpers assumed to be defined elsewhere.
Observable
  .fromCallable(() -> {
    log.info("Reading file on thread: " + Thread.currentThread().getName());
    return readFile("input.txt");
  })
  .map(text -> {
    log.info("Map on thread: " + Thread.currentThread().getName());
    return text.length();
  })
  .subscribeOn(Schedulers.io()) // <-- the whole chain is subscribed on the I/O scheduler
  .subscribe(value -> {
    log.info("Result on thread: " + Thread.currentThread().getName());
  });

Output:

Reading file on thread: RxIoScheduler-2
Map on thread: RxIoScheduler-2
Result on thread: RxIoScheduler-2

By contrast, observeOn() controls which Scheduler runs the downstream stages that come after the observeOn() call. For example:

// `log` and `readFile` are helpers assumed to be defined elsewhere.
Observable
  .fromCallable(() -> {
    log.info("Reading file on thread: " + Thread.currentThread().getName());
    return readFile("input.txt");
  })
  .observeOn(Schedulers.computation()) // <-- stages below this line run on the computation scheduler
  .map(text -> {
    log.info("Map on thread: " + Thread.currentThread().getName());
    return text.length();
  })
  .subscribeOn(Schedulers.io()) // <-- the source is still subscribed on the I/O scheduler
  .subscribe(value -> {
    log.info("Result on thread: " + Thread.currentThread().getName());
  });

Output:

Reading file on thread: RxIoScheduler-2
Map on thread: RxComputationScheduler-1
Result on thread: RxComputationScheduler-1

5. Cacheable

What is the difference between cacheable and reusable? Suppose we have a pipeline A and reuse it twice to build two new pipelines, B = A + X and C = A + Y.

  • If both B and C execute successfully, then A is reusable.
  • If both B and C execute successfully and A's pipeline runs only once in the process, then A is cacheable. Cacheable therefore implies reusable.

CompletableFuture – Same answer as for reusability.

Stream – There is no way to cache an intermediate result short of invoking a terminal operation.

Optional – Cacheable; all operations execute immediately, so their results are already stored.

Observable, Flowable, Flux – Not cacheable by default, but can be made cacheable by calling .cache(). For example:

Observable<Integer> work = Observable.fromCallable(() -> {
  System.out.println("Doing some work");
  return 10;
});
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);

Output:

Doing some work
10
Doing some work
20

Using .cache():

Observable<Integer> work = Observable.fromCallable(() -> {
  System.out.println("Doing some work");
  return 10;
}).cache(); // <- apply caching
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);

Output:

Doing some work
10
20
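
For comparison, the B = A + X and C = A + Y experiment from the start of this section can be repeated with a CompletableFuture. The sketch below uses a hypothetical CacheDemo class and a counter to show that the source work runs only once; it assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class CacheDemo {

    // Builds A once, derives B = A + X and C = A + Y, and reports
    // {result of B, result of C, number of times A's work ran}.
    static int[] run() {
        AtomicInteger executions = new AtomicInteger();

        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
            executions.incrementAndGet(); // stands in for "Doing some work"
            return 10;
        });

        CompletableFuture<Integer> b = a.thenApply(i -> i);     // B = A + X
        CompletableFuture<Integer> c = a.thenApply(i -> i * 2); // C = A + Y

        return new int[] {b.join(), c.join(), executions.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Both derived pipelines see the value 10, yet the supplier is counted only once, which is the behaviour that .cache() adds to an Observable.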

6. Push or Pull

Stream & Optional – Pull model. You pull the result out of the pipeline by calling a method such as .get() or .collect(). The pull model is usually associated with blocking and synchronous execution, and fairly so: when you call the method, the thread blocks until the data arrives.

CompletableFuture, Observable, Flowable, Flux – Push model. You subscribe to a pipeline and are notified when there is something to process. The push model is usually associated with non-blocking and asynchronous execution. While the pipeline runs on some thread, you are free to do anything else. You have already defined the code to be executed, and the arrival of a notification triggers that code in the next stage.
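
The contrast between the two models can be sketched with JDK classes alone. PushPullDemo is a made-up name, and the example assumes Java 8 or later; the source future is completed by hand to stand in for a producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PushPullDemo {

    // Pull: the caller asks for the result and waits for collect() to return.
    static List<Integer> pull() {
        return Stream.of(1, 2, 3).map(n -> n * 2).collect(Collectors.toList());
    }

    // Push: the caller registers a callback; completing the source later
    // pushes the value through the registered stages.
    static List<String> push() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Integer> source = new CompletableFuture<>();

        source.thenApply(n -> n * 2)
              .thenAccept(n -> events.add("received " + n));
        events.add("callback registered");

        source.complete(21); // the producer pushes the value
        return events;
    }

    public static void main(String[] args) {
        System.out.println(pull());
        System.out.println(push());
    }
}
```

In pull() the caller drives the computation; in push() the caller only describes what should happen and the producer decides when.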

7. Backpressure

To support backpressure, the pipeline must be a push model.

Backpressure describes the situation in which some asynchronous stage of a pipeline cannot process values fast enough and needs to ask the upstream producer to slow down. Simply failing is not acceptable, because too much data would be lost.

Stream & Optional – Backpressure is not supported, because they follow the pull model.

CompletableFuture – Not an issue, because it produces only zero or one result.

Observable(RxJava 1), Flowable, Flux – supported. Common policies are as follows:

  • Buffering – Buffers all onNext values until the downstream consumes them.

  • Drop Recent – Drops the most recent onNext value if the downstream cannot keep up.

  • Use Latest – Supplies only the latest onNext value, overwriting any previous value, if the downstream cannot keep up.

  • None – onNext events are delivered directly, without any buffering or dropping.

  • Exception – Signals an exception if the downstream cannot keep up.

Observable (RxJava 2) – Not supported. Many RxJava 1 users used Observable for events that cannot reasonably be backpressured, or used it without configuring any strategy, which caused unexpected exceptions. RxJava 2 therefore draws a clear line between Flowable, which supports backpressure, and Observable, which does not.
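
The core of backpressure is that the subscriber tells the producer how many items it is ready for. The sketch below illustrates that request(n) handshake with the JDK's java.util.concurrent.Flow interfaces (Java 9 or later) instead of RxJava or Reactor, so it is a stand-in rather than either library's implementation. RangePublisher and BackpressureDemo are hypothetical names, and the publisher is deliberately synchronous and single-threaded to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureDemo {

    // Publishes 1..count, but never more items than the subscriber has requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the outer loop continues
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscribes with a consumer that asks for batchSize items at a time
    // and returns a log of the signals exchanged.
    static List<String> run(int count, int batchSize) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request " + batchSize);
                s.request(batchSize);
            }
            @Override public void onNext(Integer item) {
                log.add("onNext " + item);
                if (++receivedInBatch == batchSize) {
                    receivedInBatch = 0;
                    log.add("request " + batchSize);
                    subscription.request(batchSize);
                }
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

The publisher emits only while there is outstanding demand, so a slow subscriber throttles the producer simply by requesting less. The strategies listed above (buffering, dropping, and so on) are what a library applies when the source cannot be slowed down this way.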

8. Operator Fusion

The idea behind operator fusion is to modify the chain of stages at various points in its lifecycle in order to remove overhead imposed by the library's general architecture. All of these optimizations happen internally, so they are transparent to the end user.

Only RxJava 2 and Reactor support this feature, but in different ways. In general, there are two types of optimization:

Macro-fusion – Replace two or more successive operations with one operation

Micro-fusion – An operator that ends with an output queue and an operator that starts with an input queue can share a single queue instance. For example, instead of calling request(1) and then handling onNext(), the subscriber can poll the value directly from the parent Observable.
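
To give a feel for macro-fusion, here is a toy sketch in plain Java in which two consecutive map stages collapse into one. MapStage and FusionSketch are invented for illustration only; this is not how RxJava 2 or Reactor implement fusion internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FusionSketch {

    // A toy pipeline stage that applies one function to every element.
    static final class MapStage<T, R> {
        final Function<T, R> mapper;
        MapStage(Function<T, R> mapper) { this.mapper = mapper; }

        // Macro-fusion: map(f).map(g) becomes a single stage map(f.andThen(g)).
        <V> MapStage<T, V> map(Function<? super R, ? extends V> next) {
            return new MapStage<>(mapper.andThen(next));
        }

        List<R> apply(List<T> input) {
            List<R> out = new ArrayList<>();
            for (T item : input) {
                out.add(mapper.apply(item));
            }
            return out;
        }
    }

    static List<Integer> run() {
        MapStage<Integer, Integer> plusOne = new MapStage<>(n -> n + 1);
        MapStage<Integer, Integer> fused = plusOne.map(n -> n * 2); // still one stage, one pass
        return fused.apply(Arrays.asList(1, 2, 3));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller writes two map calls, but only one stage object and one loop over the data exist at run time, which is the kind of saving macro-fusion aims for.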

Refer to Parts 1 and 2 for more information

Conclusion

A picture is worth a thousand words

Classes like Stream, CompletableFuture, and Optional were created to solve specific problems, and they solve those problems well. If they meet your needs, use them.

However, problems differ in complexity, and some of them call for new tools; new technologies emerge precisely to handle those more complex problems. RxJava and Reactor are general-purpose tools that let you solve problems declaratively, instead of building a hack out of tools that were never designed for reactive programming.

You are welcome to follow my WeChat official account, “Kirito technology sharing”. I will answer any questions about this article there and share more Java-related technical posts.