Volcano Engine’s data quality platform was honed over years of serving ByteDance businesses such as Toutiao and TikTok. Product lines such as Toutiao and Douyin each bring their own complex data quality scenarios, so how does the platform meet such diverse needs?

This article describes how the Volcano Engine data quality platform eases the tension between data quality verification and the computation it requires in big data scenarios, and how it meets the monitoring requirements of both streaming and batch data with a single architecture.

What is data quality

Broadly speaking, data quality is defined as the extent to which data meets the requirements of a set of inherent characteristics (quality dimensions). The industry generally recognizes six dimensions:

  • Integrity: Whether the records and fields of the data are complete. Data loss covers both missing records and missing field values within records, and either one makes statistical results inaccurate, so integrity is the most basic guarantee of data quality. Monitoring needs to cover two aspects: whether fewer records arrived than expected, and whether the values of certain fields are missing. Integrity monitoring is usually performed at the log level, with verification done when the data is ingested.
  • Accuracy: Whether the recorded information is correct, with no anomalies or errors. Accuracy monitoring mainly targets business result data, for example checking whether daily active users, revenue, and similar metrics look normal.
  • Consistency: Whether the same metric yields the same result in different places. Inconsistency usually appears once a data system reaches a certain complexity and the same metric is computed in several places; differences in calculation definitions or between developers then easily produce different results for the same metric.
  • Timeliness: Once integrity, accuracy, and consistency are ensured, the data must also be produced on time for its value to be realized. Timeliness refers to whether data is computed fast enough; in data quality monitoring it shows up as whether the monitored result data is produced before a specified point in time.
  • Normative: Whether the data is stored according to required rules that carry semantic meaning, such as email format, IP address format, and phone number format.
  • Uniqueness: Whether the data contains duplicates, measured for example by the number of unique values or duplicate values of a field.
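
As a minimal sketch of how three of these dimensions turn into concrete checks, the following Python computes a null ratio (integrity), duplicate key values (uniqueness), and format violations (normative rules). The sample records, field names, and the simplified email pattern are assumptions made for illustration; they are not taken from the platform.

```python
import re
from collections import Counter

# Hypothetical sample records; field names are illustrative only.
rows = [
    {"id": 1, "email": "a@example.com"},
    {"id": 2, "email": None},
    {"id": 2, "email": "not-an-email"},
]

# Deliberately simplified email pattern, good enough for a sketch.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def null_ratio(rows, field):
    """Integrity: share of records whose field is missing."""
    missing = sum(1 for r in rows if r.get(field) is None)
    return missing / len(rows)


def duplicate_keys(rows, field):
    """Uniqueness: values of the field that occur more than once."""
    counts = Counter(r[field] for r in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def invalid_format(rows, field, pattern):
    """Format rule: non-null values that do not match the pattern."""
    return [
        r[field]
        for r in rows
        if r.get(field) is not None and not pattern.match(r[field])
    ]
```

Each function returns a plain number or list, so the result can be compared against a threshold to decide whether to alert.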

We have established processes and specifications for data quality and built a data quality platform that covers some of the dimensions above, focusing on the data itself and the pipelines that produce it.

The figure above shows what the data quality platform can provide in the data development process:

  • Data probe: View data details and distributions along various dimensions.
  • Data comparison: Developers often find that an online table is inconsistent with its test table, so we provide a data comparison function in the task release process.
  • Task monitoring: Monitor online data, with alerting and circuit-breaking functions.

The most representative function of the data quality platform is duplicate detection on the primary key of Hive tables produced by the data development platform, with an alarm raised when duplicates are found.

The most useful scenario for data quality monitoring is preventing data problems from spreading downstream. For example, a data task produces a Hive table and synchronizes some information to the Hive Metastore (HMS). The master-slave architecture of HMS may introduce some latency, and if HMS has a problem, downstream tasks may read dirty data. With data quality monitoring in place, we can detect the problem in time and stop the downstream tasks from running.

Data quality Challenges

What data quality challenges do we currently face? A few user stories illustrate them.

User Story 1

A commercial system for a high-traffic product produces millions of log entries per second. The team wants second-level monitoring of log latency and of null values in key fields, plus T+1 (next-day) detection of log volume fluctuation.

User Story 2

An internal service system stores its logs in ES (Elasticsearch). The team wants to check every 5 minutes how the log volume has fluctuated relative to the previous period.

User Story 3

An internal metrics platform periodically synchronizes business data from Hive to ClickHouse. The team wants to check whether the Hive and ClickHouse metrics are consistent after each synchronization task.
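
The checks behind User Stories 2 and 3 reduce to simple arithmetic once the metric values are available. The sketch below is illustrative only: the function names, the 30% default fluctuation limit, and the relative tolerance parameter are assumptions, and fetching the values from ES, Hive, or ClickHouse is left out.

```python
def fluctuation(current, previous):
    """Relative change of the current period versus the previous one."""
    if previous == 0:
        return float("inf") if current else 0.0
    return (current - previous) / previous


def fluctuation_alert(current, previous, max_change=0.3):
    """True when the volume moved by more than max_change in either direction."""
    return abs(fluctuation(current, previous)) > max_change


def consistent(source_value, target_value, rel_tol=0.0):
    """True when two stores report the same metric within a relative tolerance."""
    if source_value == target_value:
        return True
    scale = max(abs(source_value), abs(target_value))
    return abs(source_value - target_value) <= rel_tol * scale
```

For example, a log count that drops from 100 to 50 between two 5-minute periods gives a fluctuation of -0.5 and would trigger an alert under the assumed 30% limit.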

From the stories above, you should have a general idea of the data quality problems that need solving. Some readers may say: “I have built a data quality platform, and the problem is not complicated. In short, you compute various metrics on the data and compare the results against thresholds, and you can usually rely directly on the Spark or Hive engine for the computation.” Indeed, that is what our data quality platform looked like at the very beginning. So why did it evolve into its current form, and what problems did we face along the way?

The first is the complexity of scenario requirements:

  1. Offline monitoring needs little introduction: it mainly monitors data quality across different storage systems, such as Hive or ClickHouse.
  2. ByteDance’s internal advertising system has high requirements for timeliness and accuracy. According to a colleague on the advertising team, if a problem is only detected by a micro-batch check that runs every 10 minutes, the online loss may reach millions or even tens of millions. The advertising team therefore has relatively high real-time requirements.
  3. Another scenario is streaming latency monitoring in complex topologies.
  4. Finally, there is micro-batch monitoring, which means scheduled checks over a period of time. For example, some streaming scenarios that import Kafka data into ES need to be compared with the previous cycle every few minutes.

In addition, ByteDance’s products generate a huge volume of log data, and we need to satisfy the demand for quality monitoring with limited resources.

Faced with these challenges, what are our solutions?

Stream batch data quality solution

Product Functional architecture

The Volcano Engine streaming and batch data quality solution has four major features:

  • Offline data quality monitoring: Covers batch and micro-batch monitoring scenarios, supports multiple data sources such as Hive, ClickHouse, and ES, and monitors multiple dimensions such as fields and uniqueness. Custom dimensions can be aggregated and monitored through SQL.
  • Streaming data quality monitoring: Supports streaming data sources such as Kafka and BMQ.
  • Data exploration: Addresses doubts about data content before data development begins; supports Hive data sources.
  • Data comparison: Addresses data consistency between old and new tables; supports Hive and Hive SQL data sources.

System architecture

The figure above is the system architecture diagram of the data quality platform, which is mainly divided into five parts:

  • Scheduler: The external scheduler that triggers offline monitoring. There are two main trigger types:
    • External API calls to a task;
    • Timed scheduling, which invokes the task through callJob.
  • Backend: The back-end service layer, which handles business logic. Its main responsibilities are:
    • Interaction between the quality platform and external systems; all API responses go through this layer;
    • Task submission: Rules configured on the quality platform are stored in business storage. When the Scheduler invokes the Backend, the Backend submits tasks based on the parameters configured for the task;
    • Obtaining and evaluating quality monitoring results, then interacting with external systems to send alerts to users as needed.
  • Executor: The task execution module at the core of the platform. It integrates several engines; for example, data exploration uses an OLAP engine, and quality monitoring uses Griffin’s Measure for data statistics.
  • Monitor: A relatively independent module that mainly handles state transitions and provides functions such as repeat alarms.
  • Alert Center: An external alarm service that receives various alarm events. The quality platform depends heavily on it.

Offline data detection process

Let’s take a look at the offline data detection process.

Offline data monitoring, exploration, and comparison share the same execution process, which has four main steps:

  1. Monitoring trigger: The scheduling system invokes the quality module API.
  2. Job submission: Backend submits a Spark job to YARN in cluster mode.
  3. Result reporting: After the job finishes (success or failure), the Driver synchronizes the result to Backend.
  4. Message triggering: Backend triggers actions (such as alarms and message notifications) based on the result.
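
The four steps can be sketched as a single Backend handler. This is only an illustration under stated assumptions: `Rule`, `JobResult`, and `handle_trigger` are hypothetical names, `submit_job` stands in for submitting a Spark job to YARN and waiting for the Driver to report back, and `notify` stands in for the alerting integration.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Rule:
    name: str
    threshold: float  # alert when the computed metric exceeds this value


@dataclass
class JobResult:
    succeeded: bool
    metric: float = 0.0


def handle_trigger(
    rule: Rule,
    submit_job: Callable[[Rule], JobResult],
    notify: Callable[[str], None],
) -> str:
    """Called when the scheduler hits the quality API (step 1)."""
    result = submit_job(rule)  # step 2: stand-in for the Spark job
    if not result.succeeded:  # step 3: failures are reported as well
        notify(f"{rule.name}: job failed")
        return "failed"
    if result.metric > rule.threshold:  # step 4: act on the result
        notify(f"{rule.name}: metric {result.metric} exceeds {rule.threshold}")
        return "alerted"
    return "passed"
```

Because the handler keeps no state between calls, any number of instances can serve triggers in parallel.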

The data quality platform has the following advantages:

  • Low coupling with the scheduling system: The data quality platform is not tightly bound to any scheduling system; business systems can generally integrate with it through API calls.
  • Efficient event triggering and strong horizontal scalability: Backend is a stateless service. If many business systems need quality monitoring, Backend can be scaled out horizontally to receive requests and submit jobs.
  • No quota limits: The platform does not maintain a dedicated resource queue for data quality monitoring. Instead, monitoring jobs run on the users’ own resources, which turns the quota issue into a user resource issue.

Of course, no tool is perfect, and the data quality platform still has room for improvement:

  • Queries that are not CPU-intensive are too heavyweight: The whole platform is designed around task submission to serve offline scenarios. As a result, a Spark job is started even for work that does not need Spark, such as an ES SQL query, which is a heavy operation.
  • Low stability from relying on YARN for scheduling: When resources are insufficient or queues are congested, the platform’s tasks run or are invoked slowly.

Stream monitoring execution

For streaming data monitoring we chose the Flink engine. Unlike offline data, streaming data cannot be captured cheaply through snapshots, so we rely on an external time-series database plus a rule engine to implement the monitoring.

The flow of streaming data monitoring on the platform is as follows:

  1. Create a Flink job according to the rule definition;
  2. Register Bosun alarm events according to the alarm conditions;
  3. The Flink job consumes Kafka data, computes the monitoring metrics, and writes them to Metrics;
  4. Bosun periodically checks the Metrics time-series data and triggers alarms;
  5. Backend receives the alarm callbacks and handles the alarm-sending logic.
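
The split between metric computation and rule evaluation can be illustrated in plain Python. In this sketch `window_metrics` plays the role of the Flink job (step 3) and `evaluate` plays the role of the rule engine (step 4); both functions, the 60-second window, and the "N consecutive points above threshold" condition are assumptions for illustration and do not use the Flink or Bosun APIs.

```python
from collections import defaultdict


def window_metrics(events, field, window_seconds=60):
    """Per-window null ratio of `field`.

    `events` is an iterable of (timestamp_seconds, record_dict) pairs.
    Returns {window_start: null_ratio}, ordered by window start.
    """
    total = defaultdict(int)
    nulls = defaultdict(int)
    for ts, record in events:
        start = int(ts // window_seconds) * window_seconds
        total[start] += 1
        if record.get(field) is None:
            nulls[start] += 1
    return {w: nulls[w] / total[w] for w in sorted(total)}


def evaluate(series, threshold, consecutive=2):
    """Fire when the last `consecutive` points all exceed `threshold`."""
    points = [series[w] for w in sorted(series)][-consecutive:]
    return len(points) == consecutive and all(p > threshold for p in points)
```

Keeping the evaluation outside the stream job means alarm conditions can change without restarting the job that produces the metrics.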

The following sections focus on the implementation of two modules: Executor and Monitor.

Executor implementation

Executor is a Spark application adapted from the Measure module of Apache Griffin. Its functions include:

  • Adapting to data sources
  • Converting data to DataFrames
  • Translating rules into SQL operations
  • Computing the results
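
To illustrate what translating a rule into a SQL operation can look like, here is a minimal sketch. The rule schema (`type`, `column`) and the two rule types are hypothetical and are not Griffin’s DSL; an in-memory SQLite database stands in for a Spark DataFrame registered as a table so that the example runs on its own.

```python
import sqlite3


def rule_to_sql(table, rule):
    """Translate a declarative rule into an aggregate SQL query.

    Table and column names are interpolated directly, so they must come
    from trusted configuration rather than from user input.
    """
    kind, column = rule["type"], rule["column"]
    if kind == "null_count":
        return f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    if kind == "duplicate_count":
        # Non-null values minus distinct values = surplus duplicate rows.
        return f"SELECT COUNT({column}) - COUNT(DISTINCT {column}) FROM {table}"
    raise ValueError(f"unsupported rule type: {kind}")


def run_rule(conn, table, rule):
    """Execute the generated query and return its single numeric result."""
    return conn.execute(rule_to_sql(table, rule)).fetchone()[0]


def demo_connection():
    """Build a tiny in-memory table to run the rules against."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER, email TEXT)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)",
        [(1, "a@example.com"), (2, None), (2, "b@example.com")],
    )
    return conn
```

Each rule yields one number, which is what the Backend then compares against the configured threshold.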

Executor was selected based on the following considerations:

  • Extensibility must be strong enough to adapt to different data sources, such as Hive and MySQL
  • Computing performance must be strong
  • The supported monitoring types must be rich enough

With these considerations in mind, we chose Apache Griffin’s Measure module as the Executor. It is built on Spark, adapts to different data sources, and provides a set of DSL extensions. Griffin Measure itself already supports some basic data quality monitoring, such as duplicate value detection and custom SQL. Because of the platform design, the Executor also needs to interact with Backend and send data back. Here are the main changes we made to the Measure module:

  • Reworked the data source and sink so that they can access remote APIs over HTTP;
  • Enhanced and modified some functions, for example adding support for regular expressions;
  • Switched the streaming monitoring solution from the Spark engine to the Flink engine. Measure belongs to the Spark ecosystem, so streaming can only be monitored with the Spark engine, that is, by simulating a stream with micro-batches. ByteDance has in-house Flink expertise, and Flink handles streaming data much better than micro-batching, so we made this change.

Monitor implementation

The Monitor module mainly implements alarm retry on failure and repeat alarms, triggering the corresponding action (repeat alarm, failure retry, and so on) according to the event type. Because all business data is stored in MySQL, the platform’s earlier implementation of repeat alarms was relatively simple: Monitor polled MySQL directly for alarm instances and then re-submitted them to send the alarm again.

As the number of monitoring rules grew, the database came under heavy pressure and Monitor’s scanning ran into bottlenecks. We therefore upgraded Monitor’s technical architecture as follows:

  • Stateful service: The master node serves external requests, and high availability (HA) is ensured in active/standby mode.
  • Receiving Backend events: Backend reports monitoring failures and alarms to Monitor.
  • In-memory timing queue with an event-triggering mechanism.
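
An in-memory timing queue can be sketched with a binary heap keyed on the time an event is due, which avoids scanning the database on every poll. The class and function names below are hypothetical, and persistence and active/standby failover are left out.

```python
import heapq
import itertools


class TimingQueue:
    """In-memory queue of events ordered by the time they are due."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker so events are never compared

    def schedule(self, due_at, event):
        heapq.heappush(self._heap, (due_at, next(self._seq), event))

    def pop_due(self, now):
        """Return every event whose due time is <= now, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


def fire_repeat_alarms(queue, now, interval, still_firing):
    """Re-send alarms that are due and re-schedule the unresolved ones."""
    sent = []
    for alarm_id in queue.pop_due(now):
        sent.append(alarm_id)
        if still_firing(alarm_id):
            queue.schedule(now + interval, alarm_id)
    return sent
```

Checking the head of the heap is a constant-time operation, so an idle tick costs almost nothing regardless of how many alarms are pending.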

Best practices

Having described some of the platform’s implementation, we now share some best practices concerning data volume and resources.

Table row number information – HMS is preferred

In internal offline monitoring, table row count checks account for a large share: at least 50% of offline rules are row count monitors. We originally obtained the row count by submitting a Spark job that ran SELECT COUNT(*), which consumed a lot of resources.

We then made an optimization. When a task writes its output table, the underlying engine records the row count in the corresponding partition information. We can then read the row count directly from the HMS partition and avoid submitting a Spark job.

The effect is significant. Currently, about 90% of table row count monitors obtain the count from HMS, and the average running time of HMS-based row count monitoring is on the order of seconds.

Note: This function requires support from the underlying services. For example, Spark needs to write the information held in its local metrics to HMS, and other data transfer systems need to support this as well.
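
The "metadata first, engine as fallback" logic can be sketched as follows. The `numRows` key follows Hive’s basic statistics convention for table and partition parameters, but whether the platform reads exactly this key is an assumption, and `count_with_engine` is a hypothetical stand-in for submitting a COUNT(*) job.

```python
def table_row_count(partition_params, count_with_engine):
    """Prefer the row count stored in partition metadata.

    `partition_params` is the parameter map of a metastore partition.
    Falls back to `count_with_engine()` when the statistic is missing,
    unparsable, or negative (treated here as "unknown").
    """
    raw = partition_params.get("numRows")
    try:
        rows = int(raw)
    except (TypeError, ValueError):
        rows = -1
    if rows >= 0:
        return rows, "metastore"
    return count_with_engine(), "engine"
```

The second element of the returned tuple records which path was taken, which makes it easy to track what share of monitors avoids a job submission.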

Offline Monitoring optimization

This part is based on Griffin’s Measure. Because Measure is feature-rich, we trimmed it to reduce running time. The main trimming and optimizations include:

  • Removing some abnormal-data collection functions;
  • Optimizing away non-essential join steps.

We also tuned the execution parameters of offline monitoring, mainly by:

  • Adding different parameters (such as shuffle to HDFS) depending on the monitoring type;
  • Tuning default parameters (such as vcore) based on the characteristics of the monitor.

For example, when a user writes SQL that joins data, the execution engine can analyze the execution plan. Join operations can produce a very large shuffle, so in that case we turn on certain Spark parameters by default.

The table size is estimated from the table’s row count. If the table is judged to be large, vcore and memory are tuned up by default. Together, these optimizations have reduced the average running time of all monitoring types on the platform by more than 10%.
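
A sketch of this kind of parameter selection is shown below. The configuration keys are standard Spark settings, but the size cut-off, the concrete values, and the choice of which settings to toggle for joins are assumptions for illustration; the source does not state which parameters the platform actually sets.

```python
def estimate_bytes(row_count, avg_row_bytes):
    """Rough table size derived from the row count."""
    return row_count * avg_row_bytes


def spark_conf_for(row_count, avg_row_bytes, has_join):
    """Pick job parameters from the estimated size and the query shape."""
    conf = {
        "spark.executor.cores": "2",
        "spark.executor.memory": "4g",
    }
    # Hypothetical 100 GiB cut-off for a "large" table.
    if estimate_bytes(row_count, avg_row_bytes) > 100 * 1024**3:
        conf["spark.executor.cores"] = "4"
        conf["spark.executor.memory"] = "8g"
    if has_join:
        # Joins can shuffle heavily, so enable shuffle-related settings.
        conf["spark.sql.adaptive.enabled"] = "true"
        conf["spark.sql.shuffle.partitions"] = "2000"
    return conf
```

Because the row count is often already available from metadata, this decision can be made before the job is submitted.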

Introduce the OLAP engine

Many data tables and business tables on the platform (log tables excepted), particularly tables in the upper layers of the data warehouse, involve only modest data volumes when monitored, which makes them well suited to OLAP queries.

For this reason we introduced Presto in the data probe scenario, where probes previously ran on Spark. With Presto we adopted a fail-fast mechanism: probes that involve large data volumes or complex computation fall back to a submitted Spark job. The median probe time dropped from 7 minutes to under 40 seconds.
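
The fail-fast pattern can be sketched as a try-then-fallback wrapper. Everything here is hypothetical: `QueryTooHeavy`, the 30-second timeout, and the two stub clients are illustration only and are not Presto or Spark APIs.

```python
class QueryTooHeavy(Exception):
    """Raised by the fast path when a query exceeds its limits."""


def probe(sql, run_olap, run_spark, timeout_seconds=30):
    """Try the OLAP engine first; fall back to Spark if it gives up quickly."""
    try:
        return run_olap(sql, timeout_seconds), "olap"
    except (QueryTooHeavy, TimeoutError):
        return run_spark(sql), "spark"


def olap_stub(sql, timeout_seconds):
    """Stand-in OLAP client that rejects anything containing a join."""
    if "JOIN" in sql.upper():
        raise QueryTooHeavy("query too complex for the fast path")
    return [("row_count", 3)]


def spark_stub(sql):
    """Stand-in for submitting the same query as a Spark job."""
    return [("row_count", 3)]
```

The important property is that the fast path fails quickly rather than running for a long time, so the fallback adds little latency to heavy queries while light queries never pay the job start-up cost.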

Streaming monitoring supports sampling & single Topic multiple Rule optimization

Kafka data sampling

Problems in streaming data are generally systematic, so they can be found through data sampling. We therefore developed a data sampling function to reduce resource consumption. The Flink Kafka Connector supports sampling and can directly manipulate the offsets of the Kafka topic to sample. For example, at a sampling ratio of 1%, we only need ** machines to support topics with W partitions.
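
One way to sample by offset is to read only the first slice of every fixed-size block of offsets and seek past the rest. The sketch below shows only the offset arithmetic; the function name and block size are assumptions, and it does not represent the connector’s actual implementation or API.

```python
def sampled_ranges(start_offset, end_offset, rate=0.01, block=10_000):
    """Yield (from, to) offset ranges covering roughly `rate` of the input.

    For every block of `block` offsets in [start_offset, end_offset),
    only the first `block * rate` offsets are read; the consumer would
    seek past the remainder instead of fetching it.
    """
    take = max(1, round(block * rate))
    for block_start in range(start_offset, end_offset, block):
        yield block_start, min(block_start + take, end_offset)
```

Skipping by offset means the unsampled records are never fetched or deserialized, which is where the resource saving comes from.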

Optimization of single Topic and multiple rules

Initially, we defined one Rule per Topic and started one Flink task to consume the Topic and execute the Rule. Later we found that some key data needs to be monitored along multiple dimensions, which means defining multiple Rules, and starting a separate task for each Rule was very resource-intensive. Because monitoring is not CPU-intensive, we reuse the read stage and execute multiple Rules in a single slot: each Topic is consumed once, and all related Rules are executed in a single task.
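
The idea of reading once and evaluating many rules can be shown in a few lines. This is a plain-Python sketch with hypothetical names; in a real job the loop body would live inside a single stream operator.

```python
def run_rules(records, rules):
    """Consume the stream once and feed every record to every rule.

    `rules` maps a rule name to a predicate that returns True when the
    record violates the rule. Returns (records_seen, violations_per_rule).
    """
    violations = {name: 0 for name in rules}
    seen = 0
    for record in records:  # the topic is read exactly once
        seen += 1
        for name, predicate in rules.items():
            if predicate(record):
                violations[name] += 1
    return seen, violations
```

The read and deserialization cost is paid once per record, and each additional rule adds only a cheap predicate evaluation.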

Future Evolution

Having introduced the implementation and best practices of the data quality platform, we conclude with its future evolution.

  • Unified underlying engine and integrated streaming and batch: Currently, most offline tasks on the platform run on Spark, streaming data is processed by Flink, and Presto serves as the OLAP engine, so this architecture is costly to operate and maintain. Flink’s OLAP (Presto-like) capability and Flink batch capability are both developing steadily, so we will try migrating some tasks in the future to achieve a truly unified engine.
  • Intelligence: Introduce algorithms to make monitoring data-driven. We are considering ML methods to assist threshold selection, or intelligent alarms that automatically recommend quality rules according to the data’s importance level. For example, intelligent fluctuation monitoring based on time-series algorithms could handle holiday traffic peaks, which are a common problem for hard rule thresholds.
  • Convenience: OLAP provides significant performance improvements, but we currently use it only for data exploration. In the future, the OLAP engine could be applied to quality monitoring, data exploration, data comparison, and the data development process.
  • Optimization: For example, run multiple monitors in a single job, and combine monitoring with data exploration. We are now trying to combine data quality rule generation with data exploration, so that data and rules correspond in a WYSIWYG manner.
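
To make the contrast with hard thresholds concrete, here is a deliberately simple stand-in for a data-driven threshold: a band of mean plus or minus k standard deviations computed from comparable history (for example, the same weekday or the same holiday in earlier periods). This is not the time-series algorithm the platform plans to use, which the source does not specify; the function names and the default k are assumptions.

```python
from statistics import mean, stdev


def dynamic_bounds(history, k=3.0):
    """Lower and upper bounds derived from comparable historical values."""
    m = mean(history)
    s = stdev(history)
    return m - k * s, m + k * s


def is_anomalous(value, history, k=3.0):
    """True when the value falls outside the band learned from history."""
    low, high = dynamic_bounds(history, k)
    return value < low or value > high
```

Because the band is computed from history that shares the same seasonality, a holiday peak is compared with earlier holiday peaks instead of tripping a fixed limit tuned for ordinary days.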

Q&A

Q: Troubleshooting data quality problems is often very time-consuming. Have you done any work on attribution analysis of data quality problems?

A: This is a core pain point. Our current idea is data drill-down, done together with ByteDance’s algorithm colleagues: run data exploration on every table in the data pipeline, and when a quality problem is found, use lineage and field-level relationships to locate the upstream fields. At present we are still at the stage of using probes plus process to understand upstream data as quickly as possible; we have made no progress yet on attribution analysis itself.

Q: How does the data quality closed loop work? For example, who solves data quality problems, and how is data quality measured?

A: Who solves data quality problems? Whoever cares about data quality pushes for the fix, and whoever develops the data resolves the issue. It is a matter of collaboration.

How is data quality measured? Internally we have some governance metrics, such as alarm volume and the alarm rate of core tasks.

Q: How do you ensure end-to-end data consistency?

A: End-to-end data consistency cannot be solved by a single tool. It needs a combination of measures, such as validating client-reported data against the event tracking system and confirming data accuracy at release time. That said, I think end-to-end data consistency is still poorly addressed across the industry. If a problem occurs on the business side, it is hard to troubleshoot. Monitoring every layer of the data pipeline would make troubleshooting relatively simple, but that approach is costly.