>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…

1. Preface

Purpose of this document

  • Walk through the details of how the Gateway forwards requests
  • Identify the customization points in the forwarding process

Background knowledge

Request forwarding is one of the core functions of the Gateway, which involves three main concepts:

  • Route: the basic unit of a gateway, consisting of an ID, a URI, a predicate, and a set of filters. If the predicate evaluates to true, the route matches and the request is forwarded.
  • Predicate: a Java 8 functional Predicate whose input type is the Spring Framework ServerWebExchange. Spring Cloud Gateway currently supports many kinds, such as Path, Query, Method, and Header. In configuration, predicates must be written in key=value form.
  • Filter: the filtering logic that a request passes through when it is routed and forwarded. GatewayFilter instances, constructed by specific factories, can modify the contents of requests and responses.
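To make the three concepts concrete, here is a minimal sketch in plain Java. It does not use Spring Cloud Gateway at all: `Request`, `Route`, `forward` and `sampleRoute` are hypothetical stand-ins chosen for illustration, and the target URI is made up.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class RouteConceptSketch {

    // Hypothetical stand-in for ServerWebExchange: only a method and a path.
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // Hypothetical stand-in for Route: an ID, a target URI, a predicate and filters.
    static final class Route {
        final String id;
        final String uri;
        final Predicate<Request> predicate;
        final List<UnaryOperator<Request>> filters;

        Route(String id, String uri, Predicate<Request> predicate,
              List<UnaryOperator<Request>> filters) {
            this.id = id;
            this.uri = uri;
            this.predicate = predicate;
            this.filters = filters;
        }

        // Returns the forwarding target, or null when the predicate does not match.
        String forward(Request request) {
            if (!predicate.test(request)) {
                return null;
            }
            Request current = request;
            for (UnaryOperator<Request> filter : filters) {
                current = filter.apply(current); // each filter may rewrite the request
            }
            return uri + current.path;
        }
    }

    static Route sampleRoute() {
        Predicate<Request> pathPredicate = r -> r.path.startsWith("/api/");
        Predicate<Request> methodPredicate = r -> r.method.equals("GET");
        // A filter that strips the "/api" prefix before forwarding.
        UnaryOperator<Request> stripPrefix =
                r -> new Request(r.method, r.path.substring("/api".length()));
        return new Route("demo", "http://localhost:8081",
                pathPredicate.and(methodPredicate),
                Collections.singletonList(stripPrefix));
    }

    public static void main(String[] args) {
        Route route = sampleRoute();
        System.out.println(route.forward(new Request("GET", "/api/users")));  // http://localhost:8081/users
        System.out.println(route.forward(new Request("POST", "/api/users"))); // null
    }
}
```

The real Route works on a reactive ServerWebExchange and its predicates are asynchronous; the sketch only shows how the ID, URI, predicate and filters relate to each other.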

2. Basic usage

2.1 Predicate summary

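# Matches requests that arrive after the given date-time
- After=2017-01-20T17:42:47.789-07:00[America/Denver]

# Matches requests that arrive before the given date-time
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]

# Matches requests that arrive between the two date-times
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]

# Matches requests that carry a cookie named "chocolate" whose value matches the regular expression ch.p
- Cookie=chocolate, ch.p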

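What these predicates test can be approximated with the JDK alone. The following is only a sketch under assumptions: `Request`, `after`, `before`, `between` and `cookie` are hypothetical helpers, not the Gateway's predicate factories, and the sample request time and cookie value are made up.

```java
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class PredicateSketch {

    // Hypothetical request view: arrival time plus cookie name/value pairs.
    static final class Request {
        final ZonedDateTime receivedAt;
        final Map<String, String> cookies;

        Request(ZonedDateTime receivedAt, Map<String, String> cookies) {
            this.receivedAt = receivedAt;
            this.cookies = cookies;
        }
    }

    static Predicate<Request> after(String dateTime) {
        ZonedDateTime limit = ZonedDateTime.parse(dateTime);
        return request -> request.receivedAt.isAfter(limit);
    }

    static Predicate<Request> before(String dateTime) {
        ZonedDateTime limit = ZonedDateTime.parse(dateTime);
        return request -> request.receivedAt.isBefore(limit);
    }

    // Between is modelled as "after the start AND before the end".
    static Predicate<Request> between(String start, String end) {
        return after(start).and(before(end));
    }

    // The cookie must exist and its whole value must match the regular expression.
    static Predicate<Request> cookie(String name, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return request -> {
            String value = request.cookies.get(name);
            return value != null && pattern.matcher(value).matches();
        };
    }

    public static void main(String[] args) {
        Request request = new Request(
                ZonedDateTime.parse("2017-01-21T00:00:00-07:00[America/Denver]"),
                Collections.singletonMap("chocolate", "chip"));
        String start = "2017-01-20T17:42:47.789-07:00[America/Denver]";
        String end = "2017-01-21T17:42:47.789-07:00[America/Denver]";
        System.out.println(after(start).test(request));                // true
        System.out.println(before(start).test(request));               // false
        System.out.println(between(start, end).test(request));         // true
        System.out.println(cookie("chocolate", "ch.p").test(request)); // true
    }
}
```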

2.2 Mono and Flux

Mono and Flux are core objects that run through the whole process. According to the Reactive Streams specification, a Publisher provides a potentially unbounded number of sequenced elements and publishes them according to the demand received from its subscribers. Reactor Core provides a set of implementations of this Publisher interface; the two important ones we will work with are Mono and Flux.

  • Flux represents an asynchronous sequence of 0 to N elements
  • Mono represents an asynchronous sequence of 0 or 1 elements

> Spring Cloud Gateway uses WebFlux as its underlying framework, so Mono and Flux objects appear throughout the call flow

A sequence can carry three kinds of signals:

  • A normal signal that carries an element
  • A signal that the sequence has completed
  • A signal that the sequence has failed with an error

Flux

  • Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by a completion or an error signal. As in the Reactive Streams specification, these three types of signals translate into calls to the downstream subscriber’s onNext, onComplete, or onError methods.
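The three signals can be observed without Reactor by using the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams interfaces. This is a deliberately simplified, synchronous sketch: `fromItems`, `failing` and `collectSignals` are hypothetical helpers, and the publisher here does not handle every rule of the specification (for example re-entrant `request` calls).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

public class SignalSketch {

    // Emits the given items on demand, then signals completion.
    static <T> Flow.Publisher<T> fromItems(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < items.size() && !done; i++) {
                    subscriber.onNext(items.get(next++));
                }
                if (next == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Signals an error right after subscription, without emitting any item.
    static <T> Flow.Publisher<T> failing(Throwable error) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(error);
        };
    }

    // Subscribes and records every signal it receives, in order.
    static List<String> collectSignals(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collectSignals(fromItems(Arrays.asList("a", "b"))));
        // [onNext(a), onNext(b), onComplete]
        System.out.println(collectSignals(failing(new IllegalStateException("boom"))));
        // [onError(boom)]
    }
}
```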

Mono

  • Mono is another implementation of Publisher. It emits at most one item and then (optionally) terminates with an onComplete or onError signal. Mono is also asynchronous in nature
  • It provides only a subset of the operators available for Flux, and some operators (especially those that combine Mono with another publisher) switch to Flux.
    • For example, Mono#concatWith(Publisher) returns a Flux, and Mono#then(Mono) returns another Mono.

Common methods are as follows:

  • create: programmatically creates a Flux that can emit multiple elements
  • empty: creates a Flux<T> that completes without emitting any element
  • just: creates a Flux that emits the given elements and then completes
  • error: creates a Flux that terminates with the specified error immediately after subscription

PS: We will not go deeper into this topic here; let's first look at the main flow of the Gateway.

3. Interception in depth

3.1 Schematic diagram

First, take a look at the schematic of Spring Cloud Gateway

4. Entry point of the call

4.1 Call process

  • Step 1: HttpWebHandlerAdapter#handle: builds the ServerWebExchange and initiates handler processing
  • Step 2: DispatcherHandler#handle: initiates request processing
  • Step 3: RoutePredicateHandlerMapping#getHandlerInternal: performs route matching

4.2 getHandlerInternal logic

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
   // don't handle requests on management port if set and different than server port
   if (this.managementPortType == DIFFERENT && this.managementPort != null
         && exchange.getRequest().getURI().getPort() == this.managementPort) {
      return Mono.empty();
   }
   exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

   return lookupRoute(exchange)
         // .log("route-predicate-handler-mapping", Level.FINER) //name this
         .flatMap((Function<Route, Mono<?>>) r -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isDebugEnabled()) {
               logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
            }

            exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
            return Mono.just(webHandler);
         }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isTraceEnabled()) {
               logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
            }
         })));
}

4.3 lookupRoute

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
   return this.routeLocator.getRoutes()
         // individually filter routes so that filterWhen error delaying is not a
         // problem
         .concatMap(route -> Mono.just(route).filterWhen(r -> {
            // add the current route we are testing
            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
            return r.getPredicate().apply(exchange);
         })
               // instead of immediately stopping main flux due to error, log and
               // swallow it
               .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
               .onErrorResume(e -> Mono.empty()))
         // .defaultIfEmpty() put a static Route not found
         // or .switchIfEmpty()
         // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
         .next()
         // TODO: error handling
         .map(route -> {
            validateRoute(route, exchange);
            return route;
         });


}

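lookupRoute traverses all routes in order, applies each route's predicate to the exchange, and returns the first route that matches. A predicate that throws an error is logged and treated as a non-match, so it does not stop the traversal.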
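The traversal can be sketched synchronously in plain Java. This is not the Gateway's implementation: `Route` and `lookupRoute` below are hypothetical simplifications that test a path string instead of a ServerWebExchange, with a try/catch standing in for `doOnError` plus `onErrorResume(e -> Mono.empty())` and the early return standing in for `next()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class LookupRouteSketch {

    // Hypothetical stand-in for Route: an ID and a predicate over the request path.
    static final class Route {
        final String id;
        final Predicate<String> predicate;

        Route(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    // Returns the first route whose predicate matches; failing predicates are skipped.
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        for (Route route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route); // first match wins
                }
            } catch (RuntimeException e) {
                // Log and swallow, so one broken predicate does not stop the lookup.
                System.err.println("Error applying predicate for route: " + route.id + ": " + e);
            }
        }
        return Optional.empty();
    }

    static List<Route> sampleRoutes() {
        return Arrays.asList(
                new Route("broken", path -> { throw new IllegalStateException("bad predicate"); }),
                new Route("users", path -> path.startsWith("/users")),
                new Route("catch-all", path -> true));
    }

    public static void main(String[] args) {
        System.out.println(lookupRoute(sampleRoutes(), "/users/1").get().id); // users
        System.out.println(lookupRoute(sampleRoutes(), "/orders").get().id);  // catch-all
    }
}
```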

5. The sending process

5.1 FilteringWebHandler system

The webHandler here is a FilteringWebHandler object. Let's see what this object does.

The following filters are involved:

  • C- ForwardPathFilter
  • C- ForwardRoutingFilter: local forward filter
  • C- GatewayMetricsFilter: collects metrics; can be integrated with Prometheus to build a Grafana dashboard
  • C- LoadBalancerClientFilter: integrates Ribbon; takes the name of a microservice and resolves the actual call address through Ribbon
  • C- NettyRoutingFilter: for http or https schemes, uses Netty’s HttpClient to send the proxied request to the downstream service
  • C- NettyWriteResponseFilter: writes the proxied response back to the gateway’s client, so its logic runs after all other filters have executed
  • C- OrderedGatewayFilter
  • C- RouteToRequestUrlFilter: converts the original URL obtained from the request into the URL used by the Gateway to forward the request
  • C- WebClientHttpRoutingFilter
  • C- WebClientWriteResponseFilter
  • C- WebsocketRoutingFilter: for ws or wss schemes, uses Spring WebSocket to forward WebSocket requests downstream
  • C- WeightCalculatorWebFilter

Please refer to ->Spring Cloud Gateway built-in global filters

Call logic 1: FilteringWebHandler management

FilteringWebHandler has an inner class, DefaultGatewayFilterChain, which implements the filter chain

private static class DefaultGatewayFilterChain implements GatewayFilterChain {

   // Index of the current filter in the chain
   private final int index;
   // The list of filters
   private final List<GatewayFilter> filters;

   DefaultGatewayFilterChain(List<GatewayFilter> filters) {
      this.filters = filters;
      this.index = 0;
   }

   private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
      this.filters = parent.getFilters();
      this.index = index;
   }

   public List<GatewayFilter> getFilters() {
      return filters;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange) {
      return Mono.defer(() -> {
         if (this.index < filters.size()) {
            // Invoke the filters one by one: each filter receives a chain that points at the next index
            GatewayFilter filter = filters.get(this.index);
            DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
            return filter.filter(exchange, chain);
         }
         else {
            return Mono.empty(); // complete
         }
      });
   }
}
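The index-based chaining can be seen in isolation with a small sketch. It is plain, synchronous Java rather than the reactive original: `Filter`, `Chain`, `DefaultChain`, `named` and `run` are hypothetical names, and a list of strings stands in for the exchange so that the order of calls can be recorded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {

    interface Filter {
        void filter(List<String> exchange, Chain chain);
    }

    interface Chain {
        void filter(List<String> exchange);
    }

    // Immutable chain: each instance remembers which filter index it points at.
    static final class DefaultChain implements Chain {
        private final List<Filter> filters;
        private final int index;

        DefaultChain(List<Filter> filters) {
            this(filters, 0);
        }

        private DefaultChain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        @Override
        public void filter(List<String> exchange) {
            if (index < filters.size()) {
                Filter current = filters.get(index);
                // Hand the current filter a chain that starts at the next index.
                current.filter(exchange, new DefaultChain(filters, index + 1));
            }
            // Otherwise the end of the chain is reached and there is nothing left to call.
        }
    }

    // A filter that records when it runs before and after the rest of the chain.
    static Filter named(String name) {
        return (exchange, chain) -> {
            exchange.add(name + ":pre");
            chain.filter(exchange);
            exchange.add(name + ":post");
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new DefaultChain(Arrays.asList(named("A"), named("B"))).filter(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A:pre, B:pre, B:post, A:post]
    }
}
```

Because every call creates a new chain object with a fixed index, the chain itself holds no mutable position, which is presumably why the original can be shared safely across asynchronous callbacks.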

Call logic 2: Filter execution

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

   // If the exchange does not meet this filter's criteria, skip the filter and pass the exchange on
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
   // ... the filter's own logic follows
}

5.2 Sending the request

The core sending filter is NettyRoutingFilter. Let’s focus on the logic of this filter:

C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   // Request URL, e.g. http://httpbin.org:80/get
   URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
   // Protocol type, e.g. http
   String scheme = requestUrl.getScheme();
   
   // Step 1: If the exchange is already routed or the scheme is not http/https, pass it on to the next filter
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
   // Step 2: Mark the exchange as routed
   setAlreadyRouted(exchange);

   // Step 3: Obtain the Request object. This is the external Request object
   ServerHttpRequest request = exchange.getRequest();
    
   // Step 4: Get the Method type (get/post...)
   final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
   final String url = requestUrl.toString();
   
   // Step 5: Process the Header and forward it
   HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
   final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
   filtered.forEach(httpHeaders::set);
    
   // -> Transfer-Encoding
   String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
   boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
   // -> preserveHostHeader
   boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

   // Send the forwarding request via Netty's HttpClient. Note: this is asynchronous
   Flux<HttpClientResponse> responseFlux = this.httpClient
         .chunkedTransfer(chunkedTransfer).request(method).uri(url)
         .send((req, nettyOutbound) -> {
            // Step 6: Forward headers
            req.headers(httpHeaders);
            
            // => Preserve the original Host header if configured
            if (preserveHost) {
               String host = request.getHeaders().getFirst(HttpHeaders.HOST);
               req.header(HttpHeaders.HOST, host);
            }
            
            // Step 7: Actually initiate the request
            return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
                  .send(request.getBody()
                        .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                              .getNativeBuffer()));
         }).responseConnection((res, connection) -> {
            // Step 8: The request is complete, and the response is obtained
            ServerHttpResponse response = exchange.getResponse();
            
            // Step 9: Forward the headers and status attributes
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(
                  entry -> headers.add(entry.getKey(), entry.getValue()));
            
            // => String CONTENT_TYPE = "Content-Type" 
            // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
               exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                     contentTypeValue);
            }
            
            // If the downstream status code resolves to a known HttpStatus, set it on the gateway response
            HttpStatus status = HttpStatus.resolve(res.status().code());
            if (status != null) {
               response.setStatusCode(status);
            }
            else if (response instanceof AbstractServerHttpResponse) {
               ((AbstractServerHttpResponse) response)
                     .setStatusCodeValue(res.status().code());
            }
            else {
               throw new IllegalStateException(
                     "Unable to set status code on response: "
                           + res.status().code() + ","
                           + response.getClass());
            }

            // Run the response header filters after the status has been set, so the status is available to them
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                  getHeadersFilters(), headers, exchange, Type.RESPONSE);
            
            // String TRANSFER_ENCODING = "Transfer-Encoding"
            // String CONTENT_LENGTH = "Content-Length"
            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                  && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
               // Transfer-Encoding needs to be removed when Content-Length is present
               response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                  filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            // Defer committing the response until all routing filters have run:
            // store the client response as a ServerWebExchange attribute and let NettyWriteResponseFilter write it later
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            return Mono.just(res);
         });

   if (properties.getResponseTimeout() != null) {
      // Handling the timeout exception
      responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
            Mono.error(new TimeoutException("Response took longer than timeout: "
                  + properties.getResponseTimeout())))
            .onErrorMap(TimeoutException.class,
                  // GATEWAY_TIMEOUT(504, "Gateway Timeout")
                  th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                        th.getMessage(), th));
   }

   return responseFlux.then(chain.filter(exchange));
}
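The timeout handling at the end of the filter (apply a response timeout, then map the TimeoutException to a 504 Gateway Timeout) can be imitated with the JDK's CompletableFuture (Java 9+). This sketch swaps Reactor's `timeout` and `onErrorMap` for `orTimeout` and `exceptionally`; `GatewayTimeoutException` and `withResponseTimeout` are hypothetical names, not Gateway or Spring classes.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

    // Hypothetical stand-in for ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, ...).
    static final class GatewayTimeoutException extends RuntimeException {
        final int status = 504;

        GatewayTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Applies the timeout only when one is configured, like the null check in the filter.
    // Note: orTimeout completes the given future itself when the timeout fires.
    static <T> CompletableFuture<T> withResponseTimeout(CompletableFuture<T> response,
                                                        Duration timeout) {
        if (timeout == null) {
            return response;
        }
        return response
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionally(th -> {
                    Throwable cause = (th instanceof CompletionException && th.getCause() != null)
                            ? th.getCause() : th;
                    if (cause instanceof TimeoutException) {
                        // Map the timeout to a 504-style error.
                        throw new GatewayTimeoutException(
                                "Response took longer than timeout: " + timeout, cause);
                    }
                    throw new CompletionException(cause); // other errors pass through
                });
    }

    public static void main(String[] args) {
        // A response that arrives in time passes through unchanged.
        System.out.println(withResponseTimeout(
                CompletableFuture.completedFuture("ok"), Duration.ofMillis(50)).join());

        // A response that never arrives is turned into the 504-style error.
        try {
            withResponseTimeout(new CompletableFuture<String>(), Duration.ofMillis(50)).join();
        } catch (CompletionException e) {
            GatewayTimeoutException timeout = (GatewayTimeoutException) e.getCause();
            System.out.println(timeout.status + " " + timeout.getMessage());
        }
    }
}
```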

5.3 Returning the response

C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   return chain.filter(exchange).then(Mono.defer(() -> {
      // Step 1: Get the downstream connection that NettyRoutingFilter stored on the exchange
      Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
      // Complete without writing anything if no connection exists
      if (connection == null) {
         return Mono.empty();
      }
      
      // Step 2: Obtain the GatewayResponse
      ServerHttpResponse response = exchange.getResponse();
      NettyDataBufferFactory factory = (NettyDataBufferFactory) response
            .bufferFactory();

      // The body is essentially a Flux of ByteBuf, wrapped here into NettyDataBuffer
      final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
            .map(factory::wrap);

      // Media type
      MediaType contentType = null;
      try {
         contentType = response.getHeaders().getContentType();
      }
      catch (Exception e) {
         log.trace("invalid media type", e);
      }
      return (isStreamingMediaType(contentType)
            ? response.writeAndFlushWith(body.map(Flux::just))
            : response.writeWith(body));
   }));
}
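The filter above calls `chain.filter(exchange)` first and only then writes the response, so its own logic runs after the rest of the chain has finished. A minimal sketch of that ordering, using CompletableFuture in place of Mono: `Chain`, `Filter`, `WRITE_RESPONSE`, `ROUTING` and `run` are hypothetical names, and placing the write filter before the routing filter is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteResponseOrderSketch {

    interface Chain {
        CompletableFuture<Void> filter(List<String> log);
    }

    interface Filter {
        CompletableFuture<Void> filter(List<String> log, Chain chain);
    }

    // "Post" style: run the rest of the chain first, then act on its result.
    static final Filter WRITE_RESPONSE = (log, chain) ->
            chain.filter(log).thenRun(() -> log.add("write response to client"));

    // "Pre" style: do the work first, then continue with the rest of the chain.
    static final Filter ROUTING = (log, chain) -> {
        log.add("send request downstream");
        return chain.filter(log);
    };

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Chain end = l -> CompletableFuture.<Void>completedFuture(null);
        Chain afterWrite = l -> ROUTING.filter(l, end);
        // The write filter is invoked first, yet its logic runs last.
        WRITE_RESPONSE.filter(log, afterWrite).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [send request downstream, write response to client]
    }
}
```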

Conclusion

Because my understanding of Netty's internals is still limited, I could not print out the data at some points of the call flow to verify them, so I am not fully confident in every detail of this article. I will add more details after further study.