This article is shared by Zhang Jun, a big data development engineer at Tongcheng-Elong, and mainly introduces Tongcheng-Elong's production practice of integrating Iceberg with Flink. The contents include:

  1. Background and pain points
  2. Landing of Flink + Iceberg
  3. Iceberg Optimization Practice
  4. Follow-up work
  5. Benefits and Summary

I. Background and pain points

Business background

Tongcheng-Elong is an online travel service platform that provides air tickets, accommodation, transportation, and other services. The department I belong to is the company's R&D department, whose main responsibility is to provide basic services for the company's other business units. Our big data system mainly handles statistics and analysis of the department's big-data-related data. Data sources include gateway logs, server monitoring data, Kubernetes container logs, app logs, MySQL binlog, and so on. Our main big data work is to build real-time reports based on these logs, provide Presto-based report display and real-time query services, and develop real-time and batch Flink tasks that give business parties accurate and timely data support.

Original architecture scheme

Since all of our raw data is stored in Kafka, the original architecture was that Flink tasks consumed Kafka data, processed it with Flink SQL or Flink JAR jobs, and wrote it to Hive in real time. Most tasks were Flink SQL tasks, because SQL development is much simpler than code and easier to maintain and understand, so we wrote tasks in SQL wherever possible. The platform for submitting Flink tasks was Zeppelin: submitting Flink SQL tasks is a built-in Zeppelin function, while JAR package tasks were submitted through a Zeppelin plug-in I developed based on Application mode. For data landed in Hive, Metabase (backed by Presto), an open source report system, provided real-time report display, periodic email reports, and custom SQL query services. Since the business has high requirements on data freshness and wants data displayed as soon as possible, the checkpoint interval of many Flink streaming tasks was set to 1 minute, and the data format was ORC.

Pain points

Because ORC is a columnar storage format, it cannot be appended to like a row-based format, which inevitably produces a very common and difficult problem in the big data field: the HDFS small file problem.

Our initial solution to the small file problem was to write a compaction tool that merged files on a schedule. Our Hive partitions are generally at the day level, so the tool ran as a scheduled task every morning to compact the previous day's data: it first wrote yesterday's data to a temporary folder, and after compaction, once the record count matched the original data, it overwrote the original data with the compacted data. However, because transactions could not be guaranteed, many problems occurred:

  • During compaction, late-arriving data was written into yesterday's Hive partition, so the record-count check failed and the small file merge failed.
  • The operation of replacing old data was not protected by a transaction. If new data was written to the old partition during the replacement, it was overwritten, causing data loss.
  • Without transaction support, data in the current partition could not be merged in real time; only the previous day's partition could be compacted. The latest partition still suffered from small files, so query performance on the latest data could not be improved.

II. Landing of Flink + Iceberg

Iceberg Technology Research

Therefore, given the problems above (HDFS small files, slow queries, and so on) and our current situation, I investigated the data lake technologies on the market: Delta Lake, Apache Iceberg, and Apache Hudi. Considering the functions each framework currently supports and its community roadmap, we finally chose Iceberg for the following reasons:

■ Iceberg deeply integrates Flink

As mentioned above, most of our tasks are Flink tasks, including batch and stream processing tasks. At present, Iceberg has the most complete Flink integration of the three data lake frameworks. If Iceberg replaces Hive, the migration cost is very small and users are almost unaware of it. For example, our original SQL looked like this:

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table

After migrating to Iceberg, you only need to modify the catalog.

INSERT INTO iceberg_catalog.db.iceberg_table SELECT * FROM kafka_table

Presto queries are similar in that you just need to modify the catalog.
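For reference, the `iceberg_catalog` used above can be registered in Flink SQL roughly as follows. This is a hedged sketch based on Iceberg's Flink/Hive catalog options; the metastore URI and warehouse path are placeholders, not our actual configuration:

```sql
-- Hypothetical catalog registration; 'uri' and 'warehouse' are placeholders.
CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://metastore-host:9083',
  'warehouse' = 'hdfs://namenode:8020/warehouse/iceberg'
);
```

Once the catalog is registered, the existing INSERT statements work unchanged apart from the catalog name.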

■ Iceberg’s design architecture makes queries faster

In Iceberg's design, the manifest files store partition-related information and statistics (such as max/min values) of the data files, so a query over a large partitioned table can locate the desired data directly instead of listing the entire HDFS folder as Hive does. This reduces the time complexity of file planning from O(n) to O(1) and significantly speeds up some large queries. In a talk by Iceberg PMC Chair Ryan Blue, the execution time of a task with a filter hit dropped from 61.5 hours to 22 minutes.

■ Write CDC data to Iceberg using Flink SQL

Flink CDC provides a way to read MySQL binlog directly. Compared with the previous approach of using Canal to read the binlog and then consuming that data downstream before writing to the lake, one component and one hop in the link are removed, reducing maintenance cost and the probability of errors. In addition, it can seamlessly join the import of full data with incremental data, so using Flink SQL to import MySQL binlog data into Iceberg (MySQL -> Iceberg) will be a very meaningful thing.
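As a sketch of the intended end state, the pipeline would look roughly like this in Flink SQL. The connector options follow the flink-cdc-connectors project; all host names, credentials, columns, and table names are placeholders, and as noted below this did not yet fully work in the community version at the time:

```sql
-- Hypothetical CDC source table (mysql-cdc connector); all values are placeholders.
CREATE TABLE mysql_users (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'mysql-host',
  'port'          = '3306',
  'username'      = 'user',
  'password'      = '***',
  'database-name' = 'db',
  'table-name'    = 'users'
);

-- Stream the changelog (inserts, updates, deletes) into an Iceberg table.
INSERT INTO iceberg_catalog.db.users SELECT * FROM mysql_users;
```

The appeal is that the same statement handles both the initial full snapshot and the subsequent incremental binlog reading.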

In addition, regarding our original need to compact small files: although Iceberg cannot compact automatically yet, it provides a batch compaction task, which already meets our demand.

■ Migrating Hive tables to Iceberg

Migration Preparations

At present, all our data is stored in Hive tables. After verifying Iceberg, we decided to migrate the Hive data to Iceberg, so I wrote a tool that reuses the existing Hive data: it creates a new Iceberg table and generates the corresponding metadata for it. However, during testing we found that if Iceberg and Hive share the same data files, the compaction program keeps compacting the Iceberg table's small files, and since the old files are not deleted immediately after compaction, the Hive table sees duplicate data. Therefore we adopted a double-write strategy: the program that originally wrote to Hive kept running, and a new program was started to write to Iceberg, so that we could observe the Iceberg table for a period of time and compare it against the original Hive data to verify the program's correctness.

After a period of observation, with nearly a billion records per day and several terabytes of data after compaction, the Hive table and the Iceberg table did not differ by a single record. Once the final comparison confirmed the data was correct, we stopped writing to the Hive table and switched to the new Iceberg table.

The migration tool

I wrote a migration action for Hive tables based on a Flink batch job, which has been submitted to the community but not merged yet: github.com/apache/iceb… It leaves the original Hive data in place, creates a new Iceberg table, and then generates the corresponding metadata for the new table. If you need it, you can take a look first.

In addition, the Iceberg community has a tool to migrate existing data into an existing Iceberg table, similar to Hive's LOAD DATA INPATH … INTO TABLE, implemented as a Spark stored procedure. You can also check it out: github.com/apache/iceb…

III. Iceberg Optimization Practice

Compacting small files

At present, an extra batch task is used to compact small files. Iceberg provides a Spark version of this action, but I found some problems during functional testing, so I implemented a Flink version by referring to the Spark version, fixed some bugs, and optimized some functions.

Because our Iceberg metadata is stored in Hive (that is, we use HiveCatalog), the logic of the compaction program is to find all Iceberg tables in Hive and compact them in sequence. There are no filter conditions: all tables, whether partitioned or not, are fully compacted. This handles Flink tasks that use event time: if there is late data, it is written to a previous partition, and if we compacted only the current partition instead of the full table, data written to earlier days would never be compacted.

The reason we do not run compaction on a fixed schedule is that, for example, if a table is compacted every five minutes and a compaction task does not finish within five minutes (so no new snapshot is committed), the next scheduled run would compact the data from the previous, unfinished task all over again. So the per-table strategy ensures that at any moment only one compaction task per table is running.
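To illustrate the per-table strategy, here is a minimal, hypothetical sketch (not our actual scheduler code) of a guard that ensures at most one compaction task runs per table at any time:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: allow at most one running compaction per table.
public class CompactionGuard {
    private final ConcurrentMap<String, Boolean> running = new ConcurrentHashMap<>();

    // Returns true only if no compaction for this table is currently running.
    public boolean tryAcquire(String table) {
        return running.putIfAbsent(table, Boolean.TRUE) == null;
    }

    // Must be called when the compaction task finishes or fails.
    public void release(String table) {
        running.remove(table);
    }
}
```

A scheduler would call `tryAcquire` before submitting the compaction job for a table and `release` only after the job commits (or fails), so a slow compaction can never overlap with the next run for the same table.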

Example code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Actions.forTable(env, table)
    .rewriteDataFiles()
    // .maxParallelism(parallelism)
    // .filter(Expressions.equal("day", day))
    // .targetSizeInBytes(targetSizeInBytes)
    .execute();

At present, the system runs stably, and tens of thousands of compaction tasks have been completed.

Note: the newly released Iceberg 0.11 has a known bug: when a file before compaction is larger than the target size (targetSizeInBytes), data can be lost during compaction. I actually found this problem when I first tested small file compaction; my fix was to skip data files larger than the target size, but that PR was not merged into 0.11. Later, another community member found the same problem and submitted a PR (github.com/apache/iceb…) whose strategy is to split the large file down to the target file size. That fix has been merged into master and will be released in the next bug fix version, 0.11.1.

Query optimization

■ Batch scheduled tasks

Currently, Flink's SQL client is not as mature as Hive's for scheduled batch tasks, for example executing `hive -f` to run a SQL file. Moreover, different tasks require different resources, parallelism, and so on.

So I wrote a Flink program that reads the SQL in a specified file and submits it as a batch task, and the scheduler invokes this program; the resources and parallelism of each task are controlled on the command line:

/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql

■ Optimization

For the query side of batch tasks, I did some optimization work, such as limit push-down, filter push-down, and query parallelism inference, which can greatly improve query speed. These optimizations have been contributed back to the community and released in Iceberg 0.11.

Operations management

■ Cleaning orphan files

  1. Scheduled deletion task

When using Iceberg, it sometimes happens that I submit a Flink task and, for some reason, stop it before Iceberg commits a snapshot; exceptions can also cause programs to fail. Both leave orphan data files that are not tracked in Iceberg metadata and are unreachable and useless to Iceberg. So we need to clean up these files, much like JVM garbage collection.

Currently Iceberg provides a Spark version of an action to deal with these useless files. We use the same strategy as small file compaction: fetch all Iceberg tables in Hive and run a scheduled task every hour to delete the orphan files.

SparkSession spark = ......;
Actions.forTable(spark, table)
    .removeOrphanFiles()
    // .deleteWith(...)
    .execute();
  2. Pitfalls

Normal data files were deleted while this program was running. On investigation: the snapshot retention was set to one hour, and the cleaning interval of this program was also set to one hour. The logs showed this program deleting normal data. Checking the code, the cause appears to be that, because the two intervals were the same, another program was expiring snapshots at the very moment this one was cleaning orphan files, causing normal data files to be deleted. Finally, we changed the cleaning interval of this program to the default of three days, and no data files were deleted incorrectly again. Of course, to be safe, we can also override the delete method to move files into a backup folder instead, verify there is no problem, and then delete them manually.
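As a small illustration of that backup idea (the class and method names here are hypothetical, not the Iceberg API), the delete step can be replaced by a move into a backup folder:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: instead of deleting a suspected orphan file,
// move it into a backup folder so it can be inspected before manual removal.
public class SafeOrphanCleaner {
    public static Path moveToBackup(Path file, Path backupDir) {
        try {
            Files.createDirectories(backupDir);
            Path target = backupDir.resolve(file.getFileName());
            return Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A callback like this could be plugged into the orphan-file action through its `deleteWith(...)` hook shown above, so nothing is physically deleted until a human has checked the backup folder.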

■ Snapshot expiration

Our snapshot expiration policy runs together with the small file compaction batch task: after compacting a table's small files, we expire its snapshots. The current retention time is one hour, because some large tables have many partitions and a short checkpoint interval, and if snapshots were kept for a long time, too many small files would still be retained. Since we currently have no need to query historical snapshots, the snapshot retention time is set to one hour.

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
table.expireSnapshots()
    // .retainLast(20)
    .expireOlderThan(olderThanTimestamp)
    .commit();

■ Data Management

After data is written, when you want to check how many data files are in a snapshot, or which data files are useful and which are not, there is no direct way to tell, so corresponding management tools are required. Flink's support here is still somewhat immature, but we can use Spark 3 for this.

  1. DDL

Tables are created with the Flink SQL client; other DDL operations can be done with Spark: iceberg.apache.org/spark/#ddl-…

  2. DML

Some data-related operations, such as deleting data, can currently be done through Presto, but Presto only supports partition-level deletion.

  3. show partitions & show create table

When operating Hive, there are some common statements, such as show partitions and show create table, which Flink does not yet support, so operating Iceberg is inconvenient. We made some modifications based on Flink 1.12, but have not yet fully submitted them; they will be contributed to the Flink and Iceberg communities later.

IV. Follow-up Work

  • Writing CDC data to Iceberg with Flink SQL

In our internal version, I have already tested writing CDC data (such as MySQL binlog) to Iceberg using Flink SQL. There is still some work to do to land this function in the community version, and I have submitted some related PRs to advance it.

  • Use SQL to delete and update

For copy-on-write tables, Spark SQL can be used for row-level deletes and updates. For the supported syntax, see the test classes in the source code:

org.apache.iceberg.spark.extensions.TestDelete and org.apache.iceberg.spark.extensions.TestUpdate. These functions work in my test environment, but have not been rolled out to production yet.

  • Streaming Read using Flink SQL

There are scenarios in our work where, because the data volume is large, upstream data can only be retained for a short period of time; if a program is unfortunately written incorrectly, it becomes impossible to re-consume from an earlier point. Iceberg's streaming read can solve these problems, because Iceberg stores all the data, provided the latency requirements are not particularly strict (for example, second-level), since the transaction commits of the Flink Iceberg writer are tied to the Flink checkpoint interval.
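As a hedged sketch of what streaming read looks like, Iceberg's Flink integration exposes it through SQL hints; the table name is a placeholder and the interval is just an example:

```sql
-- Hypothetical example: continuously read newly committed snapshots
-- from an Iceberg table instead of doing a one-shot batch scan.
SELECT * FROM iceberg_catalog.db.iceberg_table
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
```

Because the source replays from committed snapshots, a downstream job can be restarted and re-read history that has long since expired from the message queue.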

V. Benefits and Summary

After about a quarter of research, testing, optimization, and bug fixing on Iceberg, we have migrated all existing Hive tables to Iceberg, which has solved all of the original pain points. The system currently runs stably, and compared with Hive we have gained many benefits:

  • Flink write resources reduced

For example, a Flink task that reads Kafka and writes to Hive previously needed a parallelism of 60 to keep Kafka from backlogging; after switching to writing Iceberg, a parallelism of 20 is enough.

  • The query speed becomes faster

As mentioned earlier, Iceberg queries do not list the whole folder to obtain partition data as Hive does, but first obtain the relevant data files from the manifest files. Query performance is significantly improved, and the query speed of some large reports increased from 50 seconds to 30 seconds.

  • Concurrent read and write

Thanks to Iceberg's transaction support, we can read and write a table concurrently: Flink streams data into the lake in real time while the compaction program compacts small files and the cleanup programs remove expired files and snapshots. In this way, data is available with minute-level latency, queries on the latest partitions are much faster, and data accuracy is guaranteed by Iceberg's ACID properties.

  • Time travel

You can retrieve data from a previous point in time retrospectively.

To sum up, we can use Flink SQL to read and write Iceberg in both batch and streaming modes, compact small files in real time, and use Spark SQL for delete, update, and DDL operations; later we will use Flink SQL to write CDC data to Iceberg as well. I have contributed all of the above Iceberg optimizations and bug fixes to the community. Since the author's level is limited, mistakes are inevitable; corrections are welcome.

About the author: Zhang Jun, big data development engineer at Tongcheng-Elong.