How does Spring Cloud Gateway work
The official documentation is well written, and so is the source code. Source code: GitHub: github.com/spring-clou… Gitee: gitee.com/github_mirr…
The Spring Cloud Gateway flow chart shows that the filter with the lowest priority is the last to process the request before it is sent, and the first to process the response when it comes back.
The flow chart is the cover image above.
The NettyRoutingFilter responsible for forwarding requests
Those familiar with Spring Cloud Gateway will know that one filter stands out: NettyRoutingFilter has the lowest priority of all.
Priority is determined by getOrder(): the higher the value, the lower the priority, and vice versa.
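As a rough illustration of this rule, the sketch below sorts a few filters by their order value in plain Java. The NamedFilter class and the "custom" filter with order 0 are hypothetical stand-ins, not Spring Cloud Gateway types; the only values taken from the filters discussed in this article are -1 and Integer.MAX_VALUE (the value of Ordered.LOWEST_PRECEDENCE).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for ordered gateway filters; not a Spring type.
class OrderSortDemo {

    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns filter names from highest priority (smallest order) to lowest.
    static List<String> sortedNames() {
        List<NamedFilter> filters = new ArrayList<>();
        filters.add(new NamedFilter("routing", Integer.MAX_VALUE)); // same value as Ordered.LOWEST_PRECEDENCE
        filters.add(new NamedFilter("custom", 0));                  // hypothetical user-defined filter
        filters.add(new NamedFilter("writeResponse", -1));

        // Smaller order value sorts first, which means higher priority.
        filters.sort(Comparator.comparingInt(f -> f.order));

        List<String> names = new ArrayList<>();
        for (NamedFilter f : filters) {
            names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedNames());
    }
}
```

Running main prints the names in priority order: writeResponse, custom, routing.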
The filter method does the following:
- Checks whether the request has already been routed; if so, it does no processing and passes the exchange on
- Marks the request as routed and begins processing it
- Processes the request headers
- Gets the Route information and obtains an HttpClient with the route's timeout settings applied
- Sends the request using HttpClient
- Receives the response and stores it, together with its status and headers, in the exchange
- Ends up with a Flux<HttpClientResponse> responseFlux
- Applies the response timeout to responseFlux; if it times out, the error is mapped to 504 Gateway Timeout
- Finishes by continuing the filter chain
public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);

    /**
     * Lowest precedence, so that this filter is the last to process the
     * request and the first to process the response.
     */
    public static final int ORDER = Ordered.LOWEST_PRECEDENCE;

    // HttpClient is responsible for forwarding requests
    private final HttpClient httpClient;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    private final HttpClientProperties properties;

    // do not use this headersFilters directly, use getHeadersFilters() instead.
    private volatile List<HttpHeadersFilter> headersFilters;

    public NettyRoutingFilter(HttpClient httpClient,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider,
            HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFiltersProvider = headersFiltersProvider;
        this.properties = properties;
    }

    public List<HttpHeadersFilter> getHeadersFilters() {
        if (headersFilters == null) {
            headersFilters = headersFiltersProvider.getIfAvailable();
        }
        return headersFilters;
    }

    @Override
    public int getOrder() {
        return ORDER;
    }

    @Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toASCIIString();

        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                        + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {
            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);

            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            return Mono.just(res);
        });

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

    protected ByteBuf getByteBuf(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            NettyDataBuffer buffer = (NettyDataBuffer) dataBuffer;
            return buffer.getNativeBuffer();
        }
        // MockServerHttpResponse creates these
        else if (dataBuffer instanceof DefaultDataBuffer) {
            DefaultDataBuffer buffer = (DefaultDataBuffer) dataBuffer;
            return Unpooled.wrappedBuffer(buffer.getNativeBuffer());
        }
        throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
    }

    private void setResponseStatus(HttpClientResponse clientResponse, ServerHttpResponse response) {
        HttpStatus status = HttpStatus.resolve(clientResponse.status().code());
        if (status != null) {
            response.setStatusCode(status);
        }
        else {
            while (response instanceof ServerHttpResponseDecorator) {
                response = ((ServerHttpResponseDecorator) response).getDelegate();
            }
            if (response instanceof AbstractServerHttpResponse) {
                ((AbstractServerHttpResponse) response).setRawStatusCode(clientResponse.status().code());
            }
            else {
                // TODO: log warning here, not throw error?
                throw new IllegalStateException("Unable to set status code " + clientResponse.status().code()
                        + " on response of type " + response.getClass().getName());
            }
        }
    }

    /**
     * Creates a new HttpClient with per route timeout configuration. Sub-classes that
     * override, should call super.getHttpClient() if they want to honor the per route
     * timeout configuration.
     * @param route the current route.
     * @param exchange the current ServerWebExchange.
     * @return the HttpClient to use for the request.
     */
    protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
        Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);
        if (connectTimeoutAttr != null) {
            Integer connectTimeout = getInteger(connectTimeoutAttr);
            return this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        }
        return httpClient;
    }

    static Integer getInteger(Object connectTimeoutAttr) {
        Integer connectTimeout;
        if (connectTimeoutAttr instanceof Integer) {
            connectTimeout = (Integer) connectTimeoutAttr;
        }
        else {
            connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());
        }
        return connectTimeout;
    }

    private Duration getResponseTimeout(Route route) {
        Object responseTimeoutAttr = route.getMetadata().get(RESPONSE_TIMEOUT_ATTR);
        Long responseTimeout = null;
        if (responseTimeoutAttr != null) {
            if (responseTimeoutAttr instanceof Number) {
                responseTimeout = ((Number) responseTimeoutAttr).longValue();
            }
            else {
                responseTimeout = Long.valueOf(responseTimeoutAttr.toString());
            }
        }
        return responseTimeout != null ? Duration.ofMillis(responseTimeout) : properties.getResponseTimeout();
    }

}
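The per-route timeout lookup at the end of the listing above can be isolated in a few lines of plain Java. This is only a sketch of the same idea (a metadata value that may be a number or a string, with a global fallback); the class name, the method name, and the metadata key "response-timeout" are assumptions made for illustration, not taken from the listing.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of resolving a per-route response timeout; names are hypothetical.
class RouteTimeoutDemo {

    // Assumed metadata key, used here only for illustration.
    static final String RESPONSE_TIMEOUT_KEY = "response-timeout";

    // Uses the route's value (in milliseconds) when present, otherwise the global default.
    static Duration resolveResponseTimeout(Map<String, Object> metadata, Duration globalDefault) {
        Object attr = metadata.get(RESPONSE_TIMEOUT_KEY);
        if (attr == null) {
            return globalDefault;
        }
        // The value may have been configured as a number or as a string.
        long millis = (attr instanceof Number)
                ? ((Number) attr).longValue()
                : Long.parseLong(attr.toString());
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put(RESPONSE_TIMEOUT_KEY, "1500");
        System.out.println(resolveResponseTimeout(metadata, Duration.ofSeconds(5)));
        System.out.println(resolveResponseTimeout(new HashMap<>(), Duration.ofSeconds(5)));
    }
}
```

Accepting both numbers and strings keeps the lookup tolerant of how the route metadata was written, which is the same reason the listing checks instanceof Number before falling back to parsing.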
The NettyWriteResponseFilter that writes the response back to the original connection
NettyRoutingFilter is the last filter to process the request. Correspondingly, NettyWriteResponseFilter, whose Order is -1, is meant to be the last to process the response.
Keep these two priorities in mind when configuring a GlobalFilter or GatewayFilter yourself.
If the request is to be processed before it is forwarded to the target server, the filter must have a higher priority than NettyRoutingFilter, that is, its Order must be less than 2147483647.
If the response is to be processed before it is written back, the filter must have a lower priority than NettyWriteResponseFilter, that is, its Order must be greater than -1.
public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

    /**
     * Order for write response filter.
     */
    public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

    private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);

    private final List<MediaType> streamingMediaTypes;

    public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
        this.streamingMediaTypes = streamingMediaTypes;
    }

    @Override
    public int getOrder() {
        return WRITE_RESPONSE_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }

    protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {
        DataBufferFactory bufferFactory = response.bufferFactory();
        if (bufferFactory instanceof NettyDataBufferFactory) {
            NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
            return factory.wrap(byteBuf);
        }
        // MockServerHttpResponse creates these
        else if (bufferFactory instanceof DefaultDataBufferFactory) {
            DataBuffer buffer = ((DefaultDataBufferFactory) bufferFactory).allocateBuffer(byteBuf.readableBytes());
            buffer.write(byteBuf.nioBuffer());
            byteBuf.release();
            return buffer;
        }
        throw new IllegalArgumentException("Unkown DataBufferFactory type " + bufferFactory.getClass());
    }

    private void cleanup(ServerWebExchange exchange) {
        Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
        if (connection != null && connection.channel().isActive() && !connection.isPersistent()) {
            connection.dispose();
        }
    }

    // TODO: use framework if possible
    // TODO: port to WebClientWriteResponseFilter
    private boolean isStreamingMediaType(@Nullable MediaType contentType) {
        return (contentType != null && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith));
    }

}
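To make the ordering rule stated before this listing concrete, here is a small plain-Java model of a filter chain. It is a simplified, synchronous sketch and not the gateway's reactive implementation: TracingFilter, the "custom" filter, and its order of 0 are hypothetical, and each filter just records when its "pre" logic (before the rest of the chain) and "post" logic (after the rest of the chain) run.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of pre/post filter execution; names are hypothetical, not Spring APIs.
class FilterChainTraceDemo {

    static final class TracingFilter {
        final String name;
        final int order;

        TracingFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }

        // "pre" runs before the rest of the chain, "post" after the rest has finished.
        void filter(List<String> trace, Runnable restOfChain) {
            trace.add(name + ":pre");
            restOfChain.run();
            trace.add(name + ":post");
        }
    }

    static List<String> run() {
        List<TracingFilter> filters = new ArrayList<>();
        filters.add(new TracingFilter("routing", Integer.MAX_VALUE)); // stands in for NettyRoutingFilter
        filters.add(new TracingFilter("writeResponse", -1));          // stands in for NettyWriteResponseFilter
        filters.add(new TracingFilter("custom", 0));                  // hypothetical user filter
        filters.sort(Comparator.comparingInt(f -> f.order));

        List<String> trace = new ArrayList<>();
        invoke(filters, 0, trace);
        return trace;
    }

    // Invokes the filter at the given index, handing it the remainder of the chain.
    private static void invoke(List<TracingFilter> filters, int index, List<String> trace) {
        if (index == filters.size()) {
            return; // end of the chain
        }
        filters.get(index).filter(trace, () -> invoke(filters, index + 1, trace));
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded trace is writeResponse:pre, custom:pre, routing:pre, routing:post, custom:post, writeResponse:post. In this model the custom filter's pre step runs before the routing step and its post step runs before the write-response step, which is why an Order between -1 and 2147483647 lets a filter see both the outgoing request and the returning response.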
How to achieve load balancing
The load-balancing filter is ReactiveLoadBalancerClientFilter. Its main functions are:
- Load balancing applies only to routes whose URI in the Spring Cloud Gateway routes configuration uses the lb scheme
- Extracts the serviceId from the lb URI; for example, for lb://user-server the serviceId is user-server
- Finds an available instance from the LoadBalancer by serviceId; the load balancer relies on a service registry such as Consul, Eureka, or ZooKeeper
- If no instance is available, checks whether 404 has been configured; if so it responds with status code 404, otherwise with 503
- Obtains the host information of the target server from the ServiceInstance
- Stores the resulting URL in the exchange attributes with exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl), so that NettyRoutingFilter can read this address when it forwards the request and send it to the corresponding server
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);

    /**
     * Order of filter.
     */
    public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

    private final LoadBalancerClientFactory clientFactory;

    private final GatewayLoadBalancerProperties properties;

    private final LoadBalancerProperties loadBalancerProperties;

    public ReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
            GatewayLoadBalancerProperties properties, LoadBalancerProperties loadBalancerProperties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
        this.loadBalancerProperties = loadBalancerProperties;
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String serviceId = requestUri.getHost();
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
                new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

            if (!response.hasServer()) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }

            ServiceInstance retrievedInstance = response.getServer();

            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
                    overrideScheme);

            URI requestUrl = reconstructURI(serviceInstance, uri);

            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
                ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer == null) {
            throw new NotFoundException("No loadbalancer available for " + serviceId);
        }
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        return loadBalancer.choose(lbRequest);
    }

    private String getHint(String serviceId, Map<String, String> hints) {
        String defaultHint = hints.getOrDefault("default", "default");
        String hintPropertyValue = hints.get(serviceId);
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
    }

}
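The core of what this filter does, turning an lb:// address into the address of one concrete instance, can be sketched with the JDK alone. The example below is an illustration under assumptions rather than the library's code: the Instance class, the instance addresses, and the round-robin selection are all hypothetical, and the 404/503 choice simply mirrors the use404 switch seen in the listing.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of resolving an lb:// URI to a concrete instance; names are hypothetical.
class LbResolveDemo {

    static final class Instance {
        final String host;
        final int port;
        final boolean secure;

        Instance(String host, int port, boolean secure) {
            this.host = host;
            this.port = port;
            this.secure = secure;
        }
    }

    private final AtomicInteger position = new AtomicInteger();

    // Picks the next instance in round-robin order, or null when none is registered.
    Instance choose(List<Instance> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    // Replaces the lb scheme and the service id with the instance's scheme, host and port.
    static URI reconstruct(Instance instance, URI original) {
        String scheme = instance.secure ? "https" : "http";
        try {
            return new URI(scheme, null, instance.host, instance.port,
                    original.getPath(), original.getQuery(), null);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Status to answer with when no instance is available.
    static int statusWhenNoInstance(boolean use404) {
        return use404 ? 404 : 503;
    }

    public static void main(String[] args) {
        LbResolveDemo lb = new LbResolveDemo();
        List<Instance> instances = Arrays.asList(
                new Instance("10.0.0.5", 8080, false),
                new Instance("10.0.0.6", 8443, true));
        URI original = URI.create("lb://user-server/users/1?active=true");
        System.out.println("serviceId = " + original.getHost());
        System.out.println(reconstruct(lb.choose(instances), original));
        System.out.println(reconstruct(lb.choose(instances), original));
    }
}
```

Only the scheme, host, and port change; the path and query string of the original request are carried over unchanged, so the target service sees the same resource the client asked for.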
Conclusion
In this way, these two filters let Spring Cloud Gateway forward a request to the target server and write the target server's response back to the caller. To extend the gateway, we can implement filters of our own. Some process the request before it is forwarded to the target server, for example by modifying request headers, modifying request parameters, rewriting the request path, or changing the target service. Others process the response before it is written back to the caller, for example by adding cross-origin (CORS) headers or encrypting the response body. A filter can of course handle both the request and the response, for example to record how long a request takes.
A reminder, once again
Keep these two priorities in mind when you configure your own GlobalFilter and GatewayFilter. To process a request before it is forwarded to the target server, the filter must have a higher priority than NettyRoutingFilter, that is, an Order smaller than 2147483647. To process a response before it is written back, the filter must have a lower priority than NettyWriteResponseFilter, that is, an Order larger than -1.
Reference
GitHub: github.com/spring-clou… Gitee: gitee.com/github_mirr…
Further reading
In addition to the three filters above, there are many other interesting filters in the org.springframework.cloud.gateway.filter package.
GatewayMetricsFilter records how long each request takes
public class GatewayMetricsFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(GatewayMetricsFilter.class);

    private final MeterRegistry meterRegistry;

    private GatewayTagsProvider compositeTagsProvider;

    private final String metricsPrefix;

    public GatewayMetricsFilter(MeterRegistry meterRegistry, List<GatewayTagsProvider> tagsProviders,
            String metricsPrefix) {
        this.meterRegistry = meterRegistry;
        this.compositeTagsProvider = tagsProviders.stream().reduce(exchange -> Tags.empty(), GatewayTagsProvider::and);
        if (metricsPrefix.endsWith(".")) {
            this.metricsPrefix = metricsPrefix.substring(0, metricsPrefix.length() - 1);
        }
        else {
            this.metricsPrefix = metricsPrefix;
        }
    }

    public String getMetricsPrefix() {
        return metricsPrefix;
    }

    @Override
    public int getOrder() {
        // start the timer as soon as possible and report the metric event before we write
        // response to client
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Sample sample = Timer.start(meterRegistry);

        return chain.filter(exchange).doOnSuccess(aVoid -> endTimerRespectingCommit(exchange, sample))
                .doOnError(throwable -> endTimerRespectingCommit(exchange, sample));
    }

    private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {
        ServerHttpResponse response = exchange.getResponse();
        if (response.isCommitted()) {
            endTimerInner(exchange, sample);
        }
        else {
            response.beforeCommit(() -> {
                endTimerInner(exchange, sample);
                return Mono.empty();
            });
        }
    }

    private void endTimerInner(ServerWebExchange exchange, Sample sample) {
        Tags tags = compositeTagsProvider.apply(exchange);

        if (log.isTraceEnabled()) {
            log.trace(metricsPrefix + ".requests tags: " + tags);
        }
        sample.stop(meterRegistry.timer(metricsPrefix + ".requests", tags));
    }

}
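The timing pattern in this filter (start a sample, run the rest of the chain, stop the sample whether the chain succeeded or failed) can be sketched without Micrometer. The code below is a simplified synchronous stand-in: RequestTimingDemo, its timers map, and the injectable clock are hypothetical, and the prefix "spring.cloud.gateway." is just an example value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Plain-Java sketch of timing a request around the rest of the chain; names are hypothetical.
class RequestTimingDemo {

    // Accumulated nanoseconds per timer name; a stand-in for a real meter registry.
    final Map<String, Long> timers = new HashMap<>();

    // Strips a trailing dot so the timer name can be built as prefix + ".requests".
    static String normalizePrefix(String prefix) {
        return prefix.endsWith(".") ? prefix.substring(0, prefix.length() - 1) : prefix;
    }

    // Runs the downstream work and records the elapsed time read from the given clock.
    void record(String prefix, Runnable downstream, LongSupplier nanoClock) {
        long start = nanoClock.getAsLong();
        try {
            downstream.run();
        }
        finally {
            // Recorded on success and on failure alike.
            long elapsed = nanoClock.getAsLong() - start;
            timers.merge(normalizePrefix(prefix) + ".requests", elapsed, Long::sum);
        }
    }

    public static void main(String[] args) {
        RequestTimingDemo demo = new RequestTimingDemo();
        demo.record("spring.cloud.gateway.", () -> { }, System::nanoTime);
        System.out.println(demo.timers.keySet());
    }
}
```

Passing the clock in as a LongSupplier is a design choice made for this sketch so that the elapsed time can be checked deterministically; production code would normally rely on the metrics library's own timer.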
Thanks
Thanks to the keyboard my colleague gave me, I no longer hear any clicking when I type code in the middle of the night