>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…
1. Preface
Purpose of this document
- Walk through the details of how the Gateway forwards requests
- Identify the customization points in forwarding
Background knowledge
Request forwarding is one of the core functions of the Gateway, which involves three main concepts:
- Route: the basic unit of the gateway, consisting of an ID, a URI, a predicate, and a set of filters. If the predicate evaluates to true, the request is forwarded.
- Predicate: a Java 8 functional predicate whose input type is Spring Framework's ServerWebExchange. Spring Cloud Gateway currently supports a variety of predicates, such as Path, Query, Method, and Header; each must be written in the form key=value.
- Filter: the filtering logic that a request passes through when it is routed and forwarded. GatewayFilter instances built by specific factories can modify the contents of requests and responses.
2. Basic usage
2.1 Predicates summary
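# Matches requests made after the given date-time
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
# Matches requests made before the given date-time
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
# Matches requests made between the two date-times
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
# Matches requests carrying a cookie named chocolate whose value matches the regular expression ch.p
- Cookie=chocolate, ch.p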
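To make the matching rules above concrete, here is a minimal sketch in plain Java that does not use any Gateway classes. The class PredicateSketch and its method names are hypothetical and only mimic the rules (a time comparison for After/Before/Between, a regular expression on the cookie value); the real predicates are built by Gateway's own factories and operate on a ServerWebExchange.

```java
import java.time.ZonedDateTime;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical helper: mimics the matching rules only, not Gateway's real factories.
class PredicateSketch {

    // After=<datetime>: true when the request time is after the given date-time.
    static Predicate<ZonedDateTime> after(String datetime) {
        ZonedDateTime limit = ZonedDateTime.parse(datetime);
        return requestTime -> requestTime.isAfter(limit);
    }

    // Before=<datetime>: true when the request time is before the given date-time.
    static Predicate<ZonedDateTime> before(String datetime) {
        ZonedDateTime limit = ZonedDateTime.parse(datetime);
        return requestTime -> requestTime.isBefore(limit);
    }

    // Between=<start>, <end>: both conditions must hold.
    static Predicate<ZonedDateTime> between(String start, String end) {
        return after(start).and(before(end));
    }

    // Cookie=<name>, <regexp>: the cookie value must match the regexp as a whole.
    // Looking the cookie up by name is left out of this sketch.
    static Predicate<String> cookieValue(String regexp) {
        Pattern pattern = Pattern.compile(regexp);
        return value -> value != null && pattern.matcher(value).matches();
    }

    public static void main(String[] args) {
        ZonedDateTime requestTime = ZonedDateTime.parse("2017-01-21T00:00:00-07:00[America/Denver]");
        System.out.println(after("2017-01-20T17:42:47.789-07:00[America/Denver]").test(requestTime));
        System.out.println(cookieValue("ch.p").test("chip"));
    }
}
```

Each predicate is an ordinary java.util.function.Predicate, so several of them can be combined with and(), or() and negate().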
2.2 Mono and Flux
Mono and Flux are core objects that run through the whole process. According to the Reactive Streams specification, a Publisher provides a potentially unbounded number of ordered elements and publishes them according to the demand received from its subscribers. reactor-core has a set of implementations of this Publisher interface. The two important implementations that we will use to create sequences are Mono and Flux.
- Flux represents an asynchronous sequence of 0 to N elements
- Mono represents an asynchronous sequence of 0 or 1 elements
> Spring Cloud Gateway uses WebFlux as its underlying framework, so Mono and Flux objects appear throughout
A sequence can carry three kinds of signals:
- A normal signal carrying an element
- A completion signal that ends the sequence
- An error signal that ends the sequence
Flux
- Flux is a standard Publisher, representing an asynchronous sequence of 0 to N emitted items, optionally terminated by a completion or error signal. As in the Reactive Streams specification, these three types of signals translate into calls to the downstream subscriber's onNext, onComplete, or onError methods.
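The three signals can be observed without Reactor by using the JDK's own Reactive Streams interfaces in java.util.concurrent.Flow. The following is only a sketch: SignalDemo, fromList and record are hypothetical names, and the publisher is a deliberately simplified, synchronous stand-in rather than Reactor's Flux (for example, it does not validate the requested amount).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; not part of Reactor or Spring Cloud Gateway.
class SignalDemo {

    // A tiny synchronous publisher over a fixed list. If failure is non-null,
    // the sequence ends with onError instead of onComplete.
    static <T> Flow.Publisher<T> fromList(List<T> items, Throwable failure) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items: the subscriber's demand drives the publisher.
                while (n-- > 0 && !done && next < items.size()) {
                    subscriber.onNext(items.get(next++));
                }
                // After the last item, send exactly one terminal signal.
                if (!done && next == items.size()) {
                    done = true;
                    if (failure != null) {
                        subscriber.onError(failure);
                    } else {
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with unbounded demand and records every signal received.
    static <T> List<String> record(Flow.Publisher<T> publisher) {
        List<String> log = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                log.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable error) {
                log.add("onError:" + error.getMessage());
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(fromList(List.of("a", "b"), null)));
        System.out.println(record(fromList(List.of(1), new IllegalStateException("boom"))));
    }
}
```

A sequence receives zero or more onNext calls followed by at most one terminal call, which is the contract that Flux and Mono follow as well.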
Mono
- Mono is another implementation of Publisher. It emits at most one item and then (optionally) terminates with an onComplete or onError signal. Mono is also asynchronous in nature
- It provides only a subset of the operators available for Flux, and some operators (especially those that combine Mono with another publisher) switch to Flux.
- For example, Mono#concatWith(Publisher) returns a Flux, and Mono#then(Mono) returns another Mono.
Common methods are as follows:
- create: programmatically creates a Flux that can emit multiple elements
- empty: creates a Flux<T> that completes without emitting any element
- just: creates a Flux that emits the given elements and then completes
- error: creates a Flux that terminates with the specified error immediately after being subscribed to
PS: This part is not covered in depth here; let's first look at the main flow of the Gateway
3. Interception in depth
3.1 Schematic diagram
First, take a look at the schematic of Spring Cloud Gateway
4. Entry point of the call
4.1 Call process
- Step 1: HttpWebHandlerAdapter#handle: builds the ServerWebExchange and initiates handler processing
- Step 2: DispatcherHandler#handle: initiates request processing
- Step 3: RoutePredicateHandlerMapping#getHandlerInternal: performs the route matching
4.2 getHandlerInternal logic
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    // don't handle requests on management port if set and different than server port
    if (this.managementPortType == DIFFERENT && this.managementPort != null
            && exchange.getRequest().getURI().getPort() == this.managementPort) {
        return Mono.empty();
    }
    exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
    return lookupRoute(exchange)
            // .log("route-predicate-handler-mapping", Level.FINER) //name this
            .flatMap((Function<Route, Mono<?>>) r -> {
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                if (logger.isDebugEnabled()) {
                    logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                }
                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                return Mono.just(webHandler);
            }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                if (logger.isTraceEnabled()) {
                    logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                }
            })));
}
4.3 lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
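The routes are tested in order and the first one whose predicate matches is returned (next()). An error thrown while applying a predicate is logged and swallowed, so one faulty route does not abort the lookup.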
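The lookup strategy can be sketched synchronously in plain Java. This is an illustration under simplifying assumptions, not the Gateway implementation: LookupRouteSketch and its Route class are hypothetical, the predicate works on a path string instead of a ServerWebExchange, and the validateRoute step is left out.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified model of "first matching route wins".
class LookupRouteSketch {

    // Simplified route: an id plus a predicate over the request path.
    static final class Route {
        final String id;
        final Predicate<String> predicate;

        Route(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    // Tests routes in order and returns the first match. A predicate that throws
    // is logged and treated as "no match", so the lookup continues.
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        return routes.stream()
                .filter(route -> {
                    try {
                        return route.predicate.test(path);
                    } catch (RuntimeException e) {
                        System.err.println("Error applying predicate for route: " + route.id);
                        return false;
                    }
                })
                .findFirst(); // lazy: routes after the first match are not evaluated
    }

    public static void main(String[] args) {
        List<Route> routes = List.of(
                new Route("broken", path -> { throw new IllegalStateException("boom"); }),
                new Route("get", path -> path.startsWith("/get")),
                new Route("all", path -> true));
        System.out.println(lookupRoute(routes, "/get").map(route -> route.id).orElse("none"));
    }
}
```

Because the first match wins, the order in which routes are provided matters: a catch-all route placed first would shadow every route after it.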
5. The sending process
5.1 FilteringWebHandler system
The webHandler returned here is a FilteringWebHandler object. Let's see what this object does
The following filters are involved:
- C- ForwardPathFilter:
- C- ForwardRoutingFilter: local forward filter
- C- GatewayMetricsFilter: integrates with Prometheus, which can be used to build a Grafana dashboard
- C- LoadBalancerClientFilter: integrates Ribbon; takes the microservice name and resolves the actual call address through Ribbon
- C- NettyRoutingFilter: for HTTP or HTTPS, uses Netty's HttpClient to send the proxy request to the downstream service
- C- NettyWriteResponseFilter: writes the proxy response back to the gateway's client, so its logic runs after all other filters have been executed
- C- OrderedGatewayFilter:
- C- RouteToRequestUrlFilter: converts the original URL obtained from the request into the URL used by the Gateway to forward the request
- C- WebClientHttpRoutingFilter:
- C- WebClientWriteResponseFilter:
- C- WebsocketRoutingFilter: for WS or WSS, uses Spring WebSocket to forward WebSocket requests downstream
- C- WeightCalculatorWebFilter:
For details, see -> Spring Cloud Gateway built-in global filters
Call process 1: FilteringWebHandler manages the filter chain
This object has an inner class, DefaultGatewayFilterChain, which implements the filter chain
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
    // Index of the current filter in the chain
    private final int index;
    // The list of filters
    private final List<GatewayFilter> filters;

    DefaultGatewayFilterChain(List<GatewayFilter> filters) {
        this.filters = filters;
        this.index = 0;
    }

    private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
        this.filters = parent.getFilters();
        this.index = index;
    }

    public List<GatewayFilter> getFilters() {
        return filters;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < filters.size()) {
                // Call the filters one by one
                GatewayFilter filter = filters.get(this.index);
                DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                return filter.filter(exchange, chain);
            }
            else {
                return Mono.empty(); // complete
            }
        });
    }
}
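The index-based chain is easier to follow in a synchronous form. The sketch below is a hypothetical plain-Java analogue (FilterChainSketch, Filter, Chain, named and run are illustrative names, and a List<String> stands in for the exchange); it keeps the key idea that each call creates a new immutable chain pointing at the next index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical synchronous analogue of GatewayFilter / DefaultGatewayFilterChain.
class FilterChainSketch {

    interface Filter {
        void filter(List<String> exchange, Chain chain);
    }

    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters) {
            this(filters, 0);
        }

        private Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void filter(List<String> exchange) {
            if (index < filters.size()) {
                // Hand the current filter a new chain that points at the next index.
                filters.get(index).filter(exchange, new Chain(filters, index + 1));
            }
            // Otherwise the end of the chain is reached and there is nothing left to do.
        }
    }

    // A filter that records when it runs before and after the rest of the chain.
    static Filter named(String name) {
        return (exchange, chain) -> {
            exchange.add(name + ":pre");
            chain.filter(exchange);
            exchange.add(name + ":post");
        };
    }

    // Runs the filters against a fresh "exchange" and returns what was recorded.
    static List<String> run(List<Filter> filters) {
        List<String> exchange = new ArrayList<>();
        new Chain(filters).filter(exchange);
        return exchange;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(named("a"), named("b"))));
    }
}
```

A filter that never calls chain.filter short-circuits everything behind it, and code placed after the chain.filter call runs on the way back, which is how post-processing filters work.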
Call process 3: Executing a filter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // If this filter's conditions are not met, skip it and pass the exchange to the next filter
    if (isAlreadyRouted(exchange)
            || (!"http".equals(scheme) && !"https".equals(scheme))) {
        return chain.filter(exchange);
    }
    // ... (the filter's own logic follows)
}
5.2 Main sending logic
The core sending filter is NettyRoutingFilter. Let's focus on the logic of this filter:
C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// Request URL, e.g. http://httpbin.org:80/get
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// Protocol type: HTTP
String scheme = requestUrl.getScheme();
// Step 1: If the exchange is already routed or the scheme is not http/https, pass it to the next filter
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
// Step 2: Mark the exchange as routed
setAlreadyRouted(exchange);
// Step 3: Obtain the Request object. This is the external Request object
ServerHttpRequest request = exchange.getRequest();
// Step 4: Get the Method type (get/post...)
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toString();
// Step 5: Process the Header and forward it
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
// -> Transfer-Encoding
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
// -> preserveHostHeader
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// Send the forwarded request via Netty's HttpClient. Note: this is asynchronous
Flux<HttpClientResponse> responseFlux = this.httpClient
.chunkedTransfer(chunkedTransfer).request(method).uri(url)
.send((req, nettyOutbound) -> {
// Step 6: Forward headers
req.headers(httpHeaders);
// => Preserve the original Host header if configured
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
req.header(HttpHeaders.HOST, host);
}
// Step 7: Actually initiate the request
return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).responseConnection((res, connection) -> {
// Step 8: The request is complete, and the response is obtained
ServerHttpResponse response = exchange.getResponse();
// Step 9: Forward the headers and status attributes
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
// => String CONTENT_TYPE = "Content-Type"
// => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
// Copy the upstream status code onto the gateway response
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
}
else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
}
else {
throw new IllegalStateException(
"Unable to set status code on response: "
+ res.status().code() + ","
+ response.getClass());
}
// Run the response header filters only after the status is set, so that the status is available to them
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
// String TRANSFER_ENCODING = "Transfer-Encoding"
// String CONTENT_LENGTH = "Content-Length"
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// Transfer-Encoding needs to be removed when Content-Length exists
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
// Defer committing the response until all routing filters have run
// Store the client response as a ServerWebExchange attribute; NettyWriteResponseFilter writes the response later
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
return Mono.just(res);
});
if (properties.getResponseTimeout() != null) {
// Map a response timeout to 504 Gateway Timeout
responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
Mono.error(new TimeoutException("Response took longer than timeout: "
+ properties.getResponseTimeout())))
.onErrorMap(TimeoutException.class,
// GATEWAY_TIMEOUT(504, "Gateway Timeout")
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
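The timeout branch at the end of the filter follows a common pattern: put a time limit on the asynchronous response and translate the resulting TimeoutException into a 504. The sketch below shows the same idea with the JDK's CompletableFuture instead of Reactor. ResponseTimeoutSketch and awaitStatus are hypothetical names, the status is returned as a plain int rather than thrown as a ResponseStatusException, and join() blocks only to keep the demo short; the real filter stays non-blocking.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "timeout -> 504 Gateway Timeout".
class ResponseTimeoutSketch {

    static final int GATEWAY_TIMEOUT = 504;

    // Waits for the upstream status code. If a timeout is configured and it
    // elapses first, the TimeoutException is mapped to 504.
    static int awaitStatus(CompletableFuture<Integer> upstream, Duration responseTimeout) {
        CompletableFuture<Integer> guarded = responseTimeout == null
                ? upstream
                : upstream.orTimeout(responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
        try {
            return guarded.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                return GATEWAY_TIMEOUT;
            }
            throw e; // other failures are not this sketch's concern
        }
    }

    public static void main(String[] args) {
        // An upstream that answers immediately.
        System.out.println(awaitStatus(CompletableFuture.completedFuture(200), Duration.ofMillis(100)));
        // An upstream that never answers.
        System.out.println(awaitStatus(new CompletableFuture<>(), Duration.ofMillis(50)));
    }
}
```

As in the filter, no timeout is applied when the configured value is null, so a response timeout has to be set explicitly for the 504 mapping to take effect.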
5.3 Returning the response
C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
// Step 1: Get the upstream connection stored by NettyRoutingFilter
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
// Complete without writing anything if no connection exists
if (connection == null) {
return Mono.empty();
}
// Step 2: Get the gateway's response object
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response
.bufferFactory();
// The body is a Flux of ByteBuf, wrapped as NettyDataBuffer
final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
.map(factory::wrap);
// Media type
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
catch (Exception e) {
log.trace("invalid media type", e);
}
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
}));
}
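The important detail in this filter is chain.filter(exchange).then(...): the rest of the chain runs first, and the response is written only afterwards from what the routing filter left in the exchange attributes. The sketch below illustrates that ordering with CompletableFuture. PostFilterSketch, its methods and the attribute key are hypothetical stand-ins, with a Map playing the role of the exchange attributes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of a "post" filter that acts after the rest of the chain.
class PostFilterSketch {

    static final String RESPONSE_ATTR = "clientResponse";

    // "Routing" step: stores the upstream response as an attribute instead of writing it.
    static CompletableFuture<Void> routing(Map<String, Object> attrs, List<String> log) {
        attrs.put(RESPONSE_ATTR, "upstream-body");
        log.add("routing: stored response");
        return CompletableFuture.completedFuture(null);
    }

    // "Write" step: lets the rest of the chain run first, then writes the stored
    // response, or does nothing if no response was stored.
    static CompletableFuture<Void> writeResponse(Map<String, Object> attrs, List<String> log,
            Supplier<CompletableFuture<Void>> restOfChain) {
        return restOfChain.get().thenRun(() -> {
            Object body = attrs.get(RESPONSE_ATTR);
            if (body == null) {
                log.add("write: nothing to write");
                return;
            }
            log.add("write: " + body);
        });
    }

    public static void main(String[] args) {
        Map<String, Object> attrs = new HashMap<>();
        List<String> log = new ArrayList<>();
        writeResponse(attrs, log, () -> routing(attrs, log)).join();
        System.out.println(log); // prints [routing: stored response, write: upstream-body]
    }
}
```

This is why the write filter's logic runs last even though the filter itself is invoked before the routing filter: its own work sits behind the completion of the rest of the chain.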
Conclusion
Because my understanding of Netty's internals is still limited, I could not print data to inspect some parts of the call process, so I am not fully confident in parts of this article. I will supplement the details after further study