Abstract: This article is based on a talk shared by Alibaba technical expert Chen Yuzhao (Yu Zhao) and Alibaba development engineer Liu Dalong (Feng Li) at Flink Forward Asia 2021. The main contents include:
- Apache Hudi 101
- Flink Hudi Integration
- Flink Hudi Use Case
- Apache Hudi Roadmap
Apache Hudi 101
When it comes to data lakes, we all have the same questions: What is a data lake? Why have data lakes become so popular in the past two years? In fact, the data lake is not a new concept; it was first proposed in the 1980s. At that time, a data lake was defined as the raw data layer, capable of storing structured, semi-structured, and even unstructured data. In many scenarios, such as machine learning and real-time analysis, the data schema is determined only at query time.
With low cost and high flexibility, lake storage is well suited as centralized storage for query scenarios. With the rise of cloud services in recent years, and especially the maturity of object storage, more and more enterprises choose to build their storage services on the cloud. The storage-compute separation architecture of the data lake fits the current cloud service architecture very well. It provides basic ACID transactions through snapshot isolation and can connect to multiple analysis engines to adapt to different query scenarios. It can be said that lake storage has great advantages in cost and openness.
Lake storage has begun to take over data warehouse functions, and by connecting to compute engines it realizes the lakehouse architecture. Lake storage is a table format: it encapsulates the high-level semantics of a table on top of the raw data format. Hudi began putting the data lake into practice in 2016 to solve the data update problem on file systems in big data scenarios. Hudi's LSM-style table format is unique among the current lake formats; it is friendly to near-real-time updates and has relatively complete semantics.
The table format is the basic attribute of the three popular data lake formats, while Hudi has been evolving toward a platform since the beginning of the project, with relatively complete data governance and table services. For example, users can optimize the file layout concurrently while writing, and the metadata table greatly improves file-listing efficiency on the query side.
The following are some basic Hudi concepts.
The Timeline Service is the core abstraction of Hudi's transaction layer, and all data operations in Hudi are carried out around it. Each operation is bound to a specific timestamp through the instant abstraction. A series of instants forms the timeline, and each instant records the corresponding action and state. Through the Timeline Service, Hudi knows the state of the current table operations; combined with a set of file-system view abstractions, Hudi can expose the file layout view under a specific timestamp to the current readers and writers of the table.
The file group is Hudi's core abstraction in the file layout layer. Each file group is equivalent to a bucket, bounded by file size. Each write to Hudi generates a new version, and a version is abstracted as a file slice; a file slice internally maintains the data files of that version. When a file group has been written up to the specified file size, a new file group is created.
Writes to a file slice in Hudi can be abstracted into two semantics: copy on write and merge on read.
Copy on write writes full data each time: the new data is merged with the data of the previous file slice and written as a new file slice, generating a new bucket file.
Merge on read is more complex. Its semantics are append-based: only incremental data is written each time, so no new file slice is generated immediately. The writer first tries to append to the previous file slice, and only after the written file slice is included in a compaction plan will a new file slice be cut.
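As a hedged illustration, the table type can be chosen at DDL time in Flink SQL. The table name, schema, and path below are made up for the example; the option names follow the Hudi Flink documentation.

```sql
-- Minimal sketch: choosing copy-on-write vs. merge-on-read at table creation time.
-- Table name, columns, and path are illustrative placeholders.
CREATE TABLE hudi_users (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://my-bucket/warehouse/hudi_users',  -- placeholder path
  'table.type' = 'MERGE_ON_READ'                    -- or 'COPY_ON_WRITE' (the default)
);
```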
Flink Hudi Integration
Flink Hudi's write pipeline consists of several operators. The first operator is responsible for converting RowData from the table layer into HudiRecord, Hudi's message format. A bucket assigner then assigns each HudiRecord to a specific file group, and the records assigned to a file group are sent to the writer operator, which writes the actual files. Finally, a coordinator is responsible for scheduling the table services of the Hudi table layer and for initiating and committing new transactions. In addition, some background cleaning roles clean up older versions of the data.
In the current design, each bucket assign task holds a bucket assigner, which maintains its own set of file groups. When writing new data or non-update INSERT data, the bucket assign task scans the file view and preferentially writes the new batch of data to the file groups judged to be small buckets.
For example, task 1 writes to file group 1 and file group 2 first, but not to file group 3, because file group 3 already holds 100 MB of data; avoiding buckets that are close to the target threshold prevents excessive write amplification. Task 2 simply writes to a new file group rather than appending to an existing large file group.
Next, the state switching mechanism of the Flink Hudi write process is introduced. When a job starts, the coordinator first tries to create the table on the file system; if the table does not yet exist, it writes the meta information to the file directory to create it. After receiving the bootstrap meta information from all tasks, the coordinator starts a new transaction. Once a write task observes that a transaction has been initiated, it unlocks the flushing of the current data.
A write task first accumulates data in a buffer. There are two flush strategies: the first flushes the current buffer when it reaches a specified size; the second flushes all in-memory data to disk when the upstream checkpoint barrier arrives and a snapshot needs to be taken. After each flush, the meta information is sent to the coordinator. When the coordinator receives the checkpoint success event, it commits the transaction and initiates the next one; when the write task observes the new transaction, it unlocks flushing for the next round. In this way, the whole write process is chained together.
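For reference, the size-based flush described above maps to documented Hudi Flink options such as 'write.batch.size' (per-bucket buffer size, in MB, before flushing) and 'write.task.max.size' (memory budget of a write task, in MB). They can be set in the table's WITH clause or, as sketched below, via a dynamic table options hint; the table names and values are illustrative.

```sql
-- Illustrative: tune the size-based flush thresholds on a Hudi sink table.
INSERT INTO hudi_users /*+ OPTIONS('write.batch.size' = '128', 'write.task.max.size' = '512') */
SELECT uuid, name, ts FROM source_table;
```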
Flink Hudi write supports very rich write scenarios. It currently supports writing log data types, that is, non-updating data types, along with small file merging. Hudi's core write scenarios, such as update streams and CDC data, are also supported. Flink Hudi also supports efficient batch import of historical data: the bulk insert mode can efficiently import offline data, such as Hive data or database data, into the Hudi format through batch queries. In addition, Flink Hudi provides full and incremental index bootstrap, allowing users to efficiently import a batch of data into the lake format at one time and then connect a streaming writer to achieve full plus incremental data import.
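A hedged sketch of this two-step "full + incremental" import, using the documented 'write.operation' and 'index.bootstrap.enabled' options; the source table names are placeholders.

```sql
-- Step 1 (batch job): bulk-load existing offline data into the Hudi table.
INSERT INTO hudi_users /*+ OPTIONS('write.operation' = 'bulk_insert') */
SELECT uuid, name, ts FROM hive_history_table;   -- placeholder offline source

-- Step 2 (streaming job): bootstrap the index once, then keep upserting the change stream.
INSERT INTO hudi_users /*+ OPTIONS('write.operation' = 'upsert', 'index.bootstrap.enabled' = 'true') */
SELECT uuid, name, ts FROM cdc_stream_table;     -- placeholder streaming source
```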
The Flink Hudi read side also supports a very rich set of query views; it currently mainly supports full reads, incremental reads over a historical range, and streaming reads.
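As a hedged sketch of these read views, the documented options 'read.start-commit', 'read.end-commit', and 'read.streaming.enabled' can be set per query through dynamic table options; the commit timestamps below are placeholders in Hudi's commit-time format.

```sql
-- Snapshot (full) read of the latest version.
SELECT * FROM hudi_users;

-- Incremental read over a historical commit range (placeholder commit times).
SELECT * FROM hudi_users
  /*+ OPTIONS('read.start-commit' = '20220101000000', 'read.end-commit' = '20220102000000') */;

-- Streaming read that continuously consumes new commits.
SELECT * FROM hudi_users
  /*+ OPTIONS('read.streaming.enabled' = 'true', 'read.start-commit' = '20220101000000') */;
```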
Here is an example of writing to Hudi with Flink SQL. Hudi supports a wide range of use cases and tries to minimize the parameters users need to configure. By simply configuring the table path, the write concurrency, and the operation type, users can easily write upstream data into the Hudi format.
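A minimal sketch of such a job, assuming a datagen-style upstream source; the path and parallelism are placeholders, and the option names ('path', 'write.tasks', 'write.operation') follow the Hudi Flink documentation.

```sql
-- Placeholder upstream source.
CREATE TABLE source_users (
  uuid STRING,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);

-- Hudi sink: only the path, write concurrency, and operation type are configured.
CREATE TABLE hudi_sink (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://my-bucket/warehouse/hudi_sink',  -- placeholder path
  'write.tasks' = '4',                             -- write concurrency
  'write.operation' = 'upsert'                     -- or 'insert' / 'bulk_insert'
);

INSERT INTO hudi_sink SELECT uuid, name, ts FROM source_users;
```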
Flink Hudi Use Case
Here are the classic application scenarios of Flink Hudi.
The first classic scenario is importing a database (DB) into the data lake. There are currently two ways to do this: full and incremental data can be imported into the Hudi format in one step through the CDC connector, or the CDC changelog can be consumed from Kafka and imported into the Hudi format through Flink's CDC formats.
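A hedged sketch of the first path, using the flink-cdc mysql-cdc connector as the source; the host, credentials, and table names are placeholders. The Kafka path would instead declare a source table with a CDC format such as 'debezium-json'.

```sql
-- Placeholder MySQL CDC source: reads a full snapshot first, then the incremental binlog.
CREATE TABLE mysql_orders (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_id BIGINT,
  amount DECIMAL(10, 2),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'secret',
  'database-name' = 'shop',
  'table-name' = 'orders'
);

-- Continuously upsert the change stream into a Hudi table
-- (Hudi sink DDL omitted; see the write example above).
INSERT INTO hudi_orders SELECT order_id, user_id, amount, ts FROM mysql_orders;
```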
The second classic scenario is ETL in stream computing (near-real-time OLAP analysis). Simple ETL, such as a dual-stream join or a dual-stream join followed by an aggregation, is computed over the upstream streams, and the resulting change stream is written directly into the Hudi format. The downstream read side can then connect to traditional OLAP engines such as Presto and Spark for end-to-end near-real-time queries.
The third classic scenario is somewhat similar to the second: Hudi supports native changelogs, that is, it saves the row-level changes produced by Flink computations. Based on this capability, end-to-end near-real-time ETL production can be achieved by consuming those changes with streaming reads.
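As a hedged example of these two scenarios, the documented 'changelog.enabled' option keeps the row-level changes produced by the join or aggregation so that a downstream streaming read sees the full change stream; the table, path, and query are illustrative, and the source reuses the mysql_orders table from the previous sketch.

```sql
-- Intermediate Hudi table that retains row-level changelog messages.
CREATE TABLE hudi_order_stats (
  user_id BIGINT PRIMARY KEY NOT ENFORCED,
  order_cnt BIGINT,
  total_amount DECIMAL(10, 2),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://my-bucket/warehouse/hudi_order_stats',  -- placeholder path
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true'   -- keep intermediate changes for downstream streaming reads
);

-- Near-real-time ETL: write an aggregated change stream into the table.
INSERT INTO hudi_order_stats
SELECT user_id, COUNT(*) AS order_cnt, SUM(amount) AS total_amount, MAX(ts) AS ts
FROM mysql_orders
GROUP BY user_id;
```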
In the next two major versions, the community will focus on streaming reads and streaming writes and will strengthen the semantics of streaming reads. In addition, catalogs and metadata will become self-managed. A native Trino connector will also be released in the near future, replacing the current approach of reading through Hive and improving efficiency.
Apache Hudi Roadmap
Here is a demo of ingesting a thousand MySQL tables into Hudi.
First, we prepared two databases: benchmark1, with 100 tables, and benchmark2, with 1,000 tables. Since thousand-table ingestion strongly depends on catalogs, we first create the catalogs: a MySQL catalog for the data source and a Hudi catalog for the target. The MySQL catalog is used to obtain information about all the source tables, including table schemas and table data, and the Hudi catalog is used to create the targets.
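The exact catalog DDL depends on the platform used in the demo, so the following is only a rough sketch; the catalog types, option names, and paths are assumptions rather than the statements actually executed.

```sql
-- Rough sketch only: catalog types and option names are assumed, not taken from the demo.
CREATE CATALOG mysql_catalog WITH (
  'type' = 'mysql',                 -- assumed catalog type
  'hostname' = 'localhost',         -- placeholder connection info
  'port' = '3306',
  'username' = 'flink',
  'password' = 'secret',
  'default-database' = 'benchmark1'
);

CREATE CATALOG hudi_catalog WITH (
  'type' = 'hudi',                               -- assumed catalog type
  'catalog.path' = 'oss://my-bucket/warehouse'   -- placeholder warehouse path
);
```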
After executing the two SQL statements, the two catalogs are created successfully.
Next, go to the job development page to create the thousand-table ingestion job. The first syntax is CREATE DATABASE AS DATABASE (CDAS), which synchronizes all table schemas and table data from the MySQL benchmark1 database to the Hudi cdas_demo database in one step; the mapping between tables is one-to-one. The second syntax is CREATE TABLE AS TABLE (CTAS), which synchronizes the MySQL tables matching a regular expression (the sbtest shard tables here) into the ctas_dema table under DB1 of Hudi; the mapping is many-to-one, merging the sharded databases and tables.
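A hedged sketch of these two statements, following the CDAS/CTAS syntax described above; the exact clauses may differ on the platform used in the demo, and the table pattern is a placeholder.

```sql
-- CDAS: one-to-one sync of every table in benchmark1 into the Hudi cdas_demo database.
CREATE DATABASE IF NOT EXISTS hudi_catalog.cdas_demo
AS DATABASE mysql_catalog.benchmark1 INCLUDING ALL TABLES;

-- CTAS: many-to-one sync of all sbtest shard tables in benchmark2 into a single Hudi table.
CREATE TABLE IF NOT EXISTS hudi_catalog.db1.ctas_dema
AS TABLE mysql_catalog.benchmark2.`sbtest.*`;
```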
Then we deploy the job and go to the job operations page to start it. We can see that the configuration information has been updated, indicating that the job has been brought online. Click the Start button to start the job, and then go to the job overview page to view the job's status information.
The diagram above shows the topology of the job, which is very complex, with 1,100 source tables and 101 target tables. The source tables are merged into a single source node, so the binlog only needs to be pulled once during the incremental phase, reducing the load on MySQL.
Then refresh the OSS page; you can see that a new cdas_demo path has appeared. Entering the sbtest1 path, you can see that metadata has already been written, indicating that data is being written.
Go back to the job development page and write a simple SQL query against one of the tables to verify that the data is actually being written. Executing the SQL statement above, we can see that the data can be queried and is consistent with the inserted data.
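The verification query might look like the following; the catalog, database, and table names follow the demo's naming, and the LIMIT is illustrative.

```sql
-- Illustrative verification query against one of the synchronized tables.
SELECT * FROM hudi_catalog.cdas_demo.sbtest1 LIMIT 10;
```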
Using the metadata capability provided by catalogs, combined with the CDAS and CTAS syntax, we can ingest thousands of tables into the lake with just a few lines of simple SQL, greatly simplifying the ingestion process and reducing the development and operations workload.