Brief introduction: Flink + Hudi data lake technology scheme selected by station B, and optimization made for it.
The author of this paper, Yu Zhaojing, introduces why station B chooses Flink + Hudi data lake technology scheme and the optimization for it. The main contents are:
Traditional off-line counting warehouse pain points
Technical plan of data lake
Hudi mission stability assurance
Data into the lake practice
Incremental data lake platform revenue
Contribution to the community
Future development and thinking
First, the traditional off-line warehouse pain point
1. The pain points
The warehousing process of storehouse number of station B is roughly as follows:
The following core pain points arise from this architecture:
- After large-scale data is landed in the HDFS, it can be queried and processed only after partition archiving at dawn.
- RDS data synchronization with a large amount of data can only be processed after partition archiving in the early morning, and the data of the previous day can only be generated by sorting, de-duplication and join.
- Data can be read only by partition granularity, and a large number of redundant I/OS may occur in split traffic scenarios.
To sum up:
- Scheduling start late;
- Slow merging speed;
- Repeat read more.
2. Think about pain points
- Scheduling start late Since Flink ODS is quasi-real-time writing, there is a clear concept of file delta, you can use the same step based on file delta, cleaning, dimension, shunt and other logic by the way of delta processing, so that you can process data when ODS partition is not archived. Theoretically, data latency depends only on the processing time of the last batch of files.
- Since read has been able to do increment, then merge can also do increment, you can complete the combined increment through the capacity of the data lake.
- Repeat read multiple approach: The main reason for the repeat read is that the granularity of the partition is too coarse, only accurate to the hour/day level. We need to try some more fine-grained Data organization schemes. Data Skipping can be set to the field level so that efficient Data query can be made.
3. Solution: Magneto – Hudi based incremental data lake platform
Here is the warehousing process built based on Magneto:
- Flow uses Flow Flow to unify offline and real-time ETL Pipline
- Organizer data reorganization speeds up queries that support compaction of incremental data
- The Engine computing layer uses Flink and the storage layer uses Hudi
- Metadata refining Table computing SQL logic standardized Table Format computing paradigm
2. Technical plan of data Lake
1. The choice between Iceberg and Hudi
1.1 Comparison of technical details
1.2 Comparison of community activity
The statistical deadline is 2021-08-09
1.3 summarize
It can be roughly divided into the following major latitudes for comparison:
- Iceberg’s main support scheme from the start has a lot of optimizations for this scenario. Hudi supports the Appned mode in version 0.9, and is currently close to Iceberg in most scenarios. The current version 0.10 is still continuously optimized, and the performance is very close to Iceberg.
- Support for Upsert The primary support scheme for Hudi from its inception has a clear advantage over Iceberg in terms of performance and file count, and the process and logic of Compaction are all highly abstract interfaces. Iceberg’s support for Upsert starts late, and community programs still lag behind Hudi in performance, small files and other aspects.
- Compared with Iceberg community, Hudi community is obviously more active. Thanks to the community activity, Hudi has a certain gap with Iceberg in the richness of functions.
Overall, we chose Hudi as our data lake component and continued to optimize the functionality we needed on it (better Flink integration, Clustering support, etc.)
2. Select Flink + Hudi as the write mode
We chose Flink + Hudi to integrate Hudi for three main reasons:
- We partially maintain the Flink engine ourselves, which supports real-time computing across the company, and it’s not cost wise to maintain two computing engines at the same time, especially since we also made a lot of internal changes to our own Version of Spark.
- Two Index solutions are available for Spark + Hudi integration, but both have disadvantages: Bloom Index: When using Bloom Index, Spark lists all files for each task and reads the Bloom filter data written in the footer, which causes terrible pressure on HDFS. Hbase Index: This method can achieve O(1) Index finding, but external dependencies need to be introduced, which makes the whole scheme more heavy.
- We need to interface with the framework of Flink delta processing.
3. Optimization of Flink + Hudi integration
3.1 Hudi version 0.8 integrates the Flink solution
In view of the problems exposed by Hudi 0.8 integration, B station and the community cooperated to optimize and improve.
3.2 Bootstrap State Cold boot
** Supports starting Flink task writes in existing Hudi tables, thus making it possible to switch from Spark on Hudi to Flink on Hudi
The original plan:
** Problem: ** Each Task processes the full amount of data and then selects the HoodieKey belonging to the current Task to store in the state optimization.
- When each Bootstrap Operator is initialized, BaseFile and logFile related to the fileId of the current Task are loaded.
- Assemble the recordkeys from BaseFile and logFile into hoodieKeys and send them to BucketAssignFunction as Key By. HoodieKey is then stored as an index in the state of BucketAssignFunction.
** Effect: ** Extensibility of index loading is achieved by pulling the Bootstrap function out of a single Operator, increasing loading speed by N times (depending on concurrency).
3.3 Checkpoint Consistency Optimization
** Background: ** There are data consistency issues in extreme cases in Hudi version 0.8 of StreamWriteFunction.
The original plan:
** Problem: **CheckpointComplete is not in the CK life cycle, there is a CK success but instant did not commit situation, resulting in data loss.
Optimization scheme:
3.4 Append mode support and optimization
** Background: **Append mode is used to support data sets that do not require updates and can greatly improve write efficiency by eliminating unnecessary processing such as indexes and merges in the process.
Major modifications:
- FlushBucket can write a new file every time to avoid the occurrence of read and write magnification.
- Add a parameter to disable the speed limiting mechanism inside BoundedInMemeoryQueue. In Flink Append mode, set the Queue size to the same size as the Bucket buffer.
- Create a custom Compaction plan for each small file generated by the CK.
- After the above development and optimization, the performance in the pure Insert scenario can reach 5 times of the original COW.
Iii. Hudi mission stability guarantee
1. Hudi integrates Flink Metrics
By reporting metrics at key nodes, you can clearly understand the operation of the entire task:
2. Verify data in the system
3. Verify data outside the system
Iv. Practice of data entering into lake
1. CDC data into the lake
1.1 TiDB into the lake
Because all kinds of open source schemes can not directly support TiDB data export, the direct use of Select will affect the stability of the data library, so the full + increment mode is broken down:
- Start ti-CDC and write TIDB CDC data into the corresponding Kafka topic;
- Modify part of the source code by using the Dumpling component provided by TiDB to directly write HDFS.
- Start Flink to write full data to Hudi by Bulk Insert.
- CDC data for consumption increments is written to Hudi by way of Flink MOR.
1.2 MySQL Into the lake Scheme
MySQL’s solution to the lake is to directly use the open source Flink-CDC to write full and incremental data to Kafka topic via a Flink task:
- Start the Flink-CDC task and import the full data and CDC data into Kafka Topic.
- The Flink Batch task was started to read full data and write Hudi data using Bulk Insert.
- Switch to the Flink Streaming task to write incremental CDC data to Hudi via MOR.
2. Log data is incremented to the lake
- Implement HDFSStreamingSource and ReaderOperator, incremental synchronization of ODS data files, and by writing ODS partition index information, reduce the list requests to HDFS;
- Supports transform SQL configuration, allowing users to customize logic transformation, including but not limited to dimension table JOIN, user-defined UDF, and streaming by field.
- Implement Flink on Hudi Append mode, greatly improve the data write rate without merging.
V. Incremental revenue of data lake platform
- The time efficiency of data synchronization is greatly improved by Flink incremental synchronization, and the partition ready time is advanced from 2:00~5:00 to 00:30 minutes.
- The storage engine uses Hudi to provide multiple query methods based on COW and MOR, so that users can select appropriate query methods based on their own application scenarios instead of waiting for partitioning archiving.
- An automatic HUDi-based Compaction allows users to query Hive as if it were a snapshot of MySQL instead of a T+1 Binlog merge.
- This saves resources. You only need to execute the stream tasks that need to be queried repeatedly once, saving about 18000 cores.
6. Community contribution
The above optimization has been incorporated into Hudi community, and STATION B will further strengthen Hudi construction and grow together with the community in the future.
Partial core PR
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Issues.apache.org/jira/projec…
Vii. Future development and thinking
- The platform supports streaming and batch integration, unified real-time and offline logic;
- Promote the quantification of data warehouse increase, and achieve the whole process of Hudi ODS -> Flink -> Hudi DW -> Flink -> Hudi ADS;
- The Clustering of Hudi is supported on Flink, which reflects Hudi’s advantages in data organization and explores the performance of z-Order and other accelerated multidimensional queries.
- Inline clustering is supported.
The original link
This article is the original content of Aliyun and shall not be reproduced without permission.