Author: Xianyu (Idle Fish) Technology – Jing Yang

Background

Xianyu's production and deployment environment has become increasingly complex: horizontally, each service depends on a tangled web of other services, and vertically, the runtime environment beneath it keeps growing deeper. When a service problem occurs, whether the root cause can be located in time within this mass of data has become a severe test of Xianyu's operational capability.

Finding the cause of an online problem often takes more than ten minutes, sometimes much longer, so an automatic rapid-diagnosis system is needed, and rapid diagnosis in turn rests on a high-performance real-time data processing system. This real-time data processing system needs the following capabilities:

1. Real-time data collection, real-time analysis, complex computation, and persistence of analysis results.
2. The ability to handle diverse data, including application logs, host performance monitoring metrics, and call-link (trace) graphs.
3. High reliability: the system must not fail and data must not be lost.
4. High performance and low latency: data processing latency under 3 seconds, supporting tens of millions of records processed per second.

This article does not cover the specific analysis model used for automatic problem diagnosis; it only discusses the design of the overall real-time data processing pipeline.

Input and output definition

In order to understand the operation of the system, we define the overall input and output of the system as follows:

Input:

Service request logs (including traceid, timestamp, client IP, server IP, elapsed time, return code, service name, and method name)

Environment monitoring data (metric name, IP address, timestamp, and metric value), such as CPU, JVM GC count, JVM GC time, and database metrics.

Output:

The root cause of errors in a service over a period of time. The analysis result for each service is expressed as a directed acyclic graph: the root node is the erroring service being analyzed, and the leaf nodes are the root causes of the error. A leaf node could be, for example, an error in an externally dependent service or a JVM exception.
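To make the input and output more concrete, here is a minimal sketch of the data shapes described above, written as plain Java records. The field names and types are illustrative assumptions, not the actual schema used by Xianyu.

// Illustrative data shapes only; field names and types are assumptions,
// not the actual schema used in Xianyu's system.
import java.util.List;

public class DiagnosisTypes {
    // One service request log entry.
    record RequestLog(String traceId, long timestamp, String clientIp, String serverIp,
                      long elapsedMs, int returnCode, String serviceName, String methodName) {}

    // One environment monitoring sample (CPU, JVM GC count/time, DB metrics, ...).
    record MetricSample(String metricName, String ip, long timestamp, double value) {}

    // Output: a node in the diagnosis DAG; the root is the erroring service,
    // leaves are root causes (dependency error, JVM exception, ...).
    record DiagnosisNode(String nodeCode, String description, List<DiagnosisNode> children) {}
}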

Architecture design

While the system runs, log data and monitoring data are generated continuously over time, and each piece of data carries its own timestamp. Transmitting this time-stamped data in real time is like water flowing through different pipes.

If the constant stream of real-time data is like running water, the process of processing data is similar to that of producing tap water:

Naturally, we divide real-time data processing into collection, transmission, preprocessing, computation, and storage.

The overall system architecture design is as follows:

Collection

For collection we adopt SLS, the log service product developed by Alibaba (comprising the Logtail and LogHub components), with Logtail as the collection client. We chose Logtail for its excellent performance, high reliability, and flexible plug-in extension mechanism.

Transmission

LogHub can be understood as a data publish/subscribe component with functionality similar to Kafka; as a data transmission channel it is more stable and more secure. For a detailed comparison see: yq.aliyun.com/articles/35…

Preprocessing

Real-time data preprocessing uses the Blink stream computing component (the open-source version is Flink; Blink is Alibaba's internal enhanced version based on Flink). The commonly used open-source real-time stream computing products are JStorm, Spark Streaming, and Flink. JStorm keeps no intermediate computation state, so intermediate results needed during computation must live in external storage, and the resulting frequent I/O hurts performance. Spark Streaming essentially simulates real-time computing with tiny batches, which inevitably introduces some latency. Flink, thanks to its excellent state management mechanism, delivers both performance and real-time processing, and provides complete SQL support that makes stream computing easier.

Computation and persistence

After preprocessing, the data finally produced consists of call-link aggregation logs and host monitoring data. The host monitoring data is stored independently in the TSDB time-series database for subsequent statistical analysis; TSDB's storage structure is designed specifically for time-indexed data, which makes it well suited to storing and querying time-series data. The call-link log aggregation data is provided to the CEP/Graph Service for analysis by the diagnosis model. CEP/Graph Service is an application developed by Xianyu that implements model analysis, complex data processing, and interaction with external services, and it performs real-time aggregation of graph data with the help of an RDB (in-memory database). Finally, the CEP/Graph Service analysis results are dumped in real time as graph data into Lindorm to provide online queries; Lindorm can be seen as an enhanced HBase and acts as the persistent store in the system.

Detailed design and performance optimization

Collection

Logtail is used to collect log and metric data. The entire data collection process is shown below:

It provides a very flexible plug-in mechanism, with four types of plug-ins:

  • Inputs: input plug-ins that obtain data.
  • Processors: processing plug-ins that transform the data.
  • Aggregators: aggregation plug-ins that aggregate the data.
  • Flushers: output plug-ins that flush the data to a specified sink.

Because metric data (such as CPU, memory, and JVM metrics) is obtained by calling a service interface on the local machine, the number of requests should be kept to a minimum, and in Logtail each input occupies one Goroutine. By customizing the input and processor plug-ins, we fetch multiple metrics (CPU, memory, JVM metrics, and so on) in a single input plug-in with a single service request (to the metrics fetch interface provided by the underlying monitoring team), format them as a JSON array, and then split the array into individual records in the processor. This reduces the number of I/O operations and improves performance.
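Logtail plug-ins themselves are written in Go; as a language-neutral illustration of the same "one request, many metrics, split in the processor" idea, the Java sketch below (matching the language of the other sketches in this article) shows one batched fetch being split into individual metric records. The endpoint behavior, JSON field names, and use of fastjson are assumptions for illustration only.

// Illustration only: Logtail plug-ins are actually written in Go; this Java sketch
// just shows the batched-fetch-then-split idea. Field names are hypothetical.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;

public class MetricSplitSketch {

    record Metric(String name, String ip, long timestamp, double value) {}

    // "Input" step: one call to the local metrics interface returns a JSON array
    // covering CPU, memory, JVM metrics, etc. (payload inlined for the sketch).
    static String fetchAllMetricsOnce() {
        return "[{\"name\":\"cpu.usage\",\"ip\":\"127.0.0.1\",\"ts\":1700000000000,\"value\":0.42},"
             + "{\"name\":\"jvm.gc.count\",\"ip\":\"127.0.0.1\",\"ts\":1700000000000,\"value\":3}]";
    }

    // "Processor" step: split the single batched payload into individual records,
    // so downstream components see one record per metric.
    static List<Metric> split(String jsonArray) {
        JSONArray arr = JSON.parseArray(jsonArray);
        List<Metric> out = new ArrayList<>();
        for (int i = 0; i < arr.size(); i++) {
            JSONObject o = arr.getJSONObject(i);
            out.add(new Metric(o.getString("name"), o.getString("ip"),
                               o.getLongValue("ts"), o.getDoubleValue("value")));
        }
        return out;
    }

    public static void main(String[] args) {
        split(fetchAllMetricsOnce()).forEach(System.out::println);
    }
}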

Transmission

LogHub is used for data transmission. After Logtail writes the data, it is consumed directly by Blink; the only thing to configure is a reasonable number of partitions. The number of partitions must be greater than or equal to the number of concurrent Blink read tasks, otherwise some Blink tasks will sit idle.

Preprocessing

Preprocessing is implemented mainly in Blink. The main design and optimization points are as follows:

Write an efficient computation flow

Blink is a stateful stream computing framework, well suited to real-time aggregation, joins, and similar operations. In our application we only care about the call links of requests that end in error, so the whole log processing flow is split into two streams: 1. The service entry request logs are processed as one stream, filtered down to the requests that returned errors. 2. The call logs of the intermediate links are processed as another stream, joined with the first stream on traceid to retrieve the call data that the erroring requests depend on.

As shown in the figure above, after the dual-stream join, the output is the complete link data for all erroring requests.
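A minimal sketch of this dual-stream join, written against the open-source Flink DataStream API (the actual Blink job may well be expressed in SQL on Alibaba's internal runtime; the record type, field names, and the 60-second tolerance window here are assumptions):

// Sketch of the dual-stream join on traceid using the open-source Flink API.
// Types, window bounds, and field names are illustrative assumptions.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ErrorLinkJoinSketch {

    public static class ReqLog {
        public String traceId; public long timestamp; public int returnCode;
        public String serviceName; public String methodName;
    }

    /** entryLogs: service entry request logs; linkLogs: intermediate call-link logs. */
    public static DataStream<String> joinErrorLinks(DataStream<ReqLog> entryLogs,
                                                    DataStream<ReqLog> linkLogs) {
        WatermarkStrategy<ReqLog> wm = WatermarkStrategy
                .<ReqLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((log, ts) -> log.timestamp);

        // Stream 1: only the entry requests that returned an error.
        DataStream<ReqLog> errors = entryLogs
                .assignTimestampsAndWatermarks(wm)
                .filter(log -> log.returnCode != 0);

        // Stream 2: intermediate links, joined back to the erroring entry by traceid.
        return errors
                .keyBy(log -> log.traceId)
                .intervalJoin(linkLogs.assignTimestampsAndWatermarks(wm).keyBy(log -> log.traceId))
                .between(Time.seconds(-60), Time.seconds(60)) // assumed tolerance window
                .process(new ProcessJoinFunction<ReqLog, ReqLog, String>() {
                    @Override
                    public void processElement(ReqLog error, ReqLog link, Context ctx,
                                               Collector<String> out) {
                        // Emit one joined record per intermediate link of an erroring request.
                        out.collect(error.traceId + "|" + link.serviceName + "." + link.methodName);
                    }
                });
    }
}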

Set a reasonable state life cycle

In essence, Blink caches intermediate data in state and matches it during the join. If the state's lifetime is too long, the data inflates and hurts performance; if it is too short, late data cannot be joined correctly. The state lifetime therefore needs to be configured sensibly.

Use Niagara as the state backend and set the lifetime of the state data, in milliseconds:

state.backend.type=niagara
state.backend.niagara.ttl.ms=60000

Enable MicroBatch/MiniBatch

MicroBatch and MiniBatch are both micro-batching mechanisms; they differ only slightly in how the micro-batch is triggered. The principle is the same: buffer a certain amount of data before triggering processing, which reduces access to state, significantly improves throughput, and reduces the amount of output data.

blink.miniBatch.joinEnabled=true

When using MicroBatch, keep the following two MiniBatch settings (to prevent OOM):

blink.miniBatch.allowLatencyMs=5000
blink.miniBatch.size=20000

Use Dynamic-Rebalance instead of Rebalance

With Dynamic Rebalance enabled, a Blink task can, when writing data, pick the sub-partition with the lightest load based on the number of buffers backed up in each sub-partition, thereby achieving dynamic load balancing. Compared with the static rebalance strategy, it improves performance when the processing capacity of downstream tasks is unbalanced.

Enable dynamic load balancing:

task.dynamic.rebalance.enabled=true

Custom output plug-in

After the data has been joined, the data for one request link needs to be delivered to the downstream graph analysis node as a single packet. The traditional approach is to deliver it via a message service, but this has two drawbacks: 1. Its throughput is still much lower than that of an in-memory database such as an RDB (by roughly an order of magnitude). 2. The receiving side would still have to re-associate the data by traceid. Instead, we write the data asynchronously to the RDB through a custom plug-in, set an expiration time, and store it as <traceid, request link data in JSON> key-value pairs. MetaQ then notifies the downstream computing service with only the traceid as the message body, greatly reducing the data transmission pressure on MetaQ.
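A minimal sketch of this pattern, using the open-source Jedis and RocketMQ clients as stand-ins for the internal RDB and MetaQ (the key format, topic name, and TTL are assumptions):

// Sketch only: Jedis stands in for the internal RDB, the open-source RocketMQ
// client stands in for MetaQ. Key format, topic, and TTL are illustrative assumptions.
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import redis.clients.jedis.JedisPooled;

public class LinkDataSinkSketch {
    private final JedisPooled redis = new JedisPooled("localhost", 6379);
    private final DefaultMQProducer producer;

    public LinkDataSinkSketch() throws Exception {
        producer = new DefaultMQProducer("diagnosis_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
    }

    /** Write the joined link data for one traceid, then notify downstream with the traceid only. */
    public void emit(String traceId, String linkDataJson) {
        // 1. Asynchronously write the bulky payload to the in-memory store with an expiry.
        CompletableFuture.runAsync(() ->
                redis.setex("link:" + traceId, 300, linkDataJson)); // 5-minute TTL (assumed)

        // 2. Send only the traceid over the message queue, keeping messages tiny.
        try {
            producer.send(new Message("error_link_notify", traceId.getBytes()));
        } catch (Exception e) {
            throw new RuntimeException("notify failed for traceid " + traceId, e);
        }
    }
}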

Graph aggregation calculation

After receiving the notification from MetaQ, the CEP/Graph computing service node generates a real-time diagnosis result based on the request's link data and the environment monitoring data it depends on. A simplified diagnosis result looks like this:

This indicates that the request failed because the thread pool of a downstream JVM was full. A single call, however, does not explain why the service as a whole is unavailable; the overall error picture has to be analyzed, which requires real-time aggregation of the graph data. The aggregation design is as follows (simplified to illustrate the basic idea):

1. First, use the zrank capability of Redis to assign each node a globally unique ordering number based on its service name or IP.
2. Generate a node code for every node in the graph, with the format:
  • For the head node: head node number | rounded timestamp | node code
  • For ordinary nodes: | rounded timestamp | node code
3. Because each node has a unique key within a time window, the node code can be used as the key for a Redis counter to count each node, which at the same time eliminates concurrent read/write problems.
4. The set structure in Redis makes it very convenient to accumulate the edges of the graph.
5. Record the root node, so that the aggregated graph structure can be restored by traversal.

The results after aggregation are roughly as follows:

This eventually yields the overall cause of the service being unavailable, and the root causes can be ranked by the counts on the leaf nodes.
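A minimal sketch of the aggregation steps above using the Jedis client (the key layout, the one-minute time bucket, and the node-code details are assumptions; the real CEP/Graph Service is more elaborate):

// Sketch of the Redis-based graph aggregation idea: zrank for global node numbers,
// counters keyed by node code, and sets for edges. Key names and formats are assumed.
import redis.clients.jedis.JedisPooled;

public class GraphAggregationSketch {
    private final JedisPooled redis = new JedisPooled("localhost", 6379);

    /** Assign a globally unique ordering number to a node (service name or IP). */
    long nodeNumber(String nodeName) {
        redis.zadd("diag:node:index", 0, nodeName);   // same score; zrank orders members
        return redis.zrank("diag:node:index", nodeName);
    }

    /** Build a node code: [head node number]|rounded timestamp|node number. */
    String nodeCode(Long headNumberOrNull, long timestampMs, String nodeName) {
        long bucket = timestampMs / 60_000 * 60_000;  // round to a 1-minute window (assumed)
        String head = headNumberOrNull == null ? "" : String.valueOf(headNumberOrNull);
        return head + "|" + bucket + "|" + nodeNumber(nodeName);
    }

    /** Aggregate one edge of a diagnosis result: count both endpoints, record the edge. */
    void addEdge(String fromCode, String toCode) {
        redis.incr("diag:count:" + fromCode);         // counters keyed by node code: no races
        redis.incr("diag:count:" + toCode);
        redis.sadd("diag:edges", fromCode + "->" + toCode); // edges accumulate in a set
    }

    /** Remember the root node so the aggregated graph can be restored by traversal. */
    void recordRoot(String rootCode) {
        redis.sadd("diag:roots", rootCode);
    }
}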

Benefits

Since the system went online, the latency of the entire real-time data processing pipeline has stayed under three seconds, and the time to locate a Xianyu server-side problem has dropped from ten-plus minutes to under five seconds, greatly improving the efficiency of problem location.

Looking forward to

The current system can handle Xianyu's data volume of tens of millions of records per second. The automatic problem-location service is likely to be extended to more business scenarios within Alibaba, and the resulting multiplication of data volume will place higher demands on efficiency and cost.

Possible improvements in the future:

  • Automatically reduce or compress processed data.
  • Complete the complex model analysis and computation inside Blink as well, to reduce I/O and improve performance.
  • Support multi-tenant data isolation.