Abstract: This article, shared by Alibaba's Chen Yuzhao, introduces the Flink integration in the latest version of Hudi, together with a quick-start practice guide. The contents include:
- Background
- Environment preparation
- Batch mode read and write
- Streaming read
- Summary
1. Background
Apache Hudi is one of the most popular data lake solutions. Data Lake Analytics [1] integrates Hudi to serve efficient data MERGE (UPDATE/DELETE) scenarios; AWS pre-installs Apache Hudi in its EMR service [2], providing users with efficient record-level updates/deletes and efficient data query management; Uber [3] has been running Apache Hudi stably in production for more than four years, providing low-latency database synchronization and efficient queries [4]. Since its launch in August 2016, that data lake has exceeded 100 PB of storage [5].
Apache Flink, as the most popular stream computing framework, has natural advantages in streaming scenarios. The Flink community is actively embracing the Hudi community, bringing its streaming read/write strengths to Hudi while also supporting batch read and write.
Hudi and Flink completed a lot of integration work in version 0.8.0 [6]. Core features include:
- Implemented a new Flink streaming writer
- Supported readers for both batch and streaming modes
- Supported the Flink SQL API
The Flink streaming writer implements an efficient indexing scheme through Flink state, and Hudi's excellent UPDATE/DELETE design makes Flink + Hudi one of the most promising CDC ingestion solutions today. This will be covered in a future article.
This article uses the Flink SQL Client to demonstrate Hudi table operations through the Flink SQL API, including batch-mode read and write and streaming-mode read.
2. Environment preparation
The Flink SQL Client [7] is used as the demonstration tool in this article; the SQL CLI makes interactive SQL operations more convenient.
Step 1: Download the Flink Jar
Hudi integrates with Flink 1.11. You can refer to [8] to set up the Flink environment. The hudi-flink-bundle jar is an uber jar that bundles the Flink-related dependencies and is currently recommended to be compiled with Scala 2.11.
Step 2: Set up the Flink cluster
Start a standalone Flink cluster. Before starting, it is recommended to set Flink’s cluster configuration as follows:
- In $FLINK_HOME/conf/flink-conf.yaml, add the configuration item taskmanager.numberOfTaskSlots: 4
- In $FLINK_HOME/conf/workers, set the entry localhost to 4 lines; the number of lines corresponds to the number of locally started workers
Starting a cluster:
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the flink standalone cluster
./bin/start-cluster.sh
Step 3: Start the Flink SQL Client
The Hudi bundle jar should be loaded into the CLASSPATH when the SQL Client starts. You can compile the jar manually under hudi-source-dir/packaging/hudi-flink-bundle, or download it from the Apache official repository [9].
Start the SQL CLI:
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
Remarks:
- Hadoop 2.9.x+ is recommended, because some object stores (such as Aliyun OSS) are only supported from this release
- flink-parquet and flink-avro have already been packaged into the hudi-flink-bundle jar
- You can also copy the hudi-flink-bundle jar directly into the $FLINK_HOME/lib directory
- This article uses the Aliyun OSS object store; for convenience, you can also use a local path, as sketched below
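A minimal sketch of the local-path alternative: the DDL is identical except for the path option. The table name t2_local and the directory /tmp/hudi/t2 below are hypothetical, chosen only for illustration.

create table t2_local(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi/t2'  -- hypothetical local directory instead of an OSS bucket
);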
The working directory structure for the demo is as follows:
/Users/chenyuzhao/workspace/hudi-demo
  |- flink-1.11.3
  |- hadoop-2.9.2
3. Batch mode read and write
Insert data
Create a Hudi table with a DDL statement:
Flink SQL> create table t2(
> uuid varchar(20),
> name varchar(10),
> age int,
> ts timestamp(3),
> `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
> 'connector' = 'hudi',
> 'path' = 'oss://vvr-daily/hudi/t2'
> );
[INFO] Table has been created.
The DDL specifies the table path; the record key defaults to the uuid field, and the pre-combine key defaults to ts.
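If your table uses different fields, both keys can be set explicitly through table options. The sketch below assumes the option names hoodie.datasource.write.recordkey.field and write.precombine.field as documented for Hudi 0.8.x; verify them against Flink SQL Config Options [10]. The table name and path are hypothetical.

create table t2_keys(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'path' = 'oss://vvr-daily/hudi/t2_keys',              -- hypothetical table path
  'hoodie.datasource.write.recordkey.field' = 'uuid',   -- record key (assumed option name; uuid is also the default)
  'write.precombine.field' = 'ts'                       -- pre-combine key (assumed option name; ts is also the default)
);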
Then insert data into the table with a VALUES statement:
Flink SQL> insert into t2 values
> ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
> ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
> ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
> ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
> ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
> ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
> ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 59f2e528d14061f23c552a7ebf9a76bd
The Flink job has been successfully submitted to the cluster. You can open the local Web UI (by default at http://localhost:8081) to observe the job's execution.
Query data
After the job completes, query the table results with a SELECT statement:
Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.

Flink SQL> select * from t2;
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
| +   | id4  | Fabian  | 31  | 1970-01-01T00:00:04 | par2      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
| +   | id1  | Danny   | 23  | 1970-01-01T00:00:01 | par1      |
| +   | id2  | Stephen | 33  | 1970-01-01T00:00:02 | par1      |
| +   | id5  | Sophia  | 18  | 1970-01-01T00:00:05 | par3      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
+-----+------+---------+-----+---------------------+-----------+
Received a total of 8 rows
Setting execution.result-mode=tableau; lets query results be printed directly to the terminal.
Prune a partition by adding the partition value to the WHERE clause:
Flink SQL> select * from t2 where `partition` = 'par1';
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id1  | Danny   | 23  | 1970-01-01T00:00:01 | par1      |
| +   | id2  | Stephen | 33  | 1970-01-01T00:00:02 | par1      |
+-----+------+---------+-----+---------------------+-----------+
Received a total of 2 rows
Update data
Records with the same record key are automatically overwritten, so inserting data with an existing key updates it:
Flink SQL> insert into t2 values
> ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),
> ('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 944de5a1ecbb7eeb4d1e9e748174fe4c

Flink SQL> select * from t2;
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id1  | Danny   | 24  | 1970-01-01T00:00:01 | par1      |
| +   | id2  | Stephen | 34  | 1970-01-01T00:00:02 | par1      |
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
| +   | id4  | Fabian  | 31  | 1970-01-01T00:00:04 | par2      |
| +   | id5  | Sophia  | 18  | 1970-01-01T00:00:05 | par3      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
+-----+------+---------+-----+---------------------+-----------+
Received a total of 8 rows
You can see that the age values of the records with uuid id1 and id2 have been updated.
Insert new data again and observe the result:
Flink SQL> insert into t2 values
> ('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id5','Sophia',19,TIMESTAMP '1970-01-01 00:00:05','par3');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: fdeb7fd9f08808e66d77220f43075720

Flink SQL> select * from t2;
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id5  | Sophia  | 19  | 1970-01-01T00:00:05 | par3      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
| +   | id4  | Fabian  | 32  | 1970-01-01T00:00:04 | par2      |
| +   | id1  | Danny   | 24  | 1970-01-01T00:00:01 | par1      |
| +   | id2  | Stephen | 34  | 1970-01-01T00:00:02 | par1      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
+-----+------+---------+-----+---------------------+-----------+
Received a total of 8 rows
4. Streaming read
Create a new table and insert data with the following statements:
Flink SQL> create table t1(
> uuid varchar(20),
> name varchar(10),
> age int,
> ts timestamp(3),
> `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
> 'connector' = 'hudi',
> 'path' = 'oss://vvr-daily/hudi/t1',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.check-interval' = '4'
> );
[INFO] Table has been created.
Flink SQL> insert into t1 values
> ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
> ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
> ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
> ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
> ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
> ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
> ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 9e1dcd37fd0f8ca77534c30c7d87be2c
The table option read.streaming.enabled is set to true, indicating that the table can be read in streaming mode; the option read.streaming.check-interval specifies that the source checks for new commits every 4 seconds; the option table.type sets the table type to MERGE_ON_READ. Currently, only MERGE_ON_READ tables support streaming read.
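Streaming reads do not have to start from the latest commit. Below is a hedged sketch, assuming the option read.streaming.start-commit (an instant time in yyyyMMddHHmmss form) as listed in the Hudi Flink options; verify the name and format against [10]. The table name and the instant value are made up for illustration.

create table t1_from_commit(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'path' = 'oss://vvr-daily/hudi/t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'read.streaming.start-commit' = '20210322183600'  -- hypothetical instant: only consume commits from this point on
);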
The operations above take place in a terminal we will call terminal_1.
Start the SQL Client again from a new terminal (we call it terminal_2), recreate the t1 table, and query it:
Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.

Flink SQL> create table t1(
> uuid varchar(20),
> name varchar(10),
> age int,
> ts timestamp(3),
> `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
> 'connector' = 'hudi',
> 'path' = 'oss://vvr-daily/hudi/t1',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.check-interval' = '4'
> );
[INFO] Table has been created.

Flink SQL> select * from t1;
2021-03-22 18:36:37,042 INFO  org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id2  | Stephen | 33  | 1970-01-01T00:00:02 | par1      |
| +   | id1  | Danny   | 23  | 1970-01-01T00:00:01 | par1      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
| +   | id5  | Sophia  | 18  | 1970-01-01T00:00:05 | par3      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id4  | Fabian  | 31  | 1970-01-01T00:00:04 | par2      |
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
Back in terminal_1, continue with batch-mode INSERT operations:
Flink SQL> insert into t1 values
> ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 2dad24e067b38bc48c3a8f84e793e08b
After a few seconds, observe that the terminal_2 output has one extra row:
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id2  | Stephen | 33  | 1970-01-01T00:00:02 | par1      |
| +   | id1  | Danny   | 23  | 1970-01-01T00:00:01 | par1      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
| +   | id5  | Sophia  | 18  | 1970-01-01T00:00:05 | par3      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id4  | Fabian  | 31  | 1970-01-01T00:00:04 | par2      |
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
| +   | id1  | Danny   | 27  | 1970-01-01T00:00:01 | par1      |
INSERT again in terminal_1:
Flink SQL> insert into t1 values
> ('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id5','Sophia',19,TIMESTAMP '1970-01-01 00:00:05','par3');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: ecafffda3d294a13b0a945feb9acc8a5
Observe the change in the terminal_2 output:
+-----+------+---------+-----+---------------------+-----------+
| +/- | uuid | name    | age | ts                  | partition |
+-----+------+---------+-----+---------------------+-----------+
| +   | id2  | Stephen | 33  | 1970-01-01T00:00:02 | par1      |
| +   | id1  | Danny   | 23  | 1970-01-01T00:00:01 | par1      |
| +   | id6  | Emma    | 20  | 1970-01-01T00:00:06 | par3      |
| +   | id5  | Sophia  | 18  | 1970-01-01T00:00:05 | par3      |
| +   | id8  | Han     | 56  | 1970-01-01T00:00:08 | par4      |
| +   | id7  | Bob     | 44  | 1970-01-01T00:00:07 | par4      |
| +   | id4  | Fabian  | 31  | 1970-01-01T00:00:04 | par2      |
| +   | id3  | Julian  | 53  | 1970-01-01T00:00:03 | par2      |
| +   | id1  | Danny   | 27  | 1970-01-01T00:00:01 | par1      |
| +   | id5  | Sophia  | 19  | 1970-01-01T00:00:05 | par3      |
| +   | id4  | Fabian  | 32  | 1970-01-01T00:00:04 | par2      |
5. Summary
Through these simple demonstrations, we can see that the Hudi Flink integration is already fairly complete, covering both the read and write paths. For detailed configuration, refer to the Flink SQL Config Options [10].
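As a rough sketch of such tuning, assuming the options write.tasks (write parallelism) and compaction.async.enabled (asynchronous compaction for MERGE_ON_READ) exist under these names; confirm them against Flink SQL Config Options [10]. The table name and path are hypothetical.

create table t3(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'path' = 'oss://vvr-daily/hudi/t3',   -- hypothetical table path
  'table.type' = 'MERGE_ON_READ',
  'write.tasks' = '4',                  -- assumed option: parallelism of the write tasks
  'compaction.async.enabled' = 'true'   -- assumed option: run compaction asynchronously for MERGE_ON_READ
);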
The Hudi community is actively promoting deeper integration with Flink, including but not limited to:
- Flink streaming reader support for watermarks, to build pipelines for the intermediate computing layers of the data lake/warehouse
- Hudi-based materialized views, where Flink maintains minute-level incremental views to serve online near-real-time queries
Note:
[1] www.alibabacloud.com/help/zh/pro…
[2] aws.amazon.com/cn/emr/feat…
[3] www.uber.com/
[4] www.slideshare.net/vinothchand…
[5] eng.uber.com/uber-big-da…
[6] issues.apache.org/jira/browse…
[7] ci.apache.org/projects/fl…
[8] flink.apache.org/downloads.h…
[9] repo.maven.apache.org/maven2/org/…
[10] hudi.apache.org/docs/config…