Introduction: At Zuoyebang (Homework Help), a technology-driven online education company, the Big Data Center serves as the foundational infrastructure center. It is mainly responsible for building the company's data warehouse and providing business-topic-oriented data to each product line. This article shares best practices for building a data lake based on DeltaLake.
The authors:
- Liu Jin, Head of the Big Data Platform Technology Department
- Wang Bin, Senior Architect, Big Data Platform Technology Department
- Bi Yan, Open-Source Big Data Technical Expert, Alibaba Cloud Computing Platform
Content framework:
- Business background
- Problems & pain points
- Solution
- Offline data warehouse based on DeltaLake
- Future planning
- Acknowledgments
I. Business background
Zuoyebang is a technology-driven online education company. Its products currently include tool apps such as Zuoyebang and Zuoyebang Oral Calculation, the K12 live-class product Zuoyebang Live Class, quality-education products Deer Programming, Deer Writing, and Deer Art, as well as smart learning hardware such as the Miaomiao Machine. Business systems such as the teaching research center, teaching center, tutoring operation center, and big data center continue to empower more quality education products and bring users a better learning and usage experience. As the foundational infrastructure center, the Big Data Center is mainly responsible for building the company's data warehouse and providing each product line with business-topic-oriented data, such as retention rate, attendance rate, and active users, to improve the efficiency and quality of operational decisions.
The figure above gives an overview of the Zuoyebang data center, which is divided into three layers:
- The first layer is the data products and enablement layer
Data tools and products based on topic data domains support application scenarios such as business intelligence and trend analysis.
- The second layer is the global data layer
Through OneModel unified modeling, we standardize the modeling of ingested data and build business-domain subject data for different timeliness scenarios, improving the efficiency and quality of the upper-layer products.
- The third layer is the data development layer
We built a series of systems and platforms to support all of the company's data development projects, including data integration, task development, data quality, data services, data governance, and so on.
This share focuses on solving the performance problems encountered in producing and consuming the offline data warehouse (day-level and hour-level).
II. Problems & pain points
The offline data warehouse of Zuoyebang provides Hive-based data construction capabilities from the ODS layer to the ADS layer. When an ADS table is generated, it is written into the OLAP system through data integration to provide BI services for managers. In addition, the DWD, DWS, and ADS tables also provide analysts with offline data exploration and ad hoc query services.
As the business gradually grows and data volume keeps increasing, the main problems of the offline warehouse system are as follows:
- The output delay of ADS tables is getting longer and longer
As data volume grows, the end-to-end construction from the ODS layer to the ADS layer takes longer and longer. Although the core ADS table links can be rescued in the short term by tilting resources toward them, this approach essentially sacrifices the many to save the few: it cannot be replicated at scale, and it delays the timely output of other important ADS tables. For analysts, the consequence of delayed tables is that data expected at T+1 may, in the worst case, not be available until T+2.
- Demand for hourly tables is difficult to meet
Some scenarios require hourly output tables. For example, some operational activities need hourly feedback so strategies can be adjusted in time. As data volume grows and compute cluster resources tighten, hourly tables often struggle to meet their timeliness targets, and to guarantee computing performance, enough resources usually have to be prepared in advance, especially when the full data of the current day must be computed at hourly granularity; in the worst case, the computing resources must be scaled up 24 times.
- Data exploration is slow and ad hoc queries are unstable
Analysts frequently need ad hoc access to the data. Querying Hive directly takes tens of minutes or even hours, which is totally unacceptable and draws constant complaints from users. We therefore use Presto to accelerate Hive table queries, but then the queried tables cannot be too large and the logic cannot be too complex, or Presto runs out of memory (OOM). In addition, existing UDFs and views cannot be used directly in Presto, which greatly limits analysts' usage scenarios.
III. Solution
Problem analysis
Whether it is the slow end-to-end production from the ODS layer to the ADS layer or slow exploratory queries against specific tables, the essence of the problem is insufficient computing performance at the Hive layer. Analyzing the scenarios above:
- **Why link computation is slow:** Hive does not support incremental updates, while the mysql-binlog coming from the upstream business systems contains a large number of updates. At the ODS layer, incremental data therefore has to be merged and deduplicated with the historical full data to produce a new full snapshot, and the DWD, DWS, and ADS layers follow a similar pattern. This causes a great deal of redundant recomputation and delays data output (a minimal sketch of the full-merge pattern follows this list).
- **Why data queries are slow:** Hive lacks the necessary indexes, so both heavy recomputation and queries that expect minute-level latency are translated into MR jobs. As a result, query results come back slowly in fast data exploration scenarios.
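To make the first point concrete, below is a minimal sketch of the Hive-era daily full merge, assuming a hypothetical table keyed by `id` with an update timestamp `utime` (all table and column names are illustrative): even if only a small fraction of rows changed, the whole previous snapshot must be re-read and rewritten.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ods-full-merge").enableHiveSupport().getOrCreate()

// Rebuild today's full snapshot by deduplicating the increment against
// yesterday's full snapshot; this recomputes the entire table every day.
spark.sql("""
  INSERT OVERWRITE TABLE ods_user_full PARTITION (dt = '2021-12-31')
  SELECT id, name, utime
  FROM (
    SELECT id, name, utime,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY utime DESC) AS rn
    FROM (
      SELECT id, name, utime FROM ods_user_full   WHERE dt = '2021-12-30'  -- yesterday's full snapshot
      UNION ALL
      SELECT id, name, utime FROM ods_user_binlog WHERE dt = '2021-12-31'  -- today's increment
    ) unioned
  ) ranked
  WHERE rn = 1  -- keep only the newest row per primary key
""")
```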
Solution research
From the above analysis, if we can solve the incremental-update problem of the offline data warehouse, the performance of link computation will improve; and if we can reduce query latency without degrading query functionality, data exploration will speed up as well.
- HBase+ORC solution
HBase supports data updates: with the primary key as the RowKey and the fields mapped to columns, it provides real-time writes. However, because of HBase's architecture, query performance on non-primary-key columns is poor, so HBase tables have to be exported to HDFS sorted by specific fields and stored in ORC format. But ORC only supports single-column min/max indexes, so query performance still falls short of requirements. Moreover, since data keeps flowing into HBase, the export timing is hard to control and the data may change during the export; if we want to export, say, the data as of 21:21 on December 11 as the table's contents, we must take version numbers, storage capacity, and filtering into account, which hurts computing performance and adds system complexity. The solution also introduces HBase itself, increasing operations and maintenance costs.
- Data lake
A data lake is essentially a data format that sits between mainstream compute engines (such as Flink/Spark) and data storage (such as object storage) without introducing any additional service. It supports real-time upsert and provides multi-version support, so any version of the data can be read back.
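As a hedged illustration of these two capabilities, the sketch below upserts a batch of updates into a Delta table through the open-source Delta Lake Scala API and then reads back an earlier version; the paths, the `id` join key, and the version number are assumptions made for the example.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-upsert-demo").getOrCreate()
// Hypothetical incremental batch with the same schema as the target table.
val updates = spark.read.json("/data/updates.json")

// Real-time upsert: update rows whose primary key already exists, insert the rest.
DeltaTable.forPath(spark, "/warehouse/ods/user")
  .as("t")
  .merge(updates.as("s"), "t.id = s.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

// Multi-version support: time-travel back to an arbitrary earlier version.
val v5 = spark.read.format("delta").option("versionAsOf", 5).load("/warehouse/ods/user")
```

Reads without `versionAsOf` always see the latest committed snapshot.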
Current mainstream data lake solutions include DeltaLake, Iceberg, and Hudi. We investigated all three on Alibaba Cloud; their differences and characteristics are as follows:
Also considering ease of use (DeltaLake has clear semantics, and Alibaba Cloud provides full SQL syntax support, making it simple to use, whereas the other two have a higher barrier to entry) and functionality (only DeltaLake supports Zorder/Dataskipping query acceleration), and weighing our own scenario, we finally chose DeltaLake as the data lake solution.
IV. Offline data warehouse based on DeltaLake
After the introduction of DeltaLake, our offline warehouse architecture is as follows:
First, Binlog is collected by Canal and then written into Kafka through our self-developed data distribution system. Note upfront that our distribution system strictly preserves Binlog order at the table level, for reasons detailed below. Spark then writes the data in micro-batches into DeltaLake. Finally, we upgraded the data query platform to fetch data from DeltaLake with Spark SQL.
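As a rough sketch of the Kafka-to-DeltaLake leg of this pipeline (topic name, broker address, and paths are placeholders), a Spark Structured Streaming job subscribes to a per-logical-table binlog topic and writes micro-batches into DeltaLake; the production job additionally merges and deduplicates records, as shown further below:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("binlog-to-delta").getOrCreate()

// Subscribe to the per-logical-table binlog topic produced by the
// distribution service (names and addresses are illustrative).
val binlog = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "binlog_user")
  .load()
  .selectExpr("CAST(value AS STRING) AS payload")

// Write each micro-batch into DeltaLake; the real job replaces this plain
// append with a merge-into upsert (see the merge example later).
binlog.writeStream
  .format("delta")
  .option("checkpointLocation", "/ckpt/binlog_user")
  .start("/warehouse/ods/user")
```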
In using DeltaLake, we need to address the following key technical points:
Stream data to batch
In business scenarios, the ETL tasks of an offline data warehouse are triggered by table partition readiness. For example, a task for 2021-12-31 can only be triggered after the 2021-12-30 partition of its upstream table is ready. This is easy to support on Hive, which naturally partitions by date fields such as dt. For DeltaLake, however, we write data as a stream, so we need to convert the streaming data into batch semantics: only after a given day's data is fully ready do we expose that day's partition for external reads.
How to define data readiness
Streaming data is generally out of order, and even with a watermark mechanism, ordering can only be guaranteed within a bounded time range, whereas an offline data warehouse requires the data to be 100% reliable. If we can make the data source strictly ordered, readiness becomes much easier to determine: with daily partitions, once data for 12-31 starts to arrive, the data for 12-30 can be considered ready.
Therefore, our scheme is decomposed into two sub-problems:
- Defining the boundary of batch data once the stream data is ordered
- A mechanism that guarantees the ordering of the stream data
First of all, for the former, the overall plan is as follows:
- Define a logical partition field dt on the data table, together with the corresponding time-unit granularity.
- After Spark reads a batch of data, it generates each record's dt value from the event time in the data, according to the table metadata above. Once every event-time value in the stream has advanced to day T+1, the snapshot for data version T is generated; batch reads then locate the corresponding data version through this snapshot (a sketch of the dt derivation follows this list).
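Below is a minimal sketch of that dt derivation under the stated ordering assumption, with `event_time` as an illustrative column name:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Derive the logical partition dt from each record's event time rather than
// from processing time (column names are illustrative).
def withDt(batch: DataFrame): DataFrame =
  batch.withColumn("dt", date_format(col("event_time"), "yyyy-MM-dd"))

// Because the source is strictly ordered, once the smallest event time in a
// batch has moved into day T+1, day T is complete and its snapshot can be
// published for downstream batch jobs to read.
```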
How to solve out-of-order stream data
App logs and mysql-binlog are both naturally ordered; taking mysql-binlog as an example, the Binlog of a single physical table is guaranteed to be in order. In real business scenarios, however, the business systems commonly shard databases and tables, so a logical table Table is split into Table1, Table2, and so on. The ODS tables of the offline warehouse must shield downstream users from the details of these MySQL shards. The problem therefore reduces to guaranteeing data order in the sharded-table scenario. Specifically, we need to:
- Guarantee ordering after the shards of a table, even shards living in different clusters, are written to Kafka; that is, the logical-table data that Spark reads from a topic and writes to DeltaLake must be ordered at partition granularity.
- Guarantee timely ODS table readiness: even when no Binlog data arrives, the ODS layer data should still become ready on schedule.
To this end, the original system was upgraded as follows:
As shown above, the Binlog of a MySQL cluster is collected by Canal and written to a specific kafka-topic, but the partition is chosen by hashing on db and table (with the shard suffix _* removed), so a single partition contains the Binlogs of multiple physical tables, which is very unfriendly for writing into DeltaLake. For compatibility with other data consumers, we added a data distribution service that:
- Writes data to the topic corresponding to the logical table name (with the _* suffix removed) and hashes partitions by physical table name. This guarantees that the data inside a single partition is always ordered and that a single topic contains the data of only one logical table (see the producer sketch after this list).
- Builds a heartbeat table inside the MySQL cluster to monitor Canal collection delays, and sets a threshold on it so that, when no Binlog data is arriving, we can tell whether the system has a problem or there is genuinely no data. In the latter case, DeltaLake's savepoint is still triggered, which in turn generates the snapshot on time and ensures the ODS table is ready on schedule.
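The sketch below shows what such a distribution rule might look like using the plain Kafka producer API; the shard-suffix convention, topic naming scheme, and broker address are assumptions made for illustration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "kafka:9092") // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

def dispatch(physicalTable: String, binlogJson: String): Unit = {
  // Logical table name = physical name with the shard suffix removed,
  // e.g. user_3 -> user. One topic per logical table.
  val logicalTable = physicalTable.replaceAll("_\\d+$", "")
  val topic = s"binlog_$logicalTable" // hypothetical topic naming scheme
  // Keying by the physical table name hashes all Binlogs of one physical
  // table into the same partition, preserving their order.
  producer.send(new ProducerRecord[String, String](topic, physicalTable, binlogJson))
}
```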
With the above scheme, we write the Binlog stream into DeltaLake, and the table partition readiness delay stays under 10 minutes.
Read and write performance optimization
Here are the performance problems we encountered in the process of using DeltaLake and the corresponding solutions.
DPP improves write performance
DeltaLake supports writing data via SparkStreamingSQL.
Because records must be merged and deduplicated, writes use merge into. DeltaLake performs an update in two steps (an example merge statement follows this list):
- Locate the files to be updated. By default, all files are read and joined with the incremental batch data in Spark to identify the files that need updating.
- Merge these files with the batch data, rewrite them, and mark the old files as deleted.
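For reference, a merge-based upsert of one micro-batch might look like the sketch below; table and column names are illustrative, and `upsertBatch` would be handed to Structured Streaming's `foreachBatch`:

```scala
import org.apache.spark.sql.DataFrame

// Upsert one micro-batch into the ODS Delta table; merge into both
// deduplicates against existing rows and inserts brand-new ones.
def upsertBatch(microBatch: DataFrame, batchId: Long): Unit = {
  microBatch.createOrReplaceTempView("batch_source")
  microBatch.sparkSession.sql("""
    MERGE INTO ods_user AS t
    USING batch_source AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}
```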
As shown in the upper-left figure, DeltaLake by default reads all files of the previous version, so write performance is very low: a single merge could not finish within one Spark batch.
For this scenario, we modified DeltaLake to optimize merge into performance with DPP-style partition pruning, as shown above on the right (a manual approximation is sketched after this list):
- Analyze the merge-on condition to find the source-table fields that correspond to the partition fields of the DeltaLake table.
- Compute the enumerated list of values those partition fields take in the batch.
- Convert the result of the previous step into a Filter expression and apply it to further prune the list of data files.
- Read the pruned data files and join them with the batch source data to obtain the final list of files to update.
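The effect can be approximated by hand: collect the distinct partition values present in the micro-batch and push them into the merge-on condition, so only the touched partitions' files are read and joined. Our modified DeltaLake performs this pruning automatically; the sketch below (illustrative names, with `dt` assumed to be the partition column) just shows the idea:

```scala
import org.apache.spark.sql.DataFrame

// Partition-pruned variant of the upsert: restrict the merge to the dt
// partitions actually present in this micro-batch.
def prunedUpsert(microBatch: DataFrame, batchId: Long): Unit = {
  microBatch.createOrReplaceTempView("batch_source")
  val dts = microBatch.select("dt").distinct().collect()
    .map(r => s"'${r.getString(0)}'")
    .mkString(", ")
  microBatch.sparkSession.sql(s"""
    MERGE INTO ods_user AS t
    USING batch_source AS s
    ON t.id = s.id AND t.dt IN ($dts)
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}
```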
After the DPP optimization, the processing delay of a Spark batch (5-minute granularity) dropped from a maximum of 20+ minutes to at most ~3 minutes, completely eliminating the accumulating delays previously caused by batches whose processing time exceeded the interval.
Use Zorder to improve read performance
After solving the problem of data write performance, we encountered the problem of data read performance.
On the same data (20 billion+ records), the average query latency on Hive was 10+ minutes, while on DeltaLake it was 11+ minutes. Analysis showed that the filtered columns had not been sorted with Zorder; after enabling Zorder, latency dropped to 24s, a nearly 25x performance improvement.
Query optimization of DeltaLake tables based on Zorder mainly involves two mechanisms (a usage sketch follows this list):
- Dataskipping: DeltaLake collects max/min statistics for every field at file granularity and uses them to filter out data files directly.
- Zorder: a data layout that rearranges the data so that rows are kept as local as possible along the Zorder fields.
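For illustration, building and exploiting a Zorder layout might look like the following; the OPTIMIZE ... ZORDER BY syntax shown here follows engines that offer it (such as EMR DeltaLake; exact syntax varies by engine), and the table and column names are illustrative:

```scala
// Rewrite the table's files so rows are clustered along the Z-order curve of
// the frequently filtered column; min/max statistics then let queries skip
// files whose value ranges cannot match the predicate.
spark.sql("OPTIMIZE ods_user ZORDER BY (uid)")

// Subsequent point queries on the Z-ordered column benefit from data skipping.
spark.sql("SELECT COUNT(*) FROM ods_user WHERE uid = '12345'").show()
```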
Zorder build time optimization
Zorder is built on demand for the columns it is enabled on; a normal build takes about 30 minutes, and up to 90 minutes under data skew.
We optimized the Zorder build for both cases:
- In the general case, building a multi-column Zorder now traverses the dataset once instead of multiple times, raising build efficiency. Average build time dropped from 30 minutes to 20 minutes.
- Under data skew, the buckets holding the skewed column values are split to disperse the hot spots, cutting average build time from 90 minutes to 30 minutes.
The overall effect
After nearly half a year of development and optimization, the DeltaLake-based offline data warehouse recently went live, focusing on accelerating analysts' queries and also supporting scenarios that need hourly full tables. The overall results are as follows:
- Faster readiness: after replacing the ODS layer with DeltaLake, its output time moved up from 2:00-3:00 a.m. to around 00:10 a.m., more than 2 hours earlier.
- Broader capabilities: the big data platform now supports hourly full tables. With DeltaLake's incremental-update feature, hourly full tables are produced at low cost, avoiding the traditional scheme's repeated reads of the full data. This has been applied to building hourly tables for some core businesses, improving the timeliness guarantee from 40 minutes to 10 minutes.
- Faster queries: we focused on analysts' ad hoc query efficiency by migrating their commonly used warehouse tables to DeltaLake and accelerating queries with Zorder, reducing query time from tens of minutes in the past to ~3 minutes.
V. Future planning
As DeltaLake is adopted more widely at Zuoyebang, some problems remain to be solved:
- Improve repair efficiency. With Hive, a single historical partition can be repaired independently, whereas repairing a DeltaLake table requires rolling back across the table's versions.
- Fully support the Hive engine. At present, we use DeltaLake to solve the problems that Hive queries are slow and that Presto limits complex queries, giving us a solution that supports complex queries with low latency. However, features such as GSCD and Dataskipping mentioned above are not supported by Hive, so users cannot yet work with DeltaLake exactly as they do with Hive.
- Support Flink access. Our stream computing ecosystem is built mainly around Flink; introducing DeltaLake together with Spark increases the maintenance cost of the stream computing ecosystem.
VI. Acknowledgments
Finally, we would like to thank the Alibaba Cloud EMR Data Lake team: their deep expertise in DeltaLake and their efficient support throughout the cooperation helped us solve many key problems during the data lake migration.