Code for this series: github.com/JoJoTec/spr…
Before we start writing our own logging Filter, there is one more issue to cover: trace information (the trace ID and span ID that tie log lines to a request) can be lost in Spring Cloud Gateway.
The core conflict: Project Reactor's design versus the Java logging MDC
Project Reactor implements a programming model based on asynchronous, reactive design. Its main idea is that you first assemble the execution chain, and the whole chain only runs once it is subscribed to. Which thread executes each part of the chain, however, is not fixed.
Java logging frameworks carry their MDC (Mapped Diagnostic Context) per thread; the MDC can be thought of simply as a ThreadLocal Map. The trace information for log lines is stored in the MDC.
As a result, Project Reactor and the logging framework's MDC are incompatible by default: whenever execution switches to another thread asynchronously, the MDC seen by the code changes. Spring Cloud Sleuth adds a lot of glue code to bridge this gap, but even the most careful design misses something. Project Reactor's application scenarios and libraries keep growing, so Spring Cloud Sleuth may not cover every scenario, and trace information can be lost.
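The "assemble first, run on subscribe" idea can be illustrated without Reactor at all. The following is a minimal plain-Java sketch; the `Lazy` class is a hypothetical stand-in for a Mono and is not part of Project Reactor. It only shows that building the chain executes nothing and that the steps run when `subscribe` is called. It does not model thread switching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class LazyChainSketch {

    // Hypothetical stand-in for a Mono: it stores a recipe and runs it only on subscribe.
    public static final class Lazy<T> {
        private final Supplier<T> recipe;

        private Lazy(Supplier<T> recipe) {
            this.recipe = recipe;
        }

        public static <T> Lazy<T> fromSupplier(Supplier<T> supplier) {
            return new Lazy<>(supplier);
        }

        // Adds a step to the chain without executing anything yet.
        public <R> Lazy<R> map(Function<? super T, ? extends R> mapper) {
            return new Lazy<>(() -> mapper.apply(recipe.get()));
        }

        // Only here does the whole chain actually run.
        public void subscribe(Consumer<? super T> consumer) {
            consumer.accept(recipe.get());
        }
    }

    // Returns the order in which things happened.
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        Lazy<Integer> chain = Lazy
                .fromSupplier(() -> { events.add("source"); return 20; })
                .map(x -> { events.add("map"); return x + 1; });
        events.add("assembled"); // nothing in the chain has run at this point
        chain.subscribe(v -> events.add("value=" + v));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch "assembled" is recorded before "source" and "map", which mirrors why code placed around chain assembly cannot assume it runs at the same time, or on the same thread, as the chain itself.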
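To make the "ThreadLocal Map" picture concrete, here is a minimal sketch that uses a plain `ThreadLocal` as a stand-in for the MDC. The `MDC` field and the `traceId` key are assumptions for illustration; real logging frameworks expose their own API. A value written on one thread is simply not visible on another.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalMdcDemo {

    // Hypothetical stand-in for a logging MDC: each thread gets its own Map.
    public static final ThreadLocal<Map<String, String>> MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Reads "traceId" on a freshly started thread and returns what that thread sees.
    public static String readOnOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(MDC.get().get("traceId")));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        MDC.get().put("traceId", "51063d6f1fe264d0");
        System.out.println("same thread:  " + MDC.get().get("traceId"));
        System.out.println("other thread: " + readOnOtherThread());
    }
}
```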
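The kind of glue code involved can be sketched in plain Java: capture the context on the thread that schedules the work, install it on the thread that runs the work, and restore the previous state afterwards. This is only an illustration of the general pattern under assumed names (`MDC`, `wrap`); it is not Spring Cloud Sleuth's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ContextPropagationSketch {

    // Hypothetical stand-in for a logging MDC.
    public static final ThreadLocal<Map<String, String>> MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Captures the caller's context now; installs it around the task wherever it runs.
    public static Runnable wrap(Runnable task) {
        Map<String, String> captured = new HashMap<>(MDC.get());
        return () -> {
            Map<String, String> previous = MDC.get();
            MDC.set(captured);
            try {
                task.run();
            } finally {
                MDC.set(previous); // do not leak the context into unrelated work
            }
        };
    }

    // Runs a task on a new thread, with or without the wrapper, and returns the traceId it saw.
    public static String runOnOtherThread(boolean wrapped) {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(MDC.get().get("traceId"));
        Thread worker = new Thread(wrapped ? wrap(task) : task); // wrap() runs on the calling thread
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        MDC.get().put("traceId", "51063d6f1fe264d0");
        System.out.println("unwrapped: " + runOnOtherThread(false));
        System.out.println("wrapped:   " + runOnOtherThread(true));
    }
}
```

Every place where work can hop threads needs such a wrapper, which is why a growing set of operators and libraries makes it easy to miss one.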
A common scenario where trace information is lost in Spring Cloud Gateway
Let's write a simple test project (project address):
Add the dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.6</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- log4j2 asynchronous logging requires the disruptor dependency -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>${disruptor.version}</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2020.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Enable AdaptCachedBodyGlobalFilter for every route:
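import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.config.GatewayProperties;
import org.springframework.cloud.gateway.event.EnableBodyCachingEvent;
import org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
    @Autowired
    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
    @Autowired
    private GatewayProperties gatewayProperties;

    @PostConstruct
    public void init() {
        gatewayProperties.getRoutes().forEach(routeDefinition -> {
            // Enable AdaptCachedBodyGlobalFilter for every route in the Spring Cloud Gateway routing configuration
            EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
        });
    }
}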
Configuration (there is only one route, which forwards requests to httpbin.org, an HTTP request testing site):
server:
  port: 8181
spring:
  application:
    name: apiGateway
  cloud:
    gateway:
      httpclient:
        connect-timeout: 500
        response-timeout: 60000
      routes:
        - id: first_route
          uri: http://httpbin.org
          predicates:
            - Path=/httpbin/**
          filters:
            - StripPrefix=1
Add two global filters, one ordered just before AdaptCachedBodyGlobalFilter and one just after it. Both filters are trivial: each writes a single log line.
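import lombok.extern.log4j.Log4j2;
import org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {
    // Run immediately before AdaptCachedBodyGlobalFilter
    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("before AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {
    // Run immediately after AdaptCachedBodyGlobalFilter
    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("after AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}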
Finally, configure the Log4j2 output pattern to include the trace information, as described at the beginning of this series.
After starting the application, visit http://127.0.0.1:8181/httpbin/anything and check the log. The line written by PostLogFilter carries no trace information:
2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
How Spring Cloud Sleuth adds trace information
From the source code analysis earlier in this series, we know that the initial TraceWebFilter wraps the Mono in a MonoWebFilterTrace, whose core source code is:
@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {
    Context context = contextWithoutInitialSpan(subscriber.currentContext());
    Span span = findOrCreateSpan(context);
    // Put the Span into scope, which writes the trace information into the logging MDC (org.slf4j.MDC).
    // The logging MDC is usually a ThreadLocal Map. For Log4j2 the implementing class is
    // org.apache.logging.log4j.ThreadContext, whose core contextMap is ThreadLocal-based,
    // so each thread accesses its own Map of trace information.
    try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
        // Perform the actual subscribe inside the Span's scope
        this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
    }
    // After scope.close(), the trace information is removed from the MDC
}

@Override
public Object scanUnsafe(Attr key) {
    if (key == Attr.RUN_STYLE) {
        // Report this operator as running synchronously
        return Attr.RunStyle.SYNC;
    }
    return super.scanUnsafe(key);
}
What does WebFilterTraceSubscriber do? When the HTTP request completes, or fails with an exception, we want to record the response or the exception in the Span. That is what this class encapsulates.
Spring WebFlux handles a request by subscribing to the Mono obtained above, after it has been wrapped. Once that Mono is wrapped by MonoWebFilterTrace, the publishing and subscription of the entire inner Mono chain run inside the scope opened around WebFilterTraceSubscriber. Trace information is therefore preserved as long as no GatewayFilter switches to a Mono or Flux that forces asynchronous execution and with it a thread switch.
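The scope mechanism described here can be sketched in plain Java with try-with-resources. The `Scope` class and `TRACE_ID` ThreadLocal below are hypothetical stand-ins for Sleuth's scope and the logging MDC, not the real classes. Steps that run synchronously inside the `try` block see the trace ID; once the block exits, the value is gone.

```java
import java.util.ArrayList;
import java.util.List;

public class ScopeSketch {

    // Hypothetical stand-in for the MDC entry that holds the trace ID.
    public static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical scope: sets the trace ID on open and restores the previous state on close.
    public static final class Scope implements AutoCloseable {
        private final String previous;

        public Scope(String traceId) {
            this.previous = TRACE_ID.get();
            TRACE_ID.set(traceId);
        }

        @Override
        public void close() {
            if (previous == null) {
                TRACE_ID.remove();
            } else {
                TRACE_ID.set(previous);
            }
        }
    }

    // Runs two synchronous "filters" inside the scope and records what each one saw,
    // then records what is visible after the scope has been closed.
    public static List<String> runSynchronousChain(String traceId) {
        List<String> seen = new ArrayList<>();
        try (Scope scope = new Scope(traceId)) {
            Runnable filter1 = () -> seen.add("filter1:" + TRACE_ID.get());
            Runnable filter2 = () -> seen.add("filter2:" + TRACE_ID.get());
            filter1.run();
            filter2.run();
        }
        seen.add("afterClose:" + TRACE_ID.get());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runSynchronousChain("51063d6f1fe264d0"));
    }
}
```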
Why trace information is lost in the test project above
Let's look at what the Mono chain we assembled earlier becomes once AdaptCachedBodyGlobalFilter is involved:
return Mono.defer(() ->
    new MonoWebFilterTrace(source,
        RoutePredicateHandlerMapping.this.lookupRoute(exchange) // look up the route for the request
            .flatMap((Function<Route, Mono<?>>) r -> {
                // put the route into the Attributes; we will use it later
                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                // return the FilteringWebHandler of RoutePredicateHandlerMapping
                return Mono.just(RoutePredicateHandlerMapping.this.webHandler);
            })
            // if the lookup returned Mono.empty(), i.e. no route matched
            .switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                // log it, then return Mono.empty()
                if (logger.isTraceEnabled()) {
                    logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                }
            })))
            // if no handlerMapping returned a non-empty Mono, respond with 404 directly
            .switchIfEmpty(DispatcherHandler.this.createNotFoundError())
            // call the corresponding Handler
            .then(Mono.defer(() -> {
                // the part of the chain nested in AdaptCachedBodyGlobalFilter; earlier steps are omitted
                // read the Body first; because of TCP packet splitting it may arrive in several buffers that must be joined
                return DataBufferUtils.join(exchange.getRequest().getBody())
                    // if there is no Body, return an empty DataBuffer directly
                    .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
                    // decorate puts the DataBuffer into the exchange's Attributes, only to prevent
                    // the request Body from being cached again if AdaptCachedBodyGlobalFilter is re-entered;
                    // it then wraps a new request from the new body and the original request, and continues the GatewayFilters chain
                    .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
                    .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
            }).then(Mono.empty())),
        TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()
    ).transformDeferred((call) -> {
        // MetricsWebFilter-related processing; the code was given earlier in the series and is omitted here
    })
);
Here, DataBufferUtils.join(exchange.getRequest().getBody()) is backed by the FluxReceive for the request body. It can be understood as follows: it submits a task that tries to read the request body, registers the rest of the GatewayFilter chain as the callback to run once the body has been read, and returns immediately after submitting the task. That is a little hard to see in the code above, so let's use a simpler analogy:
// First, create a new Span
Span span = tracer.newTrace();

// Declare a MonoOperator similar to the MonoWebFilterTrace wrapped in TraceWebFilter
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {
    protected MonoWebFilterTrace(Mono<? extends T> source) {
        super(source);
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // Put the span in scope, which writes the trace information into the ThreadLocal Map
        try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {
            source.subscribe(actual);
            // spanInScope is about to be closed (that is, the trace information is about to be
            // removed from the ThreadLocal Map), so print a log line
            log.info("stopped");
        }
    }
}

Mono.defer(() -> new MonoWebFilterTrace(
        Mono.fromRunnable(() -> {
            log.info("first");
        })
        // simulate FluxReceive
        .then(Mono.delay(Duration.ofSeconds(1))
            .doOnSuccess(longSignal -> log.info(longSignal)))
)).subscribe(aLong -> log.info(aLong));
Mono.delay behaves much like FluxReceive: both continue asynchronously on another thread pool. Running the code above produces the following log:
2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
In Spring Cloud Gateway, the FluxReceive for the request body uses the same thread pool as the one that invoked the GatewayFilter, so the continuation may even run on the same thread. But by then the Span's scope has already ended and the trace information has been removed from the ThreadLocal Map, so the log line carries no trace information.
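The point that a thread switch is not even required can be shown with a single-threaded sketch. Below, a queue plays the role of an event loop and `TRACE_ID` is a hypothetical stand-in for the MDC entry; none of these names come from Spring Cloud Gateway or Sleuth. The continuation is registered while the trace ID is set, but it runs only after the scope has been cleaned up, on the very same thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DeferredCallbackSketch {

    // Hypothetical stand-in for the MDC entry that holds the trace ID.
    public static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    public static List<String> run(String traceId) {
        List<String> log = new ArrayList<>();
        // Callbacks that the same thread will execute later, like an event loop.
        Queue<Runnable> eventLoop = new ArrayDeque<>();

        TRACE_ID.set(traceId); // scope opened
        try {
            log.add("before:" + TRACE_ID.get());
            // Like submitting the body read: register the continuation and return immediately.
            eventLoop.add(() -> log.add("after:" + TRACE_ID.get()));
            log.add("stopped:" + TRACE_ID.get());
        } finally {
            TRACE_ID.remove(); // scope closed, trace information removed
        }

        // Later, the same thread drains the queue and runs the continuation.
        Runnable callback;
        while ((callback = eventLoop.poll()) != null) {
            callback.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("7b2f5c190e1406cb"));
    }
}
```

The "after" entry sees no trace ID even though everything ran on one thread, which matches the PostLogFilter log line in the test project.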