preface
In our first ElasticAPM tutorial, we learned what watchability is and how powerful ElasticAPM can be, but it’s not enough to just use the single-player mode as described in the previous tutorial. Remember the two questions at the end of the last post:
1. This paper passed the test in the stand-alone environment, but in the distributed environment, many applications will be connected by requests. Can the service tracking be realized? What is the implementation principle?
Elastic APM automatically collects HTTP requests. Can Elastic APM work in a PRC distributed environment? Does it have to be implemented using a public API?
Does ElasticAPM work well under distributed conditions? Does ElasticAPM work in RPC environments?
ElasticAPM supports distributed Http calls by default, but does not support RPC. However, many companies use RPC protocol as the communication protocol of their internal systems. For example, our company uses Spring Cloud Alibaba as the framework of search service, and the communication of applications within the framework is realized by using RPC framework Dubbo. So the question becomes how to integrate ElasticAPM into Spring Cloud Alibaba.
Architecture explanation & problem analysis
First, LET me outline the architecture and workflow of Spring Cloud Alibaba and ElasticAPM.
AS shown in the architecture diagram, the search system is divided into Gateway application, US application, AS application and BS application. The user’s request will reach the Gateway first, and the Gateway will forward the request to US application through Http protocol. US application will invoke AS application through Dubbo protocol, and AS application will invoke BS application through Dubbo protocol.
Request —http—>US—RPC—->AS—–RPC——>BS
Every application starts with apM-Agent already integrated (see ElasticAPM for the first time if you don’t know how), it would be perfect if APM-Agent supported Dubbo by default (but it doesn’t). Therefore, the whole link tracking, after arriving at US, did not report the anchor point data of subsequent application. ElasticAPM Public API: ElasticAPM Public API: ElasticAPM Public API: ElasticAPM
The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.
Yes, you can customize Span and Transaction. If you don’t know what Span and Transaction are, please see ElasticAPM for the first time or read the official documentation. Since the Agent does not support Dubbo by default, we use the Public API to implement the functionality.
Design idea
Based on the architecture of Spring Cloud Alibaba, we can achieve it as shown in the following figure.
- First of all, the user’s request must pass through the microservice gateway. In the filter of the gateway, the parent level is buried first
Transaction
. - After the request passes through the gateway, it is forwarded to the layer 1 application by the gateway
http
Requests, if implemented with SpringMVC, need to be inController
Report,The child Transaction
. - After the request is processed by the first layer application, the lower layer applications are all
Dubbo
The agreement. You can use Dubbo’s filter mechanism. YeahConcumer
andProvider
Both are intercepted in a way that does not intrude into business code. - Eventually, the request is returned to the microservice gateway and invoked
transaction.end()
Report the rootTransaction
. - All procedures are complete.
Core implementation explanation
Microservice Gateway
Microservices gateways need to do several things:
- Open the root
Transaction
POST
Add trace ID to request body,GET
requestParameter
Add trace ID to- Called after the request is returned
transaction.end()
To complete the report
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
HttpMethod httpMethod = exchange.getRequest().getMethod();
// Start a Transaction
Transaction transaction = ElasticApm.startTransaction();
transaction.setName("mainSearch");
transaction.setType(Transaction.TYPE_REQUEST);
// Create a Span
Span span = transaction.startSpan("gateway"."filter"."gateway action");
span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter");
LOGGER.info(TransactionId :{}", transaction.getId());
// Check whether the Http request is POST or GET
if (HttpMethod.POST.equals(httpMethod)) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
// Step 4 define the processing logic for the Http body
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { // Determine the body type
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
/ / important! Get the body data and pass it to the callback function for business logic processing
Map<String, String> bodyMap = decodeBody(body);
// Set up the latest bodyMap to enter exchange
exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
/ / the point! Dynamically add the body transaction tag for use by the downstream application Controller
span.injectTraceHeaders((name, value) -> {
bodyMap.put(name, value);
LOGGER.info("APM key:{}, transactionId:{}", name, value);
});
// Don't forget sp.end () or you'll lose the report
span.end();
return Mono.just(encodeBody(bodyMap));
} else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
// origin body map
Map<String, String> bodyMap = decodeJsonBody(body);
exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
/ / the point! Dynamically add the body transaction tag for use by the downstream application Controller
span.injectTraceHeaders((name, value) -> {
bodyMap.put(name, value);
LOGGER.info("APM key:{}, transactionId:{}", name, value);
});
span.end();
return Mono.just(encodeJsonBody(bodyMap));
}
return Mono.empty();
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
public HttpHeaders getHeaders(a) {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
public Flux<DataBuffer> getBody(a) {
returnoutputMessage.getBody(); }};// Step 5 After the request returns, report the transaction
return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end()));
}));
} else if (HttpMethod.GET.equals(httpMethod)) {
span.injectTraceHeaders((name, value) -> {
exchange.getRequest().getQueryParams().set(name, transaction.getId());
LOGGER.info("APM key:{}, transactionId:{}", name, value);
});
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
span.end();
transaction.end();
LOGGER.info("APM buy complete,transactionId:{}", transaction.getId());
}));
} else {
//not support other Http Method
exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);
returnexchange.getResponse().setComplete(); }}Copy the code
Controller
The realization of the Controller layer adopted SpringAOP way, the benefits are for business code does not invade, high scalability, monitoring method to direct configuration @ TransactionWithRemoteParent ().
The following code is through @ TransactionWithRemoteParent () method of the Controller of the report.
@PostMapping(value = "/search", consumes = "application/json", produces = "application/json")
@TransactionWithRemoteParent(a)public String searchForm(@RequestBody String req) {
String result = asService.helloAs(req);
return result;
}
Copy the code
AOP implementations
@Aspect
public class ApmAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class);
@PostConstruct
private void init(a) {
LOGGER.info("ApmAspect loaded");
}
@Pointcut(value = "@annotation(transactionWithRemoteParent)", argNames = "transactionWithRemoteParent")
public void pointcut(TransactionWithRemoteParent transactionWithRemoteParent) {}@Around(value = "pointcut(transactionWithRemoteParent)", argNames = "joinPoint,transactionWithRemoteParent")
public Object around(ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable {
Transaction transaction = null;
try {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
String httpRequest = (String) joinPoint.getArgs()[0];
JSONObject json = JSON.parseObject(httpRequest);
String traceId = json.getString(key);
LOGGER.info("Section adds sub-transaction, key={},value={}", key, traceId);
RpcContext.getContext().setAttachment(key, traceId);
return traceId;
});
transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name())
? transactionWithRemoteParent.name() : signature.getName());
transaction.setType(Transaction.TYPE_REQUEST);
return joinPoint.proceed();
} catch (Throwable throwable) {
if(transaction ! =null) {
transaction.captureException(throwable);
}
throw throwable;
} finally {
if(transaction ! =null) {
LOGGER.info("Transaction:{}", transaction.getId()); transaction.end(); }}}}Copy the code
Dubbo filter
The following code is the DubboConsumer filter, specifically for handling APM. The implementation of DubboProvider is similar.
@Activate(group = "consumer")
public class DubboConsumerApmFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class);
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
String traceId = invocation.getAttachment(key);
LOGGER.info("key={},value={}", key, traceId);
return traceId;
});
try (final Scope scope = transaction.activate()) {
String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
transaction.setName(name);
transaction.setType(Transaction.TYPE_REQUEST);
Result result = invoker.invoke(invocation);
return result;
} catch (Exception e) {
transaction.captureException(e);
throw e;
} finally{ transaction.end(); }}}@Activate(group = "provider")
public class DubboProviderApmFilter implements Filter {
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
// use startTransactionWithRemoteParent to create transaction with parent, which id from prc context
Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key));
try (final Scope scope = transaction.activate()) {
String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
transaction.setName(name);
transaction.setType(Transaction.TYPE_REQUEST);
return invoker.invoke(invocation);
} catch (Exception e) {
transaction.captureException(e);
throw e;
} finally{ transaction.end(); }}}Copy the code
The effect
The source code
Github.com/siyuanWang/…
Reference documentation
ElasticAPM integration with Dubbo