
Code for this series: github.com/JoJoTec/spr…

In this section we continue the design for avoiding trace loss, focusing on how to make sure every GlobalFilter keeps the trace information once the existing Span has been obtained. First, we wrap Reactor's core publishers: we define a factory for Mono and Flux that packages the trace information into every publisher it creates. As long as a Mono or Flux comes from this factory, the trace information is preserved no matter how it is chained with other operators:

Custom factories for Mono and Flux

Common Subscriber wrapper: every key callback of the Reactor Subscriber (onSubscribe, onNext, onError, onComplete) first checks whether the current context already has trace information, that is, a current Span. If it does not, the callback runs inside a scope that installs the saved Span. If it does, the callback runs directly.

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import org.springframework.cloud.sleuth.CurrentTraceContext;
    import org.springframework.cloud.sleuth.Span;
    import org.springframework.cloud.sleuth.Tracer;

    public class TracedCoreSubscriber<T> implements Subscriber<T> {
        private final Subscriber<T> delegate;
        private final Tracer tracer;
        private final CurrentTraceContext currentTraceContext;
        private final Span span;

        TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
            this.delegate = delegate;
            this.tracer = tracer;
            this.currentTraceContext = currentTraceContext;
            this.span = span;
        }

        @Override
        public void onSubscribe(Subscription s) {
            executeWithinScope(() -> delegate.onSubscribe(s));
        }

        @Override
        public void onError(Throwable t) {
            executeWithinScope(() -> delegate.onError(t));
        }

        @Override
        public void onComplete() {
            executeWithinScope(() -> delegate.onComplete());
        }

        @Override
        public void onNext(T o) {
            executeWithinScope(() -> delegate.onNext(o));
        }

        private void executeWithinScope(Runnable runnable) {
            if (tracer.currentSpan() == null) {
                // No current span: force the call to run inside a scope for the saved span
                try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
                    runnable.run();
                }
            } else {
                // A span is already in scope: run directly
                runnable.run();
            }
        }
    }
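To make the scope check easier to see in isolation, here is a minimal sketch that uses only the JDK. It is not the Sleuth or Reactor API: `SpanHolder`, `ScopedSubscriber` and `ScopedSubscriberDemo` are hypothetical names, a `ThreadLocal<String>` stands in for `CurrentTraceContext`, and `java.util.concurrent.Flow.Subscriber` stands in for the Reactive Streams `Subscriber`. The idea is the same as above: a signal that arrives on a thread with no current span is run inside a scope for the saved span.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for CurrentTraceContext: one "current span" per thread.
final class SpanHolder {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // A scope that can be closed without a checked exception.
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    static String current() {
        return CURRENT.get();
    }

    // Installs the span and returns a scope that restores the previous value.
    static Scope open(String span) {
        String previous = CURRENT.get();
        CURRENT.set(span);
        return () -> {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        };
    }
}

// Wraps a subscriber so that every signal runs with a span in scope.
final class ScopedSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<T> delegate;
    private final String span;

    ScopedSubscriber(Flow.Subscriber<T> delegate, String span) {
        this.delegate = delegate;
        this.span = span;
    }

    @Override public void onSubscribe(Flow.Subscription s) { within(() -> delegate.onSubscribe(s)); }
    @Override public void onNext(T item) { within(() -> delegate.onNext(item)); }
    @Override public void onError(Throwable t) { within(() -> delegate.onError(t)); }
    @Override public void onComplete() { within(delegate::onComplete); }

    private void within(Runnable action) {
        if (SpanHolder.current() == null) {
            // No span on this thread: open a scope for the saved span.
            try (SpanHolder.Scope scope = SpanHolder.open(span)) {
                action.run();
            }
        } else {
            // A span is already in scope: run directly.
            action.run();
        }
    }
}

final class ScopedSubscriberDemo {
    // Sends two signals from a fresh thread (which has no span) and records what the delegate sees.
    static List<String> deliverOnNewThread(String span, boolean wrap) {
        List<String> seen = new ArrayList<>();
        Flow.Subscriber<String> recorder = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { seen.add(item + ":" + SpanHolder.current()); }
            @Override public void onError(Throwable t) { seen.add("error:" + SpanHolder.current()); }
            @Override public void onComplete() { seen.add("done:" + SpanHolder.current()); }
        };
        Flow.Subscriber<String> target = wrap ? new ScopedSubscriber<String>(recorder, span) : recorder;
        Thread worker = new Thread(() -> {
            target.onNext("a");
            target.onComplete();
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliverOnNewThread("span-1", false)); // [a:null, done:null]
        System.out.println(deliverOnNewThread("span-1", true));  // [a:span-1, done:span-1]
    }
}
```

Without the wrapper, the delegate sees no span on the worker thread. With the wrapper, each callback sees the span that was captured when the wrapper was built, and the scope is closed again after the callback returns.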

Next we define TracedFlux, a proxy for any Flux, and TracedMono, a proxy for any Mono. All they do is wrap the incoming CoreSubscriber in a TracedCoreSubscriber at subscribe time:

    import reactor.core.CoreSubscriber;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import org.springframework.cloud.sleuth.CurrentTraceContext;
    import org.springframework.cloud.sleuth.Span;
    import org.springframework.cloud.sleuth.Tracer;

    // TracedFlux.java
    public class TracedFlux<T> extends Flux<T> {
        private final Flux<T> delegate;
        private final Tracer tracer;
        private final CurrentTraceContext currentTraceContext;
        private final Span span;

        TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
            this.delegate = delegate;
            this.tracer = tracer;
            this.currentTraceContext = currentTraceContext;
            this.span = span;
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            // Wrap the real subscriber so every signal runs with the span in scope
            delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
        }
    }

    // TracedMono.java
    public class TracedMono<T> extends Mono<T> {
        private final Mono<T> delegate;
        private final Tracer tracer;
        private final CurrentTraceContext currentTraceContext;
        private final Span span;

        TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
            this.delegate = delegate;
            this.tracer = tracer;
            this.currentTraceContext = currentTraceContext;
            this.span = span;
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            // Wrap the real subscriber so every signal runs with the span in scope
            delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
        }
    }

Next, define a factory class that creates a TracedFlux from the request's ServerWebExchange and the original Flux, and a TracedMono from the request's ServerWebExchange and the original Mono. The Span is read from the exchange's attributes. From the earlier source-code analysis we know that TraceWebFilter adds this attribute. Because we only use the factory inside gateway filters, which run after TraceWebFilter, the attribute is guaranteed to exist.

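    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.sleuth.CurrentTraceContext;
    import org.springframework.cloud.sleuth.Span;
    import org.springframework.cloud.sleuth.Tracer;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;

    @Component
    public class TracedPublisherFactory {
        // Attribute key under which TraceWebFilter stores the request's Span
        protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

        @Autowired
        private Tracer tracer;
        @Autowired
        private CurrentTraceContext currentTraceContext;

        public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
            return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
        }

        public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
            return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
        }
    }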

Common abstract GlobalFilter: CommonTraceFilter

We write an abstract class that every GlobalFilter we implement later will extend. Its main responsibilities are:

  • Ensure that trace information is present both in the GlobalFilter itself and in the reactive chain it returns, by making sure the Mono returned by the filter is produced by the factory we implemented above.
  • Let the different GlobalFilters be ordered and executed in sequence, by implementing the Ordered interface.

    package com.github.jojotech.spring.cloud.apigateway.filter;

    import com.github.jojotech.spring.cloud.apigateway.common.TraceWebFilterUtil;
    import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;
    import reactor.core.publisher.Mono;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.GlobalFilter;
    import org.springframework.cloud.sleuth.CurrentTraceContext;
    import org.springframework.cloud.sleuth.Span;
    import org.springframework.cloud.sleuth.Tracer;
    import org.springframework.core.Ordered;
    import org.springframework.web.server.ServerWebExchange;

    /**
     * Parent class of all filters.
     * Its main job is to keep the span intact. In some cases the span is lost mid-stream,
     * so the log has no traceId or spanId. See:
     * https://github.com/spring-cloud/spring-cloud-sleuth/issues/2004
     */
    public abstract class AbstractTracedFilter implements GlobalFilter, Ordered {
        @Autowired
        protected Tracer tracer;
        @Autowired
        protected TracedPublisherFactory tracedPublisherFactory;
        @Autowired
        protected CurrentTraceContext currentTraceContext;

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            Mono<Void> traced;
            if (tracer.currentSpan() == null) {
                // No current span: run the filter logic inside a scope for the request's span
                try (CurrentTraceContext.Scope scope = this.currentTraceContext
                        .maybeScope(((Span) exchange.getAttributes().get(TraceWebFilterUtil.TRACE_REQUEST_ATTR))
                                .context())) {
                    traced = traced(exchange, chain);
                }
            } else {
                // A span is already in scope: run the filter logic directly
                traced = traced(exchange, chain);
            }
            // Wrap the result so the span also survives in the rest of the reactive chain
            return tracedPublisherFactory.getTracedMono(traced, exchange);
        }

        protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
    }

With this abstract class in place, we can implement our custom GlobalFilters by extending it.
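As a rough illustration of how subclasses plug in, the following JDK-only sketch models the two responsibilities of the abstract class: a template method that guarantees a span is in scope, and an order value that decides the execution sequence. It is a simplified, synchronous model, not Spring Cloud Gateway code. All names here (`TraceScope`, `ModelTracedFilter`, `AuthFilter`, `AccessLogFilter`, `FilterChainDemo`) are hypothetical, and a real subclass would return a `Mono<Void>` from `traced(exchange, chain)` instead of writing to a list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the per-thread trace context; not a Sleuth class.
final class TraceScope {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
}

// Simplified model of the abstract filter: subclasses only supply traced() and getOrder().
abstract class ModelTracedFilter {
    // Template method: make sure a span is in scope, then run the subclass logic.
    final void filter(String requestSpan, List<String> log) {
        if (TraceScope.CURRENT.get() == null) {
            TraceScope.CURRENT.set(requestSpan);
            try {
                traced(log);
            } finally {
                TraceScope.CURRENT.remove();
            }
        } else {
            traced(log);
        }
    }

    protected abstract void traced(List<String> log);

    abstract int getOrder();
}

// Two example subclasses; lower order values run first.
final class AuthFilter extends ModelTracedFilter {
    @Override protected void traced(List<String> log) { log.add("auth@" + TraceScope.CURRENT.get()); }
    @Override int getOrder() { return 0; }
}

final class AccessLogFilter extends ModelTracedFilter {
    @Override protected void traced(List<String> log) { log.add("access-log@" + TraceScope.CURRENT.get()); }
    @Override int getOrder() { return -1; }
}

final class FilterChainDemo {
    // Sorts the filters by order and runs each one for a request carrying the given span.
    static List<String> run(String requestSpan) {
        List<ModelTracedFilter> filters = new ArrayList<>();
        filters.add(new AuthFilter());
        filters.add(new AccessLogFilter());
        filters.sort(Comparator.comparingInt(ModelTracedFilter::getOrder));

        List<String> log = new ArrayList<>();
        for (ModelTracedFilter f : filters) {
            f.filter(requestSpan, log);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("span-7")); // [access-log@span-7, auth@span-7]
    }
}
```

The subclasses never touch the scope handling themselves. They only contain their own logic and an order value, which is the point of moving the span check into the shared parent.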
