Preface

In our first ElasticAPM tutorial, we learned what observability is and how powerful ElasticAPM can be. The standalone setup described there is not enough on its own, though. Recall the two questions at the end of the last post:

1. The previous post was tested in a standalone environment, but in a distributed environment many applications are linked together by requests. Can ElasticAPM still trace a request across services, and how does that work?

2. ElasticAPM collects HTTP requests automatically. Does it also work in a distributed RPC environment, or does that have to be implemented with the Public API?

In short: does ElasticAPM work well in a distributed setup, and does it work in RPC environments?

ElasticAPM supports distributed HTTP calls by default, but it does not support RPC. However, many companies use an RPC protocol for communication between their internal systems. Our company, for example, builds its search service on Spring Cloud Alibaba, and the applications inside that framework talk to each other through the RPC framework Dubbo. So the question becomes how to integrate ElasticAPM into Spring Cloud Alibaba.

Architecture explanation & problem analysis

First, let me outline the architecture and workflow of Spring Cloud Alibaba and ElasticAPM.

As shown in the architecture diagram, the search system is divided into four applications: Gateway, US, AS and BS. A user's request reaches the Gateway first, and the Gateway forwards it to the US application over HTTP. The US application invokes the AS application over the Dubbo protocol, and the AS application in turn invokes the BS application over Dubbo.

Request --> Gateway --HTTP--> US --RPC--> AS --RPC--> BS

Every application starts with the APM agent already attached (see the first ElasticAPM tutorial if you do not know how). It would be perfect if the agent supported Dubbo by default, but it does not. As a result, once the request has reached US, the trace no longer reports any data for the downstream applications. This is where the ElasticAPM Public API comes in. The official documentation describes it as follows:

The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.

So yes, Spans and Transactions can be created manually. If you do not know what a Span or a Transaction is, see the first ElasticAPM tutorial or read the official documentation. Since the agent does not support Dubbo by default, we use the Public API to implement the missing functionality.

Design idea

Based on the architecture of Spring Cloud Alibaba, we can achieve it as shown in the following figure.

  • First, the user's request must pass through the microservice gateway. The root (parent) Transaction is started in the gateway's filter.
  • After passing through the gateway, the request is forwarded to the first-layer application over HTTP. If that application is implemented with Spring MVC, the child Transaction needs to be reported in the Controller.
  • After the first-layer application has handled the request, all lower-layer applications are called over the Dubbo protocol. Dubbo's filter mechanism can be used here: both the Consumer and the Provider are intercepted in a way that does not intrude into business code.
  • Finally, the response returns to the microservice gateway, which calls transaction.end() to report the root Transaction.
  • The whole procedure is now complete.
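
The flow in the list above can be illustrated with a plain-JDK simulation. This is only a sketch of the idea, not ElasticAPM code: the Tx class, the carrier keys trace-id and parent-app, and all method names are hypothetical. It shows why the trace breaks after US when nothing is injected into the RPC call, and how it stays connected once every hop writes its context into a carrier and the next hop reads it back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Plain-JDK simulation of trace-context propagation; no APM agent is involved. */
final class TracePropagationSketch {

    /** Hypothetical stand-in for a transaction. */
    static final class Tx {
        final String app;
        final String traceId;
        final String parentApp; // null when this hop started a new trace

        Tx(String app, String traceId, String parentApp) {
            this.app = app;
            this.traceId = traceId;
            this.parentApp = parentApp;
        }

        /** Upstream side: write the context into whatever carrier the protocol offers. */
        void inject(BiConsumer<String, String> carrierSetter) {
            carrierSetter.accept("trace-id", traceId);
            carrierSetter.accept("parent-app", app);
        }

        String describe() {
            return app + " parent=" + parentApp + " trace=" + traceId;
        }
    }

    /** Downstream side: read the context back through a lookup callback. */
    static Tx startWithRemoteParent(String app, Function<String, String> carrierGetter) {
        String traceId = carrierGetter.apply("trace-id");
        if (traceId == null) {
            // Nothing was propagated, so this hop starts a new, unrelated trace.
            return new Tx(app, "trace-" + app, null);
        }
        return new Tx(app, traceId, carrierGetter.apply("parent-app"));
    }

    /** Runs the chain Gateway -> US -> AS -> BS and returns one description per hop. */
    static List<String> run(boolean propagateOverRpc) {
        List<String> report = new ArrayList<>();
        Tx gateway = new Tx("Gateway", "trace-1", null);
        report.add(gateway.describe());

        // Gateway -> US over HTTP: the carrier is the request body or the query parameters.
        Map<String, String> httpCarrier = new HashMap<>();
        gateway.inject(httpCarrier::put);
        Tx previous = startWithRemoteParent("US", httpCarrier::get);
        report.add(previous.describe());

        // US -> AS -> BS over RPC: the carrier is the attachment map of the RPC call.
        for (String app : new String[] {"AS", "BS"}) {
            Map<String, String> attachments = new HashMap<>();
            if (propagateOverRpc) {
                previous.inject(attachments::put);
            }
            Tx current = startWithRemoteParent(app, attachments::get);
            report.add(current.describe());
            previous = current;
        }
        return report;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);
        run(false).forEach(System.out::println);
    }
}
```

With propagation enabled, all four hops share trace-1. With it disabled, AS and BS each start an unrelated trace, which is the situation described in the problem analysis.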

Core implementation explanation

Microservice Gateway

The microservice gateway needs to do several things:

  • Start the root Transaction
  • Add the trace ID to the request body for POST requests, and to the request parameters for GET requests
  • Call transaction.end() after the request returns to complete the report

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        HttpMethod httpMethod = exchange.getRequest().getMethod();
        // Start the root Transaction
        Transaction transaction = ElasticApm.startTransaction();
        transaction.setName("mainSearch");
        transaction.setType(Transaction.TYPE_REQUEST);
        // Create a Span (type, subtype, action)
        Span span = transaction.startSpan("gateway", "filter", "gateway action");
        span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter");

        LOGGER.info("TransactionId:{}", transaction.getId());
        // Check whether the Http request is POST or GET
        if (HttpMethod.POST.equals(httpMethod)) {
            ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
            MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
            // Step 4: define the processing logic for the HTTP body
            Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
                // Determine the body type
                if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {

                    // Important: decode the body into a map so that it can be modified
                    Map<String, String> bodyMap = decodeBody(body);
                    // Store the latest bodyMap in the exchange attributes
                    exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
                    // Key point: add the trace headers to the body for use by the downstream application's Controller
                    span.injectTraceHeaders((name, value) -> {
                        bodyMap.put(name, value);
                        LOGGER.info("APM key:{}, transactionId:{}", name, value);
                    });
                    // Don't forget span.end(), otherwise the span is never reported
                    span.end();
                    return Mono.just(encodeBody(bodyMap));
                } else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
                    // origin body map
                    Map<String, String> bodyMap = decodeJsonBody(body);
                    exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
                    // Key point: add the trace headers to the body for use by the downstream application's Controller
                    span.injectTraceHeaders((name, value) -> {
                        bodyMap.put(name, value);
                        LOGGER.info("APM key:{}, transactionId:{}", name, value);
                    });
                    span.end();
                    return Mono.just(encodeJsonBody(bodyMap));
                }
                return Mono.empty();
            });

            BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());

            // the new content type will be computed by bodyInserter
            // and then set in the request decorator
            headers.remove(HttpHeaders.CONTENT_LENGTH);

            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
            return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {

                    @Override
                    public HttpHeaders getHeaders() {
                        long contentLength = headers.getContentLength();
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.putAll(super.getHeaders());
                        if (contentLength > 0) {
                            httpHeaders.setContentLength(contentLength);
                        } else {
                            httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                        }
                        return httpHeaders;
                    }

                    @Override
                    public Flux<DataBuffer> getBody() {
                        return outputMessage.getBody();
                    }
                };
                // Step 5: after the request returns, end the transaction so that it is reported
                return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end()));
            }));
        } else if (HttpMethod.GET.equals(httpMethod)) {
            span.injectTraceHeaders((name, value) -> {
                // Pass the trace header value downstream, as the POST branch does.
                // Note: Spring documents getQueryParams() as read-only, so if this call is rejected
                // the request URI has to be rebuilt with the extra parameter instead.
                exchange.getRequest().getQueryParams().set(name, value);
                LOGGER.info("APM key:{}, transactionId:{}", name, value);
            });
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                span.end();
                transaction.end();
                LOGGER.info("APM reporting complete, transactionId:{}", transaction.getId());
            }));
        } else {
            // Other HTTP methods are not supported
            exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);
            return exchange.getResponse().setComplete();
        }
}
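
The filter above calls decodeBody and encodeBody without showing them. The following is a minimal sketch of what such helpers might look like for application/x-www-form-urlencoded bodies, assuming UTF-8 and one value per key (a repeated key keeps its last value, which matches the Map<String, String> used by the filter). The class name FormBodyCodec is hypothetical, and the real helpers in the project may differ.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: converts a form-urlencoded body to a map and back. */
final class FormBodyCodec {

    /** Splits "a=1&b=2" into an insertion-ordered map of decoded keys and values. */
    static Map<String, String> decodeBody(String body) {
        Map<String, String> params = new LinkedHashMap<>();
        if (body == null || body.isEmpty()) {
            return params;
        }
        for (String pair : body.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String rawKey = eq < 0 ? pair : pair.substring(0, eq);
            String rawValue = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(decode(rawKey), decode(rawValue));
        }
        return params;
    }

    /** Joins the map back into "a=1&b=2", encoding keys and values. */
    static String encodeBody(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue() == null ? "" : entry.getValue();
            joiner.add(encode(entry.getKey()) + "=" + encode(value));
        }
        return joiner.toString();
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> body = decodeBody("q=hello+world&page=2");
        // Simulate what the gateway does: add a trace header as an extra form field.
        body.put("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
        System.out.println(encodeBody(body));
    }
}
```

Because the body grows when the trace field is added, the filter has to drop the original Content-Length header and let it be recomputed, which is what the headers.remove(HttpHeaders.CONTENT_LENGTH) call in the filter is for.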

Controller

The Controller layer is instrumented with Spring AOP. The benefits are that the business code is not touched and the approach scales well: to monitor a method, simply annotate it with @TransactionWithRemoteParent().

The following code reports a Controller method through @TransactionWithRemoteParent().

@PostMapping(value = "/search", consumes = "application/json", produces = "application/json")
@TransactionWithRemoteParent()
public String searchForm(@RequestBody String req) {
    String result = asService.helloAs(req);
    return result;
}
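
The annotation itself is not shown in this post. A minimal sketch, assuming it only carries an optional name (the aspect reads transactionWithRemoteParent.name() and falls back to the method name), could look like this. The AnnotationSketch class is hypothetical and only demonstrates that fallback with plain reflection, without Spring.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Marks a method whose invocation should be reported as a child transaction. */
@Retention(RetentionPolicy.RUNTIME) // must be visible at runtime so an aspect can match on it
@Target(ElementType.METHOD)
@interface TransactionWithRemoteParent {
    /** Optional transaction name; when blank, the method name is used instead. */
    String name() default "";
}

/** Hypothetical demo of the name fallback, using reflection only. */
final class AnnotationSketch {

    @TransactionWithRemoteParent(name = "mainSearch")
    static String named(String req) {
        return req;
    }

    @TransactionWithRemoteParent
    static String unnamed(String req) {
        return req;
    }

    /** Returns the annotation's name, or the method name when no name was given. */
    static String resolveName(String methodName) {
        try {
            Method method = AnnotationSketch.class.getDeclaredMethod(methodName, String.class);
            TransactionWithRemoteParent annotation = method.getAnnotation(TransactionWithRemoteParent.class);
            String configured = annotation.name().trim();
            return configured.isEmpty() ? method.getName() : configured;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unknown method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveName("named"));
        System.out.println(resolveName("unnamed"));
    }
}
```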

AOP implementation

@Aspect
public class ApmAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class);

    @PostConstruct
    private void init() {
        LOGGER.info("ApmAspect loaded");
    }


    @Pointcut(value = "@annotation(transactionWithRemoteParent)", argNames = "transactionWithRemoteParent")
    public void pointcut(TransactionWithRemoteParent transactionWithRemoteParent) {
    }

    @Around(value = "pointcut(transactionWithRemoteParent)", argNames = "joinPoint,transactionWithRemoteParent")
    public Object around(ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable {
        Transaction transaction = null;
        try {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
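            // Start a child transaction; the agent asks for each trace header by key
            transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
                // The gateway wrote the trace headers into the JSON body, which is the first argument
                String httpRequest = (String) joinPoint.getArgs()[0];
                JSONObject json = JSON.parseObject(httpRequest);
                String traceId = json.getString(key);
                LOGGER.info("Aspect adds child transaction, key={},value={}", key, traceId);
                // Forward the same value as a Dubbo attachment so that the Dubbo filters can read it
                RpcContext.getContext().setAttachment(key, traceId);
                return traceId;
            });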
            transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name())
                    ? transactionWithRemoteParent.name() : signature.getName());
            transaction.setType(Transaction.TYPE_REQUEST);
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            if (transaction != null) {
                transaction.captureException(throwable);
            }
            throw throwable;
        } finally {
            if (transaction != null) {
                LOGGER.info("Transaction:{}", transaction.getId());
                transaction.end();
            }
        }
    }
}
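
The gateway and the aspect both log the propagated key and value. Assuming the value follows the W3C Trace Context traceparent layout (version, trace id, parent id and flags separated by hyphens), which is an assumption about the agent version in use, a small parser can help to check in the logs that the same trace id arrives at every hop. The class TraceParentSketch and its methods are hypothetical and not part of the ElasticAPM API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that parses a version-00 style traceparent value. */
final class TraceParentSketch {

    // version (2 hex) - trace id (32 hex) - parent id (16 hex) - flags (2 hex)
    private static final Pattern FORMAT =
            Pattern.compile("([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String version;
    final String traceId;
    final String parentId;
    final boolean sampled;

    private TraceParentSketch(String version, String traceId, String parentId, boolean sampled) {
        this.version = version;
        this.traceId = traceId;
        this.parentId = parentId;
        this.sampled = sampled;
    }

    /** Returns the parsed value, or null when the input is missing or malformed. */
    static TraceParentSketch parse(String value) {
        if (value == null) {
            return null;
        }
        Matcher matcher = FORMAT.matcher(value.trim());
        if (!matcher.matches()) {
            return null;
        }
        String version = matcher.group(1);
        String traceId = matcher.group(2);
        String parentId = matcher.group(3);
        // The specification treats version ff and all-zero ids as invalid.
        if ("ff".equals(version) || isAllZero(traceId) || isAllZero(parentId)) {
            return null;
        }
        // The lowest bit of the flags byte is the "sampled" flag.
        boolean sampled = (Integer.parseInt(matcher.group(4), 16) & 0x01) != 0;
        return new TraceParentSketch(version, traceId, parentId, sampled);
    }

    /** True when both values parse and belong to the same trace. */
    static boolean sameTrace(String first, String second) {
        TraceParentSketch a = parse(first);
        TraceParentSketch b = parse(second);
        return a != null && b != null && a.traceId.equals(b.traceId);
    }

    private static boolean isAllZero(String hex) {
        return hex.chars().allMatch(c -> c == '0');
    }

    public static void main(String[] args) {
        TraceParentSketch parsed = parse("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
        System.out.println(parsed.traceId + " sampled=" + parsed.sampled);
    }
}
```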

Dubbo filter

The following code shows the Dubbo consumer filter that handles APM reporting, followed by the Dubbo provider filter, which is implemented in a similar way.

@Activate(group = "consumer")
public class DubboConsumerApmFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            String traceId = invocation.getAttachment(key);
            LOGGER.info("key={},value={}", key, traceId);
            return traceId;
        });
        try (final Scope scope = transaction.activate()) {
            String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
            transaction.setName(name);
            transaction.setType(Transaction.TYPE_REQUEST);

            Result result = invoker.invoke(invocation);

            return result;
        } catch (Exception e) {
            transaction.captureException(e);
            throw e;
        } finally {
            transaction.end();
        }
    }
}

@Activate(group = "provider")
public class DubboProviderApmFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Use startTransactionWithRemoteParent to create a transaction whose parent id comes from the RPC context
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key));

        try (final Scope scope = transaction.activate()) {
            String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
            transaction.setName(name);
            transaction.setType(Transaction.TYPE_REQUEST);
            return invoker.invoke(invocation);
        } catch (Exception e) {
            transaction.captureException(e);
            throw e;
        } finally {
            transaction.end();
        }
    }
}

The effect

The source code

Github.com/siyuanWang/…

Reference documentation

ElasticAPM integration with Dubbo