There are more and more reactive programming libraries and frameworks for Java based on the Reactor model, such as RxJava, Project Reactor, Vert.x, and many more. In Java 9, the JDK also introduced its own standard reactive interfaces, namely the java.util.concurrent.Flow class. This class specifies the interfaces and abstractions that Java reactive programming needs to implement. This series will focus on the Project Reactor implementation.
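As a taste of those abstractions, here is a minimal, self-contained sketch using the JDK's Flow interfaces. SubmissionPublisher is the JDK's built-in Flow.Publisher implementation; the class name FlowDemo is ours, chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Publisher side: SubmissionPublisher implements Flow.Publisher
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // Subscriber side: implement the four Flow.Subscriber callbacks
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // demand-driven: ask for one item at a time
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // ask for the next item
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        for (int i = 1; i <= 3; i++) {
            publisher.submit(i);
        }
        publisher.close(); // triggers onComplete once all items are delivered
        done.await();
        System.out.println(received); // prints [1, 2, 3]
    }
}
```

Notice that the subscriber never receives more items than it has requested; this demand signal is the hook that makes back pressure (discussed later) possible.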
Note that, so that users who have not upgraded to Java 9 can stay compatible, the interfaces of java.util.concurrent.Flow are also published in a standalone jar for Java versions before 9. The dependency is:
```xml
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
```
The Project Reactor described in this series is an implementation of Reactive Streams. First, let's take a look at what reactive programming is and how Java implements it.
What is reactive programming and how does Java implement it?
Here we take fetching a Zhihu answer by its unique ID as an example. First, let's be clear that after the server processes an HTTP request, the request is completed once the response is written back to that request's connection, like this:
```java
public void request(Connection connection, HttpRequest request) {
    // Handle the request, code omitted
    connection.write(response); // Complete the response
}
```
Suppose two interfaces need to be exposed: one to get the comment count and one to get the answer information. Traditional code might look like this:
```java
public void getCommentCount(Connection connection, HttpRequest request) {
    Integer commentCount = null;
    try {
        // Get the comment count from the cache, blocking IO
        commentCount = getCommentCountFromCache(id);
    } catch (Exception e) {
        try {
            // The cache failed, fall back to the database, blocking IO
            commentCount = getCommentCountFromDB(id);
        } catch (Exception ex) {
        }
    }
    connection.write(commentCount); // Complete the response
}

public void getAnswer(Connection connection, HttpRequest request) {
    // Get the vote count
    Integer voteCount = null;
    try {
        // Get the vote count from the cache, blocking IO
        voteCount = getVoteCountFromCache(id);
    } catch (Exception e) {
        try {
            // The cache failed, fall back to the database, blocking IO
            voteCount = getVoteCountFromDB(id);
        } catch (Exception ex) {
        }
    }
    // Get the answer, blocking IO
    Answer answer = getAnswerFromDB(id);
    ResultVO response = new ResultVO();
    if (voteCount != null) {
        response.setVoteCount(voteCount);
    }
    if (answer != null) {
        response.setAnswer(answer);
    }
    connection.write(response); // Complete the response
}
```
Under this implementation, your process needs only a single thread pool to handle all requests. This implementation has two drawbacks:
- The thread pool blocks on IO, so one storage backend slowing down, or the cache breaking down, blocks all services. Suppose the comment cache suddenly goes down: every request now hits the database, and requests become slow. As threads wait on IO, the only thread pool is exhausted and can no longer process requests for answers either.
- When fetching the answer information, getting the vote count and getting the answer can actually be done concurrently. There is no need to fetch the votes before fetching the answer.
Nowadays NIO (non-blocking IO) is very common, and with non-blocking IO we can do reactive programming, so that our threads never block and instead keep processing requests. How does this work?
In traditional BIO, after a thread writes data to a Connection, the current thread blocks until the response returns, and only then does whatever comes next. With NIO, a thread writes data to a Connection, caches somewhere the work that needs to happen once the response returns, and then returns immediately. When the response arrives, the Read event of the NIO Selector becomes ready; the thread scanning the Selector events notifies your thread pool that the data is ready, and one of the pool's threads picks up the cached work and its parameters and processes them.
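To make this concrete, here is a minimal, self-contained loopback sketch of that mechanism. The "work to do when the response arrives" is cached as the SelectionKey's attachment, standing in for the callback described above; all names here are illustrative, not from any framework:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Consumer;

public class SelectorDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // Non-blocking server socket on an ephemeral loopback port
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        // Client writes a "request" and does not wait for a response
        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

        boolean[] handled = {false};
        while (!handled[0]) {
            selector.select(); // only this event-loop thread waits here
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {
                    SocketChannel accepted = server.accept();
                    accepted.configureBlocking(false);
                    // Cache "what to do when data arrives" as the key's attachment:
                    // this plays the role of the callback described in the text
                    Consumer<SocketChannel> onReadable = ch -> {
                        try {
                            ByteBuffer buf = ByteBuffer.allocate(64);
                            ch.read(buf);
                            buf.flip();
                            System.out.println("received: " + StandardCharsets.UTF_8.decode(buf));
                            handled[0] = true;
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    };
                    accepted.register(selector, SelectionKey.OP_READ, onReadable);
                } else if (key.isReadable()) {
                    // Read event is ready: run the cached work
                    @SuppressWarnings("unchecked")
                    Consumer<SocketChannel> callback = (Consumer<SocketChannel>) key.attachment();
                    callback.accept((SocketChannel) key.channel());
                }
            }
        }
        client.close();
        server.close();
        selector.close();
    }
}
```

In a real framework like Netty, the event loop would hand the cached callback to a worker pool instead of running it inline, but the select-then-dispatch shape is the same.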
So, how do you cache the work and parameters to run after the response returns? Java itself provides two kinds of interfaces for this: callback-based interfaces (the various functional interfaces introduced in Java 8) and the Future framework.
An implementation based on callbacks:
```java
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
    // Get the vote count from the cache
    getVoteCountFromCache(id, (voteCount, throwable) -> {
        if (throwable != null) {
            // The cache failed, fall back to the database
            getVoteCountFromDB(id, (voteCount2, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setVoteCount(voteCount2);
                }
                // Then get the answer
                getAnswerFromDB(id, (answer, throwable3) -> {
                    if (throwable3 == null) {
                        resultVO.setAnswer(answer);
                        connection.write(resultVO);
                    } else {
                        // Return an error response
                        connection.write(throwable3);
                    }
                });
            });
        } else {
            resultVO.setVoteCount(voteCount);
            // Then get the answer
            getAnswerFromDB(id, (answer, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setAnswer(answer);
                    // Return the response
                    connection.write(resultVO);
                } else {
                    // Return an error response
                    connection.write(throwable2);
                }
            });
        }
    });
}
```
As you can see, the deeper the call hierarchy, the harder the callbacks are to write, and the more verbose the code. In addition, with callbacks it is difficult to fetch the vote count and the answer information concurrently, so we still have to get the votes first and the answer afterwards.
What about a Future-based version? Let's try it with CompletableFuture, introduced in Java 8.
```java
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
    CompletableFuture.allOf(
        getVoteCountFromCache(id)
            // An exception occurred, read from the database instead
            .exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
            // After reading, fill in the vote count
            .thenAccept(voteCount -> resultVO.setVoteCount(voteCount)),
        getAnswerFromDB(id)
            .thenAccept(answer -> resultVO.setAnswer(answer))
    ).exceptionallyAsync(throwable -> {
        // Return an error response
        connection.write(throwable);
        return null;
    }).thenRun(() -> connection.write(resultVO));
}
```
This implementation looks much simpler, and it reads the vote count and the answer concurrently. On top of what CompletableFuture offers, Project Reactor adds many more composition operators, a more complete exception-handling mechanism, and mechanisms for handling back pressure and retries.
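As a sketch of those Reactor operators, the same cache-fallback-plus-concurrent-read flow could look like the following. This assumes reactor-core on the classpath, and the three data-source methods are stand-ins we invented for illustration:

```java
import reactor.core.publisher.Mono;

public class ReactorDemo {
    // Stand-ins for the cache/DB calls discussed in the text (illustrative only)
    static Mono<Integer> getVoteCountFromCache(long id) {
        return Mono.error(new RuntimeException("cache miss"));
    }

    static Mono<Integer> getVoteCountFromDB(long id) {
        return Mono.just(42);
    }

    static Mono<String> getAnswerFromDB(long id) {
        return Mono.just("because");
    }

    public static void main(String[] args) {
        Mono<Integer> voteCount = getVoteCountFromCache(1L)
                // On any cache error, fall back to the database
                .onErrorResume(e -> getVoteCountFromDB(1L));
        // zip subscribes to both sources and combines their results
        String result = Mono.zip(voteCount, getAnswerFromDB(1L),
                        (votes, answer) -> answer + " (" + votes + " votes)")
                .block(); // block() only for the demo; a server would return the Mono
        System.out.println(result); // prints: because (42 votes)
    }
}
```

Compared with the CompletableFuture version, the fallback (onErrorResume) and the concurrent combination (zip) are single named operators rather than hand-wired exception plumbing.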
Problems encountered in reactive programming: back pressure
Because reactive programming never blocks, it surfaces a problem that used to be ignored because it almost never happened: back pressure.
Back pressure is the buffer-overflow problem that occurs when upstream sends more requests than downstream services can handle. In reactive programming, because threads do not block, they cache the current parameters and pending work whenever they hit IO, which increases throughput but also the memory footprint. If that is not limited, it will likely end in OutOfMemory. This is the back-pressure problem.
For this, the Project Reactor model has a handling mechanism, while CompletableFuture has nothing comparable.
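A minimal sketch of how Reactor exposes back pressure to the subscriber (again assuming reactor-core on the classpath; the class name and printed labels are ours): the consumer requests only what it can handle, so the producer cannot overflow its buffer.

```java
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {
    public static void main(String[] args) {
        // The subscriber controls demand: it asks for two items at a time,
        // so the producer can never flood its buffer
        Flux.range(1, 6)
                .doOnRequest(n -> System.out.println("requested " + n))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(2); // initial demand
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        System.out.println("consumed " + value);
                        if (value % 2 == 0) {
                            request(2); // ask for the next batch only after finishing this one
                        }
                    }
                });
    }
}
```

With CompletableFuture there is no equivalent demand signal: whoever completes the future pushes the result at you whether you are ready or not.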
Why isn't reactive programming popular in business microservice development right now?
Mainly because database IO is still BIO, not NIO.
Whether it's Java's own Future framework, Spring WebFlux, or Vert.x, they are all non-blocking frameworks based on the Reactor model (the latter two are both implemented on top of Netty).
In blocking programming, every request needs a thread to handle it, and if IO blocks, that thread blocks with it. In non-blocking, reactive programming, threads are not blocked and can go on handling other requests. As a simple example: suppose there is only one thread pool. When a request comes in, the thread needs to read from the database, and this IO is non-blocking NIO. The thread writes the query to the database connection and returns immediately. When the database returns data, the connection's Selector signals a Read event, and the data is processed (effectively a callback) by the thread pool, not necessarily on the same thread as before. This way, instead of waiting for the database, threads can directly process other requests. Even if one service's SQL takes a long time to execute, the other services are not affected.
The foundation of all this, however, is that the IO must be non-blocking, i.e. NIO (or AIO). Official JDBC has no NIO, only a BIO implementation: a thread cannot write the request to the connection and return immediately, it must wait for the response. The workaround is to use a separate thread pool to handle database requests and wait for the callback. Business thread pool A hands the blocking JDBC request to thread pool B, which reads the data and then hands the rest of the business logic back to A. So A does not block and can process other requests. However, there is still a failure mode: a slow business SQL can cause all of B's threads to block, B's queue to fill up, and A's requests to block in turn. This is not a perfect implementation; to be truly perfect, JDBC would need to implement NIO.
With Java's own Future framework, blocking JDBC can be used like this:
```java
@GetMapping
public DeferredResult<Result> get() {
    DeferredResult<Result> deferredResult = new DeferredResult<>();
    CompletableFuture.supplyAsync(() -> {
        // Blocking database IO, executed on dbThreadPool,
        // a pool dedicated to blocking database IO
        return blockingDatabaseIO();
    }, dbThreadPool).thenAccept(result -> {
        // Spring's DeferredResult writes the result back asynchronously
        deferredResult.setResult(result);
    });
    return deferredResult;
}
```
WebFlux can also use blocking JDBC, in the same way:
```java
@GetMapping
public Mono<Result> get() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        // Blocking database IO, executed on dbThreadPool,
        // a pool dedicated to blocking database IO
        return blockingDatabaseIO();
    }, dbThreadPool));
}
```
Vert.x can also use blocking JDBC, again in the same way:
```java
@GetMapping
public DeferredResult<Result> get() {
    DeferredResult<Result> deferredResult = new DeferredResult<>();
    getResultFromDB().setHandler(asyncResult -> {
        if (asyncResult.succeeded()) {
            deferredResult.setResult(asyncResult.result());
        } else {
            deferredResult.setErrorResult(asyncResult.cause());
        }
    });
    return deferredResult;
}

private WorkerExecutor dbThreadPool = vertx.createSharedWorkerExecutor("DB", 16);

private Future<Result> getResultFromDB() {
    Future<Result> result = Future.future();
    dbThreadPool.executeBlocking(future -> {
        // Blocking database IO
        future.complete(blockingDatabaseIO());
    }, false, asyncResult -> {
        if (asyncResult.succeeded()) {
            result.complete(asyncResult.result());
        } else {
            result.fail(asyncResult.cause());
        }
    });
    return result;
}
```
All of these wrap the blocking JDBC IO by running it on a different thread pool (or on the original pool, as long as the callback runs on a thread other than the requesting one, instead of that thread blocking in a wait).
However, this barely improves performance for applications dominated by database IO, and it adds extra thread switches, which can make it a net loss. So you need a database client that actually implements NIO. Such clients exist, but none of them is popular yet:
- Vert.x client: vertx.io/docs/vertx-…
- R2DBC client: r2dbc.io/
- Jasync-sql client: github.com/jasync-sql/…