Introduction to the
Based on the previous chapter: Soul gateway source code parsing (two) code preliminary run configuration, this debug request processing rough path, verify the gateway model path
An overview of
Based on the example, make a breakpoint in the pointcut and then trace the call stack to see where a request was received to send it; Went through those classes on the way; That’s roughly what we did
Most importantly, we will focus this time on seeing if the request is sent to the backend server
And how do I get the response back to the client
Source code for the Debug
View the run logs to find the entry point
In the previous part, we configured the Divide plug-in to forward http://localhost:9195 to the background server http://localhost:8082. First, run it without interrupting the point. Check the run log to find a starting point:
o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082
o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082
o.d.s.plugin.httpclient.WebClientPlugin : The request urlPath is http://localhost:8082, retryTimes is 1
Copy the code
An obvious divide related log in the above log is printed by AbstractSoulPlugin. Double-click Shift in Win, search AbstractSoulPlugin, and find it is an interface. The downward arrow on the left side of IDEA looks at its implementation and finds a familiar DividePlugin implementation class. Click on it, break on an obvious doExecute function, and initiate a request: http://localhost:9195
Through the function call stack, the call is sent by the SoulWebHandler, and you can see from the following functions that this is a loop through which the plugins are iterated for action
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
Copy the code
Trace call stack
If I go back to the call stack again, I’m sending SoulWebHandler and I’m calling the Execute of SoulWebHandler
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
// new DefaultSoulPluginChain(plugins).execute(exchange)
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
Copy the code
After that, we found that we could not understand it. Instead of using the obvious function transfer relationship, we put port on the above function, restart the program, and send the request again
When the breakpoint comes in, look at the call stack and send DefaultWebFilterChain calling the above function
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
// called when both below are null
return this.currentFilter ! =null && this.chain ! =null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);
});
}
Copy the code
Call stack again, again in the above function to hit a breakpoint, restart, send a request, the next directly write class and related functions, there is a special place to add a little note
Come to FilteringWebHandler
public Mono<Void> handle(ServerWebExchange exchange) {
return this.chain.filter(exchange);
}
Copy the code
Continue to the WebHandlerDecorator
public Mono<Void> handle(ServerWebExchange exchange) {
return this.delegate.handle(exchange);
}
Copy the code
Come to ExceptionHandlingWebHandler
public Mono<Void> handle(ServerWebExchange exchange) {
Mono completion;
try {
// Call from here
completion = super.handle(exchange);
} catch (Throwable var5) {
completion = Mono.error(var5);
}
WebExceptionHandler handler;
for(Iterator var3 = this.exceptionHandlers.iterator(); var3.hasNext(); completion = completion.onErrorResume((ex) -> {
return handler.handle(exchange, ex);
})) {
handler = (WebExceptionHandler)var3.next();
}
return completion;
}
Copy the code
Moving on to the HttpWebHandlerAdapter, this class is a bit more critical. You see the variable that you’ve been passing around before: Exchange. Exchange is generated in this class and passed to later functions to be called, and is generated using Response and Request
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer ! =null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// The key variable exchange is generated
ServerWebExchange exchange = this.createExchange(request, response);
LogFormatUtils.traceDebug(logger, (traceOn) -> {
return exchange.getLogPrefix() + this.formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + this.formatHeaders(exchange.getRequest().getHeaders()) : "");
});
// this.getDelegate().handle(exchange)
/ / is obtained from the debug can see getDelete ExceptionHandlingWebHandler, it is in this call
Mono var10000 = this.getDelegate().handle(exchange).doOnSuccess((aVoid) -> {
this.logResponse(exchange);
}).onErrorResume((ex) -> {
return this.handleUnresolvedError(exchange, ex);
});
response.getClass();
return var10000.then(Mono.defer(response::setComplete));
}
Copy the code
Continue to go to the ReactiveWebServerApplicationContext
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return this.handler.handle(request, response);
}
Copy the code
Continue to go to the ReactorHttpHandlerAdapter
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// Exchange requires request and response generation
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator((ServerHttpResponse)response);
}
return this.httpHandler.handle(request, (ServerHttpResponse)response).doOnError((ex) -> {
logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage());
}).doOnSuccess((aVoid) -> {
logger.trace(request.getLogPrefix() + "Handling completed");
});
} catch (URISyntaxException var6) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + var6.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
returnMono.empty(); }}Copy the code
Continue to the HttpServerHandle
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) {
try {
if (log.isDebugEnabled()) {
log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
}
HttpServerOperations ops = (HttpServerOperations)connection;
Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
} catch (Throwable var4) {
log.error(ReactorNetty.format(connection.channel(), ""), var4); connection.channel().close(); }}}Copy the code
Continue to TcpServerBind
public void onStateChange(Connection connection, State newState) {
if(newState == State.DISCONNECTING && connection.channel().isActive() && ! connection.isPersistent()) { connection.dispose(); }this.childObs.onStateChange(connection, newState);
}
Copy the code
HttpServerOperations (CTX, MSG); HttpServerOperations (CTX, MSG); HttpServerOperations (CTX, MSG)
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
try {
/ / call
this.listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
} catch (Exception var4) {
this.onInboundError(var4);
ReferenceCountUtil.release(msg);
return;
}
if (msg instanceof FullHttpRequest) {
super.onInboundNext(ctx, msg);
if (this.isHttp2()) {
this.onInboundComplete(); }}}else {
if (msg instanceof HttpContent) {
if(msg ! = LastHttpContent.EMPTY_LAST_CONTENT) {super.onInboundNext(ctx, msg);
}
if (msg instanceof LastHttpContent) {
this.onInboundComplete(); }}else {
super.onInboundNext(ctx, msg); }}}Copy the code
Divide we’ve got an accept request and divide we’ve got an accept request and divide we’ve got an accept request and divide
- HttpServerOperations: The obvious place where Netty’s requests are received, the request entry
- TcpServerBind
- HttpServerHandle
- ReactorHttpHandlerAdapter: to generate the response and request
- ReactiveWebServerApplicationContext
- HttpWebHandlerAdapter: Exchange generation
- ExceptionHandlingWebHandler
- WebHandlerDecorator
- FilteringWebHandler
- DefaultWebFilterChain
- SoulWebHandler: Plugins call chain
- DividePlugin: Plugin handling
Debug details step by step
At this time, referring to the gateway model, I found that the route matching and other things were not seen, so I couldn’t help it. The details were not clear: SoulWebHandler, whose plugin is not called carefully, so we debug the following function, enter each function call (enter subscribe and other breakpoints, click to enter the next breakpoint, the arrow in the upper left corner of IDEA debug)
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
Copy the code
Step by step we debug the above function and look at the plugins, which are roughly as follows, followed by false and true as skip. The skip of each plug-in is as follows:
- GlobalPlugin : false
- SignPlugin : false
- WafPlugin: false
- RateLimiterPlugin : false
- HystrixPlugin : false
- Resilience4JPlugin : false
- DividePlugin : false
- WebClientPlugin : false
- WebsocketPlugin : true
- BodyParamPlugin : false
- AlibabaDubblePlugin : true
- MonitorPlugin : false
- WebClientResponsePlugin : false
- DubboResponsePlugin : true
Debugging time to follow in, in the later step by step can go
After debugging, we found that the previous several plugins were not implemented in the following code if, but in divide Plugin, we followed it and saw rules and match, which were suspected to be related to route matching
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if(pluginData ! =null && pluginData.getEnabled()) {
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
// Divide Plugin executes this step, in rules, we found the rule we configured, guess here is a route match
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
Copy the code
Continue debugging, enter: WebClientPlugin, see suspected send request to background server, related code is as follows:
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assertsoulContext ! =null;
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
// The following code looks too much like a front-end Ajax call, guessing it is a request send
returnrequestBodySpec.headers(httpHeaders -> { httpHeaders.addAll(exchange.getRequest().getHeaders()); httpHeaders.remove(HttpHeaders.HOST); }) .contentType(buildMediaType(exchange)) .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody())) .exchange() .doOnError(e -> log.error(e.getMessage())) .timeout(Duration.ofMillis(timeout)) .retryWhen(Retry.onlyIf(x -> x.exception()instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2.true)))
.flatMap(e -> doNext(e, exchange, chain));
}
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
if (res.statusCode().is2xxSuccessful()) {
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else {
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
return chain.execute(exchange);
}
Copy the code
Continue to the: WebClientResponsePlugin and find code that returns a suspected response to the client
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)
|| response.getStatusCode() == HttpStatus.BAD_GATEWAY
|| response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
response.setStatusCode(clientResponse.statusCode());
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// Suspected response returned
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
}));
}
Copy the code
After a debugging session, DividePlugin, WebClientPlugin and WebClientResponsePlugin are very suspicious. We cancel all breakpoints and debug them on breakpoints
First testing DividePlugin, we send a request to http://localhost:9195/get, no configuration after sending the debug all the way down, found that returns the results as follows:
{
"code": - 107.."message": "Can not find selector, please check your configuration!"."data": null
}
Copy the code
As expected, we have a look at the relevant code, and later also tested the incorrect configuration, found that the following call function:
public static Mono<Void> result(final ServerWebExchange exchange, final Object result) {
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
// This is exchange.getresponse ().writeWith
return exchange.getResponse().writeWith(Mono.just(exchange.getResponse()
.bufferFactory().wrap(Objects.requireNonNull(JsonUtils.toJson(result)).getBytes())));
}
Copy the code
Continue to test the right request, WebClientPlugin through the above analysis and judgment according to the function code roughly, handleRequestBody sends the request, the doNext can receive the request as a result, we are on the doNext breakpoints, It was found that the server received the request in the background (write their own Netty service, and print the log), verify that we here is the request to send the guess
Execute (exchange); execute(exchange); This is a bit like a lamda expression (or a vue request), and then is executed after all the plugins have run, and then the client gets the result
The specific sending logic is not understood, but it does not affect the processing flow analysis this time. There is also a place where the debug is constantly called. Let’s go back and look at it and see if there are any analysis omissions
In class DefaultWebFilterChain there is a bit of a loop called like this: DefaultWebFilterChain
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
return this.currentFilter ! =null && this.chain ! =null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);
});
}
Copy the code
We check its related classes have the following several, enter the corresponding class to have a look, roughly as follows:
- MetricsWebFilter: I don’t understand
- HealthFilter: feels like a monitoring check and can be done directly because it is local; /actuator/health”, “/health_check”, tries to use the same processing logic and returns directly
- FileSizeFilter: File upload size limit? MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)
- WebSocketParamFilter: not quite sure what it does
- HiddenHttpMethodFilter: There seems to be no logical code
Health check is returned directly in HealthFilter, file upload size limit function is also in Filter, other unclear, but does not affect the overall situation
conclusion
After summary and combing, the following preliminary processing process overview is obtained:
- HttpServerOperations: The obvious place where Netty’s requests are received, the request entry
- ReactorHttpHandlerAdapter: to generate the response and request
- HttpWebHandlerAdapter: Exchange generation
- FilteringWebHandler: Indicates the filter operation
- SoulWebHandler: Plugins call chain
Once the request is received by Netty, it goes to Filter, where it does some processing: health check, file size check, etc., and then to the core plugins, where there are three parts of the plugin that need to be noted (self-assessment) :
- Pre-processing: The plugin here will match, feeling is for the configured URL authentication, blacklist, traffic limiting, degrade, fuse and other operations
- Request sending: HTTP, WebSocket, RPC, and file upload (this is a guess) requests are processed and sent to the back-end server
- Response return: There are only two types of response, HTTP and RPC, which are returned to the client
As you can see, plugins are very much at the core, where the key functions are implemented, and the Divide Plugin seems to play the role of route matching. In Soul, there is no obvious single route matching
Request and response processing is also handled in plugins