1 Introduction

As a PaaS vendor, NetEase Yunxin needs to monitor its online services in real time and continuously sense their health: the "heartbeat", "pulse", and "blood pressure" of each service. The heartbeat logs collected from the SDK, the server, and other endpoints form a huge, unordered data set. How can we put these data to effective use? The job of the service monitoring platform is to analyze this massive stream in real time, aggregate the core indicators that represent the "heartbeat", "pulse", and "blood pressure" of the services, and present them intuitively to the relevant engineers. At the heart of these capabilities are real-time analysis and real-time aggregation.

In the previous article, "Practice of the NetEase Yunxin Service Monitoring Platform", we introduced the overall architecture of the platform from four angles: data collection, data processing, monitoring and alerting, and data application. This article goes one step further and details the aggregation metric computation logic used at NetEase Yunxin.

Computing an aggregation metric means aggregating a detailed data set in real time. Spark Streaming and Flink's SQL/Stream APIs are the common industry choices. Either way, we have to write code that specifies the data source, data cleansing logic, aggregation dimensions, aggregation window size, aggregation operators, and so on. Developing, testing, and maintaining such intricate logic and code consumes considerable human and material resources. What we as programmers should do is turn that complexity into simplicity.

This article explains how NetEase Yunxin built a general-purpose aggregation metric computation framework on top of Flink's DataStream API.

2 Overall Architecture

The figure above shows the complete processing pipeline of our self-developed, Flink-based aggregation metric framework. It involves the following modules:

  • Source: periodically loads the aggregation rules, creates Kafka consumers on demand according to those rules, and consumes data continuously.
  • Process: covers the grouping logic, window logic, aggregation logic, sequential (period-over-period) computation logic, and so on. As the figure shows, we split the aggregation phase into two stages. Why do this, and what does it buy us? Anyone who has done distributed, concurrent computation has met a common enemy: data skew. In our PaaS business the head customers dominate the traffic, so the skew is severe. The trick of aggregating in two stages is explained in detail below.
  • Sink: the data output layer. By default it currently writes to Kafka and InfluxDB; the former drives downstream computation (such as alert notification), the latter backs data display and query services.
  • Reporter: collects the running status of every stage of the pipeline, such as input/output QPS, computation latency, consumer lag, and the volume of late data.

The following sections describe the design and implementation of these modules in detail.

3 Source

Rule configuration

To make aggregation metrics easy to create and maintain, we abstracted the key parameters involved in metric computation and provided a visual configuration page, as shown in the figure below. The function of each parameter is described below in the context of concrete scenarios.

Rule loading

While the aggregation job is running, we reload the configuration periodically. When a new Topic is detected, we create a Kafka consumer thread to receive the upstream real-time data stream; likewise, for configurations that have become invalid, we shut down the consumer thread and clean up the associated Reporter.

Data consumption

Aggregation metrics that share a data source share a single Kafka consumer. After records are pulled and parsed, collect() is called once per aggregation metric to distribute the data. If the data filtering rule (configuration item ⑤) is not empty, it is applied before distribution, and records that do not match are discarded.
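The fan-out logic above can be sketched in plain Java. This is an illustrative stand-in, not the framework's actual code: the class and method names (MetricDispatcher, dispatch) are hypothetical, and a simple Predicate stands in for the configured filter rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative sketch: one consumer thread fans each parsed record out to
 *  every aggregation metric sharing the topic, applying each metric's
 *  optional filter rule (configuration item ⑤) before distribution. */
public class MetricDispatcher {
    /** A metric subscription: an id plus an optional filter; null = accept all. */
    public static class Subscription {
        final String metricId;
        final Predicate<String> filter;
        public Subscription(String metricId, Predicate<String> filter) {
            this.metricId = metricId;
            this.filter = filter;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    public void subscribe(Subscription s) { subscriptions.add(s); }

    /** Returns the metric ids a record is distributed to (stand-in for collect()). */
    public List<String> dispatch(String record) {
        List<String> targets = new ArrayList<>();
        for (Subscription s : subscriptions) {
            if (s.filter == null || s.filter.test(record)) {
                targets.add(s.metricId);  // record passes this metric's filter
            }
        }
        return targets;                    // non-matching records are simply dropped
    }
}
```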

4 Process

Overall calculation process

The core code of the aggregate computation, implemented with Flink's DataStream API, is as follows:

SingleOutputStreamOperator<MetricContext> aggResult = src
        .assignTimestampsAndWatermarks(new MetricWatermark())  // extract event time, advance watermark
        .keyBy(new MetricKeyBy())                              // group by the configured dimensions
        .window(new MetricTimeWindow())                        // sliding or tumbling window
        .aggregate(new MetricAggFunction());                   // the configured aggregation operators
  • MetricWatermark(): extracts the timestamp of each input record from the specified time field (configuration item ⑧) and advances the watermark of the computation stream.
  • MetricKeyBy(): specifies the aggregation dimensions, similar to GROUP BY in SQL. It reads the dimension values from the record according to the grouping fields (configuration item ⑥) and concatenates them into a group key.
  • MetricTimeWindow(): configuration item ⑧ specifies the window size for aggregation. If timed output is configured we create a sliding window, otherwise a tumbling window.
  • MetricAggFunction(): implements the operators specified by configuration item ②. The implementation of each operator is described in detail below.

Two-stage aggregation

Data skew must be considered in any aggregation over large data volumes: the aggregation key defined by the grouping fields (configuration item ⑥) has hot spots. Our computation framework was designed from the start to counter data skew by splitting the aggregation into two stages:

  • Stage 1: The data is randomly scattered and pre-aggregated.
  • Stage 2: Final aggregation takes the pre-aggregation results of stage 1 as input.

Concretely: if the parallelism is greater than 1, we generate a random number in [0, parallelism) as a randomKey. In MetricKeyBy(), the key derived from the grouping fields (configuration item ⑥) is concatenated with the randomKey to form the final aggregation key, which spreads the data of a hot key randomly across partitions.
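The key-salting step can be sketched as follows. This is an illustrative sketch, not the framework's exact code: the class name SaltedKey and the "#" separator are assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of stage-1 key salting: when parallelism > 1, a random suffix in
 *  [0, parallelism) is appended to the group key so hot keys spread evenly
 *  across subtasks; stage 2 strips the suffix to recover the real key. */
public class SaltedKey {
    public static String salt(String groupKey, int parallelism) {
        if (parallelism <= 1) {
            return groupKey;                       // no skew mitigation needed
        }
        int randomKey = ThreadLocalRandom.current().nextInt(parallelism);
        return groupKey + "#" + randomKey;         // e.g. "appId=42#3"
    }

    /** Stage 2 recovers the real group key for the final aggregation. */
    public static String unsalt(String saltedKey) {
        int idx = saltedKey.lastIndexOf('#');
        return idx < 0 ? saltedKey : saltedKey.substring(0, idx);
    }
}
```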

Aggregation operators

As a platform product, we provide the following common aggregation operators. Because of the two-stage aggregation, each operator uses a matching strategy in stage 1 and stage 2.

  • min/max/sum/count — Stage 1: pre-aggregate the input data directly and output the partial result. Stage 2: aggregate the stage-1 partial results and output the final result.
  • first/last — Stage 1: compare the timestamps of the input data, keep the minimum/maximum timestamp and its value, and output the <timestamp, value> pair. Stage 2: compute again over the <timestamp, value> pairs and output the final first/last.
  • avg — Stage 1: compute the group's sum and record count and output a <sum, cnt> pair. Stage 2: sum the <sum, cnt> pairs separately and output total sum / total cnt.
  • median/tp90/tp95 — Stage 1: compute the distribution of the input data and output a NumericHistogram. Stage 2: merge the input NumericHistograms and output the median/tp90/tp95.
  • count-distinct — Stage 1: output a RoaringArray recording the bucket and bitmap information. Stage 2: merge the RoaringArrays and output the exact distinct count.
  • count-distinct (approximate) — Stage 1: output the cardinality-counting object HyperLogLog. Stage 2: merge the HyperLogLogs and output the approximate distinct count.
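The avg strategy above is worth spelling out, since averaging per-shard averages would be wrong. A minimal sketch (class name AvgPair is illustrative): stage 1 emits a <sum, cnt> pair per shard, stage 2 merges the pairs and divides only once at the end.

```java
/** Sketch of the avg operator's two-stage strategy: stage 1 pre-aggregates
 *  into a <sum, cnt> pair, stage 2 merges pairs and divides at the end,
 *  so no precision is lost by averaging averages. */
public class AvgPair {
    double sum;
    long cnt;

    /** Stage 1: pre-aggregate one input value. */
    public void add(double value) {
        sum += value;
        cnt += 1;
    }

    /** Stage 2: merge a pre-aggregated pair from another shard. */
    public void merge(AvgPair other) {
        sum += other.sum;
        cnt += other.cnt;
    }

    /** Final result: total sum / total cnt. */
    public double getResult() {
        return cnt == 0 ? 0.0 : sum / cnt;
    }
}
```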

For operators whose result depends on all of the data, such as count-distinct, the straightforward approach is to exploit the deduplication property of a Set: put every sample into the Set and output its size in the aggregate function's getResult(). With very large data volumes, however, the Set grows so large that the I/O spent on it becomes unacceptable.

In MapReduce-style big-data frameworks, the performance bottleneck often lies in the shuffle-phase I/O of large objects, because the data must be serialized, transmitted, and deserialized. Flink is no exception. Median and tp95 suffer from the same problem.

These operators therefore need special optimization. The idea is to minimize the size of the data objects used during computation:

  • median/tp90/tp95: following Hive's percentile_approx algorithm, we record the data distribution in a NumericHistogram (a histogram with non-uniform bins) and obtain the corresponding TP value by interpolation (the median is tp50).
  • count-distinct: the RoaringBitmap algorithm marks input samples in a compressed bitmap, yielding an exact distinct count.
  • count-distinct (approximate): the HyperLogLog algorithm yields an approximate distinct count via cardinality estimation; it is suitable for deduplication counting over very large data sets.
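The bitmap idea behind the exact count-distinct can be illustrated with the JDK's BitSet standing in for RoaringBitmap (the real framework uses RoaringBitmap, which compresses sparse id spaces far better; class and method names here are illustrative):

```java
import java.util.BitSet;

/** Illustrative stand-in for the RoaringBitmap approach: each sample maps to
 *  an integer id that is marked in a bitmap; cardinality() is the exact
 *  distinct count, and the stage-2 merge is a cheap bitwise OR. */
public class BitmapDistinct {
    private final BitSet bits = new BitSet();

    public void add(int sampleId) {
        bits.set(sampleId);          // duplicates flip the same bit: idempotent
    }

    public void merge(BitmapDistinct other) {
        bits.or(other.bits);         // stage-2 merge of two shards
    }

    public int countDistinct() {
        return bits.cardinality();
    }
}
```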

Post-processing

The post-processing module reprocesses the output of the stage-2 aggregation and has two main functions:

  • Composite metric computation: a new composite metric is derived by combining raw statistical metrics. For example, to track the login success rate, we count the denominator (login attempts) and the numerator (successful logins) separately, then divide the numerator by the denominator to obtain a new composite metric. Configuration item ③ defines the computation rules for composite metrics.
  • Relative metric computation: alert rules often need the relative change of a metric (year-over-year or period-over-period). Flink's state makes such year-over-year/period-over-period metrics convenient to compute; configuration item ④ defines the relative metric rules.
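Both post-processing steps boil down to small arithmetic rules. A minimal sketch (the class PostProcessing and its method names are illustrative, not the framework's API):

```java
/** Sketch of the two post-processing rules: a composite metric combines two
 *  raw metrics (e.g. success rate = successes / attempts), and a relative
 *  metric compares the current value with the previous period's value kept
 *  in state. */
public class PostProcessing {
    /** Composite metric: numerator / denominator, guarding division by zero. */
    public static double ratio(double numerator, double denominator) {
        return denominator == 0 ? 0.0 : numerator / denominator;
    }

    /** Relative metric: period-over-period change; 0.25 means +25%. */
    public static double relativeChange(double current, double previous) {
        return previous == 0 ? 0.0 : (current - previous) / previous;
    }
}
```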

Processing of abnormal data

The abnormal data discussed here fall into two categories: late data and early data.

  • Late data:
    • Severely late data (later than the window's allowedLateness) is captured via sideOutputLateData and counted by the Reporter, so it can be monitored visually on the monitoring page.
    • Slightly late data (within the window's allowedLateness) triggers window recomputation. If every late record triggered a stage-1 window recomputation whose result were fed into the stage-2 aggregation, some data would be counted twice. To avoid this double counting, the stage-1 aggregation Trigger fires with FIRE_AND_PURGE, purging data that has already been computed.
  • Early data: this data usually stems from an incorrect clock on the reporting side. Its timestamps must be corrected manually so that they do not distort the watermark of the whole computation stream.
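The classification above can be captured in a small decision function. This is a conceptual sketch only (the class, enum, and threshold parameter maxFutureDrift are assumptions; the real framework relies on Flink's built-in window/lateness mechanics rather than explicit classification):

```java
/** Illustrative sketch: given a window end, the current watermark, and the
 *  allowedLateness, classify a record as early (clock drift on the reporting
 *  side), on time, slightly late (triggers window recomputation), or severely
 *  late (side output, counted by the Reporter only). */
public class LatenessClassifier {
    public enum Kind { EARLY, ON_TIME, SLIGHTLY_LATE, SEVERELY_LATE }

    public static Kind classify(long eventTs, long watermark, long windowEnd,
                                long allowedLateness, long maxFutureDrift) {
        if (eventTs > watermark + maxFutureDrift) {
            return Kind.EARLY;                     // reporting-side clock runs fast
        }
        if (watermark < windowEnd) {
            return Kind.ON_TIME;                   // window is still open
        }
        if (watermark < windowEnd + allowedLateness) {
            return Kind.SLIGHTLY_LATE;             // window recomputation fires
        }
        return Kind.SEVERELY_LATE;                 // side output + Reporter stats
    }
}
```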

5 Sink

Aggregated metrics are output by default to Kafka and to the time-series database InfluxDB.

  • Kafka-sink: uses the metric identifier (configuration item ①) as the Kafka topic and sends out the aggregation results. Downstream consumers can process the stream further, for example to produce alert events.
  • InfluxDB-sink: uses the metric identifier (configuration item ①) as the table name in the time-series database and persists the aggregation results for API queries and visual reports.

6 Reporter

To monitor the running status of every data source and aggregation metric in real time, we combined InfluxDB with Grafana to build full-pipeline monitoring of the aggregation computation, covering the input/output QPS, computation latency, consumer lag, and late-data volume of each stage.

7 Conclusion

Today the general aggregation framework carries the computation of 100+ metrics across different dimensions of NetEase Yunxin and has brought considerable benefits:

  • Higher efficiency: metrics are produced through page-based configuration, shortening the development cycle from days to minutes; even colleagues without data development experience can configure metrics themselves.
  • Easy maintenance and high resource utilization: the 100+ metrics are served by a single flink-job, and resource consumption dropped from 300+ CU to 40 CU.
  • Transparent operation: with full-pipeline monitoring, it is obvious at a glance which computation stage is the bottleneck and which data source has problems.

About the author

Sheng Shaoyou, senior development engineer on the NetEase Yunxin data platform team, works on data platform projects and is responsible for the design and development of the service monitoring platform, the data application platform, and the quality service platform.

For more technical articles, follow the [NetEase Smart Enterprise Technology+] WeChat official account.