1. Overview

I remember reading the Hystrix source code last year and being confused by RxJava. Seeing the word "reactive" did not help either. Some time ago I learned Vert.x and saw how heavily major frameworks rely on Java 8's CompletableFuture. That is when I realized how important reactive programming is. After all, reactive programming makes a lot of sense in a language without native coroutine support. It has pros and cons, though, and whether it fits depends on the scenario. Today let's look at Spring 5's reactive web framework, WebFlux.

2. Reactive, asynchronous, and synchronous

  • Synchronous invocation: With traditional blocking programming, each request occupies a thread, which wastes thread resources under high concurrency. When I/O blocks, the thread blocks with it and the CPU is not fully utilized. High concurrency can also lead to creating too many threads.
  • Asynchronous non-blocking: Results are normally handled through callbacks. As business logic grows more complex, this turns into callback hell and the code becomes hard to follow. Java's Future pattern does not really solve this, because get() still blocks, so it is not truly asynchronous.
  • Reactive: It supports asynchronous tasks while keeping the code readable, which can improve system throughput. Good reactive frameworks such as Reactor also support back pressure. However, reactive programming does not reduce the processing time of an individual request. It only improves CPU utilization and avoids unnecessary thread switches. The reactive style resembles a push-based observer pattern: when a task completes, the result is actively pushed to the consumer.
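Here is a small JDK-only sketch that contrasts the blocking Future style with callback-style composition. fetchUser is a hypothetical stand-in for a remote call. The CompletableFuture chain is not reactive in the Reactor sense, since it has no back pressure. It only shows how the next step can be attached to the result instead of waited for.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureStyles {

    // Hypothetical "remote call"; here it simply returns a value.
    static String fetchUser() {
        return "alice";
    }

    // Future style: the caller has to block on get() to use the result.
    static String blockingStyle(ExecutorService pool) throws ExecutionException, InterruptedException {
        Future<String> future = pool.submit(() -> fetchUser());
        return future.get().toUpperCase(); // the calling thread waits here
    }

    // CompletableFuture style: follow-up steps are attached as callbacks, no thread waits.
    static CompletableFuture<String> composedStyle(ExecutorService pool) {
        return CompletableFuture.supplyAsync(FutureStyles::fetchUser, pool)
                .thenApply(String::toUpperCase)
                .thenApply(name -> "Hello, " + name);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingStyle(pool));        // ALICE
            System.out.println(composedStyle(pool).join()); // Hello, ALICE (join only for the demo)
        } finally {
            pool.shutdown();
        }
    }
}
```

In real non-blocking code you would hand the CompletableFuture on rather than call join(). join() appears here only so the demo can print a value.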

Back pressure: In a push-based model, messages pile up when a reactive stream produces data faster than the consumer can process it. To avoid this, the consumer needs a way to control the rate at which messages are produced.

To borrow a phrase from the Spring documentation:

If a publisher cannot slow down, it has to decide whether to buffer, drop, or fail.
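Reactor implements the Reactive Streams contract, which the JDK also exposes as java.util.concurrent.Flow (Java 9+). Below is a minimal sketch of demand-driven back pressure. RangePublisher and OneAtATimeSubscriber are hypothetical classes written for this illustration. The subscriber signals how much it wants through request(n), and the publisher never emits more than that. A production publisher must also handle concurrency, invalid n, and demand overflow, all of which this synchronous sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackPressureDemo {

    // Hypothetical synchronous publisher of 1..max that only emits what has been requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;

        RangePublisher(int max) { this.max = max; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= max) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    if (!done && next > max) {
                        done = true;
                        subscriber.onComplete();
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // A slow consumer that asks for exactly one item at a time.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1); // initial demand: one item
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        OneAtATimeSubscriber sub = new OneAtATimeSubscriber();
        new RangePublisher(5).subscribe(sub);
        System.out.println(sub.received + " completed=" + sub.completed); // [1, 2, 3, 4, 5] completed=true
    }
}
```

The subscriber only requests a new item after it has finished the previous one, so the publisher can never run ahead of it. Buffering, dropping, or failing only becomes necessary when the source cannot wait like this.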

3. Tracing the source code

Creating a Server

Spring Boot's web modules provide support for web servers. WebFlux uses Netty (through Reactor Netty) as its web server by default, so let's focus on that implementation.

To recap, Spring Boot starts the application through its run method, which calls createApplicationContext to create Spring's ApplicationContext. Spring provides several context implementations built on the template method pattern, and a different one is chosen for each kind of application:

protected ConfigurableApplicationContext createApplicationContext() {
   Class<?> contextClass = this.applicationContextClass;
   if (contextClass == null) {
      try {
         switch (this.webApplicationType) {
         case SERVLET:
            contextClass = Class.forName(DEFAULT_WEB_CONTEXT_CLASS);
            break;
         case REACTIVE:
            contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
            break;
         default:
            contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
         }
      }
      catch (ClassNotFoundException ex) {
         throw new IllegalStateException(
               "Unable create a default ApplicationContext, "
                     + "please specify an ApplicationContextClass", ex);
      }
   }
   return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
}

WebFlux uses the REACTIVE type, so the context is an AnnotationConfigReactiveWebServerApplicationContext. After the context is created, prepareContext runs first, and then refreshContext refreshes the context. Refreshing involves a series of initialization steps, listener registration, and so on.
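How does webApplicationType end up as REACTIVE? As far as I know, Spring Boot 2.x decides by checking which indicator classes are on the classpath. The following is a simplified reconstruction of that decision, not the actual Spring Boot source. The class names match the ones Boot 2.x checks. The isPresent predicate is an assumption that replaces real class loading, so the sketch runs without Spring.

```java
import java.util.Set;
import java.util.function.Predicate;

public class WebTypeDeduction {

    enum WebApplicationType { NONE, SERVLET, REACTIVE }

    static final String WEBFLUX_INDICATOR = "org.springframework.web.reactive.DispatcherHandler";
    static final String WEBMVC_INDICATOR = "org.springframework.web.servlet.DispatcherServlet";
    static final String JERSEY_INDICATOR = "org.glassfish.jersey.servlet.ServletContainer";
    static final String[] SERVLET_INDICATORS = {
            "javax.servlet.Servlet",
            "org.springframework.web.context.ConfigurableWebApplicationContext" };

    // isPresent stands in for a real "is this class loadable?" check.
    static WebApplicationType deduce(Predicate<String> isPresent) {
        // REACTIVE only when WebFlux is present and neither Spring MVC nor Jersey is.
        if (isPresent.test(WEBFLUX_INDICATOR) && !isPresent.test(WEBMVC_INDICATOR)
                && !isPresent.test(JERSEY_INDICATOR)) {
            return WebApplicationType.REACTIVE;
        }
        // Without the servlet API it is not a web application at all.
        for (String className : SERVLET_INDICATORS) {
            if (!isPresent.test(className)) {
                return WebApplicationType.NONE;
            }
        }
        return WebApplicationType.SERVLET;
    }

    public static void main(String[] args) {
        Set<String> webfluxOnly = Set.of(WEBFLUX_INDICATOR);
        System.out.println(deduce(webfluxOnly::contains)); // REACTIVE

        Set<String> both = Set.of(WEBFLUX_INDICATOR, WEBMVC_INDICATOR,
                SERVLET_INDICATORS[0], SERVLET_INDICATORS[1]);
        System.out.println(deduce(both::contains));        // SERVLET
    }
}
```

When both Spring MVC and WebFlux are on the classpath, the servlet stack wins. This is why adding spring-boot-starter-web to a WebFlux project silently switches it to SERVLET.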

The key step is the context's onRefresh method.

onRefresh method

@Override
protected void onRefresh() {
   super.onRefresh();
   try {
      createWebServer();
   }
   catch (Throwable ex) {
      throw new ApplicationContextException("Unable to start reactive web server", ex);
   }
}

private void createWebServer() {
   WebServer localServer = this.webServer;
   if (localServer == null) {
      this.webServer = getWebServerFactory().getWebServer(getHttpHandler());
   }
   initPropertySources();
}

onRefresh calls createWebServer to create the web server. With Netty, this calls NettyReactiveWebServerFactory.getWebServer, which builds the server using a builder.

Finally, a TcpBridgeServer is created. TcpBridgeServer extends TcpServer and overrides the doHandler method, whose job is to provide the ChannelInitializer for Netty. Different application-layer protocols need different processing logic, and WebSocket is no exception. So if you implement your own application-layer protocol on top of TCP, this method is a good reference.

@Override
protected ContextHandler<Channel> doHandler(
      BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
      MonoSink<NettyContext> sink) {
   BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate =
         compressPredicate(options);
   boolean alwaysCompress = compressPredicate == null && options.minCompressionResponseSize() == 0;
   return ContextHandler.newServerContext(sink,
         options,
         loggingHandler,
         (ch, c, msg) -> {
            HttpServerOperations ops = HttpServerOperations.bindHttp(ch,
                  handler,
                  c,
                  compressPredicate,
                  msg);

            if (alwaysCompress) {
               ops.compression(true);
            }
            return ops;
         })
                        .onPipeline(this)
                        .autoCreateOperations(false);
}

OK, so at this point we are only supplying a factory. Whenever an HTTP request arrives, it constructs an HttpServerOperations (more on that later). P.S. Functional programming makes this kind of code painful to read, but it does let you pass a method in as an argument right where it is used.
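The "supply a factory now, build per-request objects later" idea can be sketched without Netty. Operations and OperationsFactory are hypothetical stand-ins for HttpServerOperations and the (ch, c, msg) -> ops lambda in doHandler. The configuration (here only alwaysCompress) is captured once when the lambda is created and applied to every object it builds.

```java
import java.util.ArrayList;
import java.util.List;

public class OperationsFactoryDemo {

    // Hypothetical stand-in for HttpServerOperations: one object per HTTP request.
    static final class Operations {
        final String channelId;
        final String request;
        boolean compression;

        Operations(String channelId, String request) {
            this.channelId = channelId;
            this.request = request;
        }
    }

    // Hypothetical functional interface mirroring the (ch, c, msg) -> ops lambda.
    @FunctionalInterface
    interface OperationsFactory {
        Operations create(String channelId, String msg);
    }

    // Built once at server setup; the lambda captures alwaysCompress.
    static OperationsFactory newFactory(boolean alwaysCompress) {
        return (channelId, msg) -> {
            Operations ops = new Operations(channelId, msg);
            if (alwaysCompress) {
                ops.compression = true;
            }
            return ops;
        };
    }

    public static void main(String[] args) {
        OperationsFactory factory = newFactory(true);
        List<Operations> created = new ArrayList<>();
        // Every incoming request gets a fresh Operations object from the same factory.
        for (String msg : List.of("GET /a", "GET /b")) {
            created.add(factory.create("ch-1", msg));
        }
        System.out.println(created.size() + " " + created.get(1).request + " " + created.get(1).compression);
        // 2 GET /b true
    }
}
```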

ContextHandler

As noted above, ContextHandler is a ChannelInitializer, so let's trace its initChannel method.

@Override
protected void initChannel(CHANNEL ch) throws Exception {
   accept(ch);
}

// accept(ch) then runs (excerpt):
try {
   if (pipelineConfigurator != null) {
      pipelineConfigurator.accept(channel.pipeline(),
            (ContextHandler<Channel>) this);
   }
   channel.pipeline()
          .addLast(NettyPipeline.ReactiveBridge,
                new ChannelOperationsHandler(this));
}
// ... (rest of accept omitted)

initChannel simply calls accept. The core of those lines is the call to pipelineConfigurator.accept. The pipelineConfigurator is the TcpBridgeServer itself, passed in through onPipeline(this) in doHandler.

The accept method of TcpBridgeServer registers the HTTP codec and HTTP request handler.

@Override
public void accept(ChannelPipeline p, ContextHandler<Channel> c) {
   p.addLast(NettyPipeline.HttpCodec, new HttpServerCodec(
           options.httpCodecMaxInitialLineLength(),
           options.httpCodecMaxHeaderSize(),
           options.httpCodecMaxChunkSize(),
           options.httpCodecValidateHeaders(),
           options.httpCodecInitialBufferSize()));

   if (ACCESS_LOG_ENABLED) {
      p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler());
   }

   p.addLast(NettyPipeline.HttpServerHandler, new HttpServerHandler(c));
}
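The split between a generic initializer and a protocol-specific configurator can be modeled with a plain list. This is a simplified sketch: Pipeline is a hypothetical list of handler names, not Netty's ChannelPipeline, and the names are illustrative rather than the real NettyPipeline constants. The ordering shows the point. The protocol configurator installs the codec and the protocol handler first, and the generic ReactiveBridge handler is always appended last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class PipelineDemo {

    // Hypothetical minimal pipeline: an ordered list of handler names.
    static final class Pipeline {
        final List<String> names = new ArrayList<>();

        Pipeline addLast(String name) {
            names.add(name);
            return this;
        }
    }

    // Mirrors TcpBridgeServer.accept: HTTP-specific handlers.
    static BiConsumer<Pipeline, Object> httpConfigurator(boolean accessLogEnabled) {
        return (p, context) -> {
            p.addLast("HttpCodec");
            if (accessLogEnabled) {
                p.addLast("AccessLogHandler");
            }
            p.addLast("HttpServerHandler");
        };
    }

    // Mirrors ContextHandler.accept: run the configurator, then add the reactive bridge.
    static Pipeline initChannel(BiConsumer<Pipeline, Object> configurator) {
        Pipeline p = new Pipeline();
        if (configurator != null) {
            configurator.accept(p, null);
        }
        p.addLast("ReactiveBridge");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(initChannel(httpConfigurator(false)).names);
        // [HttpCodec, HttpServerHandler, ReactiveBridge]
    }
}
```

Supporting another protocol would mean passing a different configurator while initChannel stays unchanged. That is the extension point the article suggests reusing for custom TCP protocols.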

HttpServerHandler

At this point it is clear that every HTTP request goes through the channelRead method of HttpServerHandler.

if (persistentConnection) {
   pendingResponses += 1;
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Increasing pending responses, now {}"),
            pendingResponses);
   }
   persistentConnection = isKeepAlive(request);
}
else {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Dropping pipelined HTTP request, " +
                  "previous response requested connection close"));
   }
   ReferenceCountUtil.release(msg);
   return;
}
if (pendingResponses > 1) {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Buffering pipelined HTTP request, " +
                  "pending response count: {}, queue: {}"), pendingResponses, pipelined != null ? pipelined.size() : 0);
   }
   overflow = true;
   doPipeline(ctx, msg);
   return;
}
else {
   overflow = false;
   parentContext.createOperations(ctx.channel(), msg);
   if (!(msg instanceof FullHttpRequest)) {
      return;
   }
}

This method is fairly long. The excerpt above is the part that handles an HttpRequest, and its logic is:

1. Check whether the message was decoded successfully. If decoding failed, an error response is returned (this check is not part of the excerpt).

2. Check whether the connection is still persistent. If a previous response asked to close the connection, the message is released and dropped. Otherwise pendingResponses, the number of responses still owed on this connection, is incremented, and persistentConnection is updated from the new request's keep-alive flag.

3. Check pendingResponses:

  • Greater than 1: a response is already outstanding on this connection. The request is queued (the doPipeline method) for later processing, and overflow is set to true.
  • Equal to 1: the request is processed immediately. So a connection cannot handle two requests at the same time; it must wait until the first one has been answered.

Why track pendingResponses at all? It comes from HTTP/1.1 keep-alive with pipelining. A client may send several requests on one connection without waiting, but the server must return the responses in the order the requests arrived, so the requests are effectively processed serially. As I recall, Tomcat does the same. HTTP/2 removes this restriction by multiplexing parallel streams over a single connection.
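The three-way decision in channelRead (process, queue, or drop) can be isolated into a tiny state machine. PipeliningGate is a hypothetical class modeled on the excerpt above. It assumes that pendingResponses is decremented once a response has been fully written, which the article attributes to HttpServerHandler's write path. Decoding, logging, and the actual queue are left out.

```java
public class PipeliningGate {

    enum Action { PROCESS, QUEUE, DROP }

    // Per-connection state, modeled on HttpServerHandler's fields.
    private int pendingResponses = 0;
    private boolean persistentConnection = true;

    // Decides what to do with a newly decoded request on this connection.
    Action onRequest(boolean keepAlive) {
        if (!persistentConnection) {
            return Action.DROP; // an earlier request asked to close the connection
        }
        pendingResponses += 1;
        persistentConnection = keepAlive;
        return pendingResponses > 1 ? Action.QUEUE : Action.PROCESS;
    }

    // Assumed hook: called once the response to the oldest request has been written.
    void onResponseWritten() {
        pendingResponses -= 1;
    }

    public static void main(String[] args) {
        PipeliningGate gate = new PipeliningGate();
        System.out.println(gate.onRequest(true));  // PROCESS
        System.out.println(gate.onRequest(true));  // QUEUE: first response not written yet
        gate.onResponseWritten();
        System.out.println(gate.onRequest(false)); // QUEUE: one response still pending
        System.out.println(gate.onRequest(true));  // DROP: previous request was not keep-alive
    }
}
```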

doPipeline method

void doPipeline(ChannelHandlerContext ctx, Object msg) {
   if (pipelined == null) {
      pipelined = Queues.unbounded()
                        .get();
   }
   if (!pipelined.offer(msg)) {
      ctx.fireExceptionCaught(Exceptions.failWithOverflow());
   }
}

This method queues the request. On a given connection, if the previous request has not been answered yet, subsequent requests wait in this queue.

@Override
public void run() {
   Object next;
   boolean nextRequest = false;
   while ((next = pipelined.peek()) != null) {
      if (next instanceof HttpRequest) {
         if (nextRequest || !persistentConnection) {
            return;
         }
         nextRequest = true;
         parentContext.createOperations(ctx.channel(), next);
         if (!(next instanceof FullHttpRequest)) {
            pipelined.poll();
            continue;
         }
      }
      ctx.fireChannelRead(pipelined.poll());
   }
   overflow = false;
}

The run method is what actually processes the queued requests. The nextRequest flag ensures that at most one new request is started per run. You may wonder where run is called. I searched for a long time and finally found it in the write method of HttpServerHandler.

if (pipelined != null && !pipelined.isEmpty()) {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Draining next pipelined " +
                  "request, pending response count: {}, queued: {}"),
            pendingResponses, pipelined.size());
   }
   ctx.executor()
      .execute(this);
}
else {
   ctx.read();
}

Why drain the queue in write? A write means the previous request on this connection has been answered, so we can move on to the next request on the same connection.
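The queue-then-drain-on-write pattern can be sketched as a single connection that allows one request in flight. SerialConnection is a hypothetical class. It collapses doPipeline (queue when busy) and the write-triggered run (start the next queued request) into two methods, and it omits keep-alive handling and the event-loop hop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SerialConnection {

    // Hypothetical model of one HTTP/1.1 connection: at most one request in flight.
    private final Queue<String> pipelined = new ArrayDeque<>();
    private String inFlight = null;
    final List<String> started = new ArrayList<>();

    // A request arrives: start it if the connection is idle, otherwise queue it (like doPipeline).
    void onRequest(String request) {
        if (inFlight == null) {
            start(request);
        } else {
            pipelined.offer(request);
        }
    }

    // The response for the in-flight request was written: start the next queued one (like run).
    void onResponseWritten() {
        inFlight = null;
        String next = pipelined.poll();
        if (next != null) {
            start(next);
        }
    }

    private void start(String request) {
        inFlight = request;
        started.add(request);
    }

    public static void main(String[] args) {
        SerialConnection conn = new SerialConnection();
        conn.onRequest("GET /1");
        conn.onRequest("GET /2");
        conn.onRequest("GET /3");
        System.out.println(conn.started); // [GET /1]
        conn.onResponseWritten();
        conn.onResponseWritten();
        System.out.println(conn.started); // [GET /1, GET /2, GET /3]
    }
}
```

Draining from the write side guarantees that the responses leave in request order, which is exactly what HTTP/1.1 pipelining requires.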

From the analysis above, we now have a basic picture of how Netty accepts and processes HTTP requests. We have also seen how serial request handling under HTTP/1.1 keep-alive is supported. Next, let's look at the request-handling logic in detail.

Handling the request

As shown above, once a request has been received, the following method is called:

parentContext.createOperations(ctx.channel(), msg);

parentContext is the ContextHandler created in TcpBridgeServer.

ContextHandler.createOperations then calls channel.eventLoop().execute(op::onHandlerStart), and onHandlerStart is simply:

protected void onHandlerStart() {
   applyHandler();
}

Note that this runs on the channel's own eventLoop. op is essentially an HttpServerOperations, and onHandlerStart ultimately calls the applyHandler method of its base class.
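Why hand the work to the channel's eventLoop? All work for a channel then runs on one thread, in submission order, so per-connection state needs no locking. Below is a JDK-only sketch that uses a single-thread executor as a stand-in for a Netty EventLoop. This is an assumption for illustration only: a real EventLoop also performs the channel's I/O and usually serves many channels.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopDemo {

    // Runs each task on one dedicated thread, loosely like a channel's EventLoop.
    static List<String> runOnLoop(List<String> requests) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-1"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (String req : requests) {
            // Like channel.eventLoop().execute(op::onHandlerStart): hand the work to the loop thread.
            eventLoop.execute(() -> log.add(Thread.currentThread().getName() + " handles " + req));
        }
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        runOnLoop(List.of("GET /a", "GET /b")).forEach(System.out::println);
        // event-loop-1 handles GET /a
        // event-loop-1 handles GET /b
    }
}
```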

applyHandler method

protected final void applyHandler() {
//   channel.pipeline()
//          .fireUserEventTriggered(NettyPipeline.handlerStartedEvent());
   if (log.isDebugEnabled()) {
      log.debug(format(channel(), "[{}] Handler is being applied: {}"), formatName(), handler);
   }
   try {
      Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
          .subscribe(this);
   }
   catch (Throwable t) {
      log.error(format(channel(), ""), t);
      channel.close();
   }
}

The core logic here is the call to handler.apply. This handler is the ReactorHttpHandlerAdapter that was passed in when the web server was created.

ReactorHttpHandlerAdapter#apply

This method builds a ReactorServerHttpRequest and a ReactorServerHttpResponse, then calls httpHandler.handle to run the processing logic. Essentially it is a wrapper around the raw request and response. The httpHandler is provided by HttpWebHandlerAdapter, which ultimately calls the handle method of DispatcherHandler:

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
   if (logger.isDebugEnabled()) {
      ServerHttpRequest request = exchange.getRequest();
      logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
   }
   if (this.handlerMappings == null) {
      return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
   }
   return Flux.fromIterable(this.handlerMappings)
         .concatMap(mapping -> mapping.getHandler(exchange))
         .next()
         .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
         .flatMap(handler -> invokeHandler(exchange, handler))
         .flatMap(result -> handleResult(exchange, result));
}

1. mapping.getHandler looks up the matching HandlerMethod based on the request information.

The handlers we write in our business code are registered in the initHandlerMethods method of AbstractHandlerMethodMapping, and all of that information is stored in its MappingRegistry.

2. invokeHandler executes the business logic.

3. handleResult processes the return value. ResponseEntity and @ResponseBody return values are handled by different logic.
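The shape of DispatcherHandler.handle can be mirrored synchronously with java.util.stream and Optional in place of Flux and Mono. MiniDispatcher, HandlerMapping, and HandlerNotFoundException here are hypothetical, simplified types, not Spring's. The sketch keeps the three steps from the list above: the first mapping that yields a handler wins, that handler is invoked, and its result is post-processed. If no mapping matches, the request fails with an error.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class MiniDispatcher {

    // Hypothetical stand-in for HandlerMapping: may or may not have a handler for a path.
    interface HandlerMapping {
        Optional<Function<String, Object>> getHandler(String path);
    }

    static final class HandlerNotFoundException extends RuntimeException {
        HandlerNotFoundException(String path) { super("No matching handler for " + path); }
    }

    private final List<HandlerMapping> handlerMappings;

    MiniDispatcher(List<HandlerMapping> handlerMappings) {
        this.handlerMappings = handlerMappings;
    }

    // Helper: a mapping that matches one exact path and returns a fixed body.
    static HandlerMapping exact(String route, Object body) {
        Function<String, Object> handler = p -> body;
        return path -> path.equals(route) ? Optional.of(handler) : Optional.empty();
    }

    String handle(String path) {
        Function<String, Object> handler = handlerMappings.stream()
                .map(mapping -> mapping.getHandler(path))              // like concatMap(mapping.getHandler)
                .flatMap(Optional::stream)
                .findFirst()                                           // like next(): stop at the first hit
                .orElseThrow(() -> new HandlerNotFoundException(path)); // like switchIfEmpty(Mono.error(...))
        Object result = handler.apply(path);                           // like invokeHandler
        return handleResult(result);                                   // like handleResult
    }

    // Hypothetical result handling: render the return value as a status line plus body.
    private String handleResult(Object result) {
        return "200 " + result;
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher(
                List.of(exact("/health", "UP"), exact("/users", "user list")));
        System.out.println(dispatcher.handle("/health")); // 200 UP
    }
}
```

Because streams are lazy, later mappings are never consulted once one matches. In the reactive original, next() achieves the same by cancelling the rest of the Flux.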

RequestMappingHandlerAdapter#handle

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
   HandlerMethod handlerMethod = (HandlerMethod) handler;
   Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
   InitBinderBindingContext bindingContext = new InitBinderBindingContext(
         getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
   InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
   Function<Throwable, Mono<HandlerResult>> exceptionHandler =
         ex -> handleException(ex, handlerMethod, bindingContext, exchange);
   return this.modelInitializer
         .initModel(handlerMethod, bindingContext, exchange)
         .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
         .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
         .doOnNext(result -> bindingContext.saveModel())
         .onErrorResume(exceptionHandler);
}

This is the method that runs the business logic.

initModel initializes the model data (including any session attributes) and stores it in bindingContext.

invocableMethod.invoke is then called to execute the business logic.

Finally, an exception handler is attached to the result. If an exception occurs, the exception-handling logic runs; otherwise the HandlerResult is returned directly for the upper layer to handle.

InvocableHandlerMethod#invoke

public Mono<HandlerResult> invoke( ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
   return resolveArguments(exchange, bindingContext, providedArgs).flatMap(args -> {
      try {
         Object value = doInvoke(args);
         HttpStatus status = getResponseStatus();
         if (status != null) {
            exchange.getResponse().setStatusCode(status);
         }
         MethodParameter returnType = getReturnType();
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(returnType.getParameterType());
         boolean asyncVoid = isAsyncVoidReturnType(returnType, adapter);
         if ((value == null || asyncVoid) && isResponseHandled(args, exchange)) {
            logger.debug("Response fully handled in controller method");
            return asyncVoid ? Mono.from(adapter.toPublisher(value)) : Mono.empty();
         }


         HandlerResult result = new HandlerResult(this, value, returnType, bindingContext);
         return Mono.just(result);
      }
      catch (InvocationTargetException ex) {
         return Mono.error(ex.getTargetException());
      }
      catch (Throwable ex) {
         return Mono.error(new IllegalStateException(getInvocationErrorMessage(args)));
      }
   });
}

1. resolveArguments deserializes the request data into the method arguments.

2. doInvoke invokes the method through reflection.

3. The result is wrapped in a HandlerResult and returned.
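The three invoke steps can be imitated with plain java.lang.reflect. This is a simplified sketch, not Spring's implementation. GreetingController and its greet method are hypothetical. Parameter names are passed explicitly, whereas Spring discovers them itself, and only String and Integer conversion is handled. The key detail kept from the source is that InvocationTargetException is unwrapped, so callers see the controller's own exception.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

public class ReflectiveInvoke {

    // Hypothetical controller; in WebFlux this would be a @RestController bean.
    public static class GreetingController {
        public String greet(String name, Integer times) {
            if (times < 0) {
                throw new IllegalArgumentException("times must be >= 0");
            }
            return ("Hello " + name + "! ").repeat(times).trim(); // String.repeat needs Java 11+
        }
    }

    // Step 1 (like resolveArguments): turn request data into typed method arguments.
    static Object[] resolveArguments(Method method, Map<String, String> requestParams, String... names) {
        Class<?>[] types = method.getParameterTypes();
        Object[] args = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            String raw = requestParams.get(names[i]);
            args[i] = types[i] == Integer.class ? Integer.valueOf(raw) : raw;
        }
        return args;
    }

    // Step 2 (like doInvoke): call reflectively and surface the controller's own exception.
    static Object invoke(Object bean, Method method, Object[] args) throws Exception {
        try {
            return method.invoke(bean, args);
        } catch (InvocationTargetException ex) {
            Throwable target = ex.getTargetException(); // mirrors Mono.error(ex.getTargetException())
            if (target instanceof Exception) {
                throw (Exception) target;
            }
            if (target instanceof Error) {
                throw (Error) target;
            }
            throw new IllegalStateException(target);
        }
    }

    public static void main(String[] args) throws Exception {
        Method greet = GreetingController.class.getMethod("greet", String.class, Integer.class);
        Object[] resolved = resolveArguments(greet, Map.of("name", "Ann", "times", "2"), "name", "times");
        // Step 3: Spring would wrap the return value in a HandlerResult; here we just print it.
        System.out.println(invoke(new GreetingController(), greet, resolved)); // Hello Ann! Hello Ann!
    }
}
```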

Once invoke returns, Spring picks a suitable handler for the result based on how our method is declared. Most of the time we use @ResponseBody, whose handler serializes the result (to JSON, for example) and finally writes it back to the client.

The write itself is implemented in the subscribe method of DeferredWriteMono. There are many layers of wrapping, but the call eventually ends up there.

4. Summary

In essence, WebFlux is a reactive rewrite of the original implementation. The code is easy enough to read and understand, but debugging it is genuinely painful. The simplest way to picture it is one big stream that winds back and forth and is finally handed to a NioEventLoop to consume. How it all gets strung together is exactly what the reactive style takes care of.

WebFlux lets you make better use of the CPU and avoid unnecessary thread blocking. In everyday development, short computational work can run directly on the eventLoop. Blocking network I/O, however, should be offloaded to a separate thread pool for asynchronous processing. If the downstream service supports reactive, asynchronous access, so much the better.
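The offloading advice can be illustrated with the JDK alone. blockingLookup is a hypothetical blocking client call, and the dedicated pool keeps such calls off the threads that would otherwise serve other connections. In Reactor itself, the usual equivalent is to wrap the call in Mono.fromCallable(...) and move it with subscribeOn(...) to a scheduler meant for blocking work, such as Schedulers.boundedElastic() in recent versions. The sketch below uses CompletableFuture only to stay self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadBlockingIo {

    // Hypothetical blocking client call (think legacy HTTP client or JDBC query).
    static String blockingLookup(String id) {
        try {
            Thread.sleep(50); // simulate waiting on the network
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "order-" + id;
    }

    // The blocking call runs on a dedicated pool, so event-loop threads are never parked on I/O.
    static CompletableFuture<String> lookupAsync(String id, ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(id), blockingPool)
                // cheap follow-up work runs on whichever thread completes the lookup
                // (or on the caller, if the lookup has already finished)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService blockingPool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(lookupAsync("42", blockingPool).join()); // ORDER-42
        } finally {
            blockingPool.shutdown();
        }
    }
}
```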