In practical work, the author has investigated the advantages and disadvantages of Iceberg and its applications at major companies, which are summarized below. I hope it gives you some insight.
As big data storage and processing requirements become increasingly diverse, building unified data lake storage and running various forms of analysis on top of it has become an important direction for enterprises building a big data ecosystem. How to build data pipelines on data lake storage quickly, consistently and atomically has become an urgent problem to solve.
To that end, Uber open-sourced Apache Hudi, Databricks open-sourced Delta Lake, and Netflix open-sourced Apache Iceberg. Almost overnight, this kind of table-format middleware with ACID capability has become a hot direction in the big data and data lake field.
We have previously introduced the concept and application of data lakes:
Concepts and differences about data warehouse, data lake, data platform and data center
Enterprise data lake construction and analysis solutions
Why Iceberg?
As for the reasons for introducing Iceberg, it happens to solve several pain points met in the process of building the big data ecosystem:
- T+0 data landing and processing. A traditional data processing flow usually has a long link from data ingestion to data processing, with a lot of complicated logic to ensure data consistency; because of the architectural complexity, the whole pipeline has noticeable latency. Iceberg's ACID capability simplifies the design of the entire pipeline and reduces its latency.
- Reduced cost of data correction. Traditional Hive/Spark data correction requires reading the data out and writing it back after modification, which is expensive. Iceberg's update and delete capabilities can effectively reduce this overhead and improve efficiency.
The technical reasons for choosing Iceberg over the other two open source projects are as follows:
- The architecture and implementation of Iceberg are not bound to a specific engine. Instead, it realizes a common data organization format that can easily be connected to different engines such as Flink, Hive and Spark. This is very important for adoption inside Tencent, because connecting upstream and downstream data pipelines often involves different computing engines.
- Good architecture and open format. Compared with Hudi and Delta Lake, Iceberg's architecture is more elegant, with a complete definition of the data format and type system and an evolvable design.
- Optimization for object storage. Iceberg fully considers the characteristics of object storage in the way it organizes data and avoids time-consuming listing and rename operations, which gives it an advantage when building a data lake architecture on object storage.
Beyond technical considerations, the detailed evaluation of code quality, community, and other aspects is as follows:
- Overall code quality and future evolution. The team cares a great deal about the abstractions and strengths of the overall architecture and code, and about whether those strengths can carry the project into the future. A technology needs to be able to evolve architecturally without requiring a lot of incompatible refactoring.
- The potential of the community and the value Tencent can bring to it. Community activity is one consideration, but more important is what Tencent can do for the community and what value it can bring. If a community is relatively closed or already mature enough, Tencent cannot add as much value, which is also an important factor for the team when choosing a technology.
- Neutrality and openness of the technology. The community should drive the evolution of the technology with an open attitude rather than holding back contributions, and the parties in the community should be relatively neutral, with no single dominant party fully controlling the community's evolution.
Tencent’s optimization and improvement of Iceberg
Since development officially started, Tencent has made a number of optimizations and improvements to Iceberg on top of the open-source version, mainly including:
- Row-level delete and update operations, which greatly reduce the cost of data correction and deletion (a hedged SQL sketch of such operations follows this list).
- Adaptation to the Spark 3.0 DataSource V2 API, so that Spark 3.0 SQL and DataFrames can connect to Iceberg seamlessly.
- Support for Flink, so that Flink can be connected to the Iceberg format for data landing.
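For readers unfamiliar with row-level operations, the following is a minimal sketch of what they look like from the user's side, written in Spark SQL against a hypothetical Iceberg table (table and column names are placeholders; this illustrates the general capability, not Tencent's internal implementation, and assumes an Iceberg/engine version that supports row-level writes):
-- Delete rows that match a predicate.
DELETE FROM iceberg_catalog.db.sample_table WHERE id = 100;
-- Correct rows in place.
UPDATE iceberg_catalog.db.sample_table SET name = 'fixed' WHERE id = 101;
-- Apply a table of corrections in one transaction.
MERGE INTO iceberg_catalog.db.sample_table t
USING iceberg_catalog.db.corrections c
ON t.id = c.id
WHEN MATCHED THEN UPDATE SET t.name = c.name
WHEN NOT MATCHED THEN INSERT *;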
These improvements make Iceberg more usable in practice and more attractive for adoption within Tencent. At the same time, Tencent is actively embracing the community: most of the internal improvements have been pushed upstream, and some internal customizations will be contributed back to the community in a more general form.
At present, the team is actively trying to integrate Iceberg into Tencent's big data ecosystem. The main challenges lie in how to adapt Iceberg to Tencent's existing and self-developed systems, and how to find a foothold for it in an already mature big data system where it can bring obvious benefits:
- Iceberg's upstream and downstream supporting capabilities are still relatively lacking and require more work, such as adaptation to Spark, Hive, Flink and other engines.
- Secondly, the maturity of Iceberg's core capabilities needs to be verified: whether it can stand up to Tencent's big data scale, and whether its claimed capabilities are really available at enterprise level, still need further verification and hardening.
- Finally, after years of development, Tencent has formed a complete set of internal data access and analysis solutions. How Iceberg can be adopted internally and improve the existing solutions is very important.
Typical practices
The practice of integrating Flink with Iceberg at Tongcheng Yilong
Pain points
Because ORC is used as the columnar storage format, data cannot be appended the way it can with row-oriented formats, which inevitably leads to a very common and difficult problem in the big data field: the HDFS small file problem.
Landing Flink + Iceberg
Iceberg Technology Research
Facing problems such as HDFS small files and slow queries, and taking our current situation into account, I investigated the data lake technologies currently on the market: Delta Lake, Apache Iceberg and Apache Hudi. After weighing the features each framework supports today and its community roadmap, we finally chose Iceberg for the following reasons:
Iceberg integrates deeply with Flink
Most of our jobs are Flink jobs, including both batch and streaming jobs. Among the three data lake frameworks, Iceberg currently has the most complete Flink integration. If Iceberg is used to replace Hive, the migration cost is very small and users are almost unaware of it.
For example, our original SQL looks like this:
INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table
After migrating to Iceberg, you only need to modify the catalog.
INSERT INTO iceberg_catalog.db.iceberg_table SELECT * FROM kafka_table
Presto queries are similar in that you just need to modify the catalog.
Iceberg’s design architecture makes queries faster
In Iceberg's design, the manifest files store partition information and statistics (max/min) about the data files, so a query over a large partitioned table can locate the required data directly instead of listing entire HDFS directories the way Hive does; the time complexity drops from O(n) to O(1), which significantly speeds up some large queries. In a talk by Iceberg PMC chair Ryan Blue, we saw the execution time of a task whose filter hit these statistics drop from 61.5 hours to 22 minutes.
Write CDC data to Iceberg using Flink SQL. Flink CDC provides a way to read the MySQL binlog directly. Compared with the previous approach of using Canal to read the binlog, write it into Iceberg, and then consume the Iceberg data, this removes one component and shortens the link, saving maintenance cost and reducing the chance of errors. In addition, it can seamlessly connect the import of full data with incremental data, so using Flink SQL to import MySQL binlog data into Iceberg (MySQL -> Iceberg) is a very meaningful thing to do.
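As a hedged sketch of such a MySQL -> Iceberg job in Flink SQL (all connection values, table names and the iceberg_catalog are placeholders; this assumes the flink-cdc-connectors MySQL source and the Iceberg Flink runtime are on the classpath and an Iceberg table that can accept the change stream):
-- Hypothetical MySQL CDC source table.
CREATE TABLE mysql_orders (
  id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'user',
  'password' = 'pass',
  'database-name' = 'db',
  'table-name' = 'orders'
);
-- Write the full snapshot plus the incremental change stream into Iceberg.
INSERT INTO iceberg_catalog.db.orders SELECT * FROM mysql_orders;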
In addition, as for our original requirement of compacting small files, although Iceberg cannot compact them automatically yet, it provides a batch task for this, which already meets our needs.
Iceberg Optimization Practice
Small file compaction
Code example for reference:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.actions.Actions;

// Compact the table's small data files with Iceberg's Flink Actions API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Actions.forTable(env, table)
    .rewriteDataFiles()
    // Optional: .maxParallelism(parallelism)
    // Optional: .filter(Expressions.equal("day", day))
    // Optional: .targetSizeInBytes(targetSizeInBytes)
    .execute();
At present, the system runs stably and has completed compaction for tens of thousands of tasks.
Query optimization
Batch processing scheduled tasks
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
For the query side of batch tasks, I did some optimization work such as limit pushdown, filter pushdown and query parallelism inference, which can greatly improve query speed. These optimizations have been contributed back to the community and were released in Iceberg 0.11.
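As a trivial, hedged illustration (table and column names are placeholders), queries of the following shape benefit from these optimizations because the predicate and the limit are evaluated against Iceberg's file-level statistics at the source instead of after a full scan:
-- The filter on day and the LIMIT can be pushed down to the Iceberg source.
SELECT id, name
FROM iceberg_catalog.db.sample_table
WHERE day = '2021-01-01'
LIMIT 10;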
Operations management
- Cleaning orphan files (a maintenance sketch follows this list)
- Snapshot expiration
- Data management
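As a hedged sketch of such maintenance, assuming the Iceberg Spark SQL extensions are enabled with a catalog named iceberg (procedure availability depends on the Iceberg version; all names and values are placeholders):
-- Expire snapshots older than a given timestamp.
CALL iceberg.system.expire_snapshots(table => 'db.sample', older_than => TIMESTAMP '2021-01-01 00:00:00');
-- Remove files no longer referenced by any table metadata.
CALL iceberg.system.remove_orphan_files(table => 'db.sample');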
Construction practice of a Flink + Iceberg full-scenario real-time data warehouse
1. Near-real-time data access
Iceberg not only supports read/write separation, but also concurrent reads, incremental reads, small file merging, and latency ranging from seconds to minutes. Based on these advantages, we try to use Iceberg to build a Flink-based real-time data warehouse architecture.
Every COMMIT operation in Iceberg changes the visibility of data, for example turning data from invisible to visible, and this is what makes near-real-time data ingestion possible.
2. Real-time data warehouse - data lake analysis system
For example, a Spark offline scheduled task is used to run, pull and extract the data and then write it into a Hive table, and this process takes a long time. With the Iceberg table format, Flink or Spark Streaming can be used instead to achieve near-real-time data ingestion.
Since Iceberg works well as a table format and supports both a streaming reader and a streaming sink, is it possible to consider replacing Kafka with Iceberg?
Iceberg's underlying storage is cheap storage such as HDFS or S3, and Iceberg supports file formats such as Parquet, ORC and Avro. With columnar storage available, basic OLAP optimizations can be applied directly at the middle layer: the most basic OLAP optimization, predicate pushdown, together with the Streaming Reader based on Iceberg snapshots, can reduce the latency of offline tasks from day level to hour level and turn the pipeline into a near-real-time data lake analysis system.
In the middle processing layer, Presto can be used for some simple queries. Because Iceberg supports streaming reads, Flink can also be connected directly to this middle layer to run batch or streaming computation tasks, process the intermediate results further, and output them downstream.
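For reference, a hedged sketch of what a Flink streaming read from an Iceberg table looks like (the table name is a placeholder; the hint options are those of the Iceberg Flink connector and depend on the Iceberg version):
-- Continuously read newly committed snapshots from an Iceberg table in a Flink streaming job.
SELECT *
FROM iceberg_catalog.db.middle_layer_table
/*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;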
The advantages of replacing Kafka with Iceberg mainly include:
- Unifying stream and batch at the storage layer
- OLAP analysis support at the middle tier
- Perfect support for efficient backtracking
- Lower storage cost
At the bottom layer, Iceberg can work with a cache such as Alluxio, so the data lake can be accelerated by caching.
Best practices
- Real-time small file merging
- Flink real-time incremental reading
- Managing files with SQL extensions
Flink + Iceberg real-time data warehouse practice at Qunar
1. Small file processing
Before Iceberg 0.11, the batch API had to be triggered periodically to merge small files; this worked, but a set of Actions code had to be maintained and the merging was not real-time.
Table table = findTable(options, conf);
Actions.forTable(table)
    .rewriteDataFiles()
    .targetSizeInBytes(10 * 1024) // 10 KB
    .execute();
Iceberg 0.11 adds a new feature that supports streaming small file merging.
Data is hashed by the partition/bucket key at write time, so files are merged from the source side. The advantage is that each task processes the data of one partition and commits its own DataFile files; for example, a task only handles the data of its corresponding partition. This avoids the problem of many tasks committing many small files and requires no extra maintenance code. You only need to specify the table property write.distribution-mode when creating the table; this property is shared with other engines such as Spark.
CREATE TABLE city_table (
  province BIGINT,
  city STRING
) PARTITIONED BY (province, city)
WITH (
  'write.distribution-mode' = 'hash'
);
2. Sorting in Iceberg 0.11
2.1 Introduction to Sorting
Before Iceberg 0.11, Flink did not support Iceberg's sorting feature, so it could only be used with Spark in batch mode. Version 0.11 adds support for sorting, which means we can also benefit from it in real time.
The essence of sorting is to make scans faster: once all data is sorted from smallest to largest and aggregated by the sort key, the max-min statistics can filter out a large amount of irrelevant data.
2.2 Sorting demo
insert into Iceberg_table select days from Kafka_tbl order by days, province_id;
3. Iceberg manifest details
Parameter interpretation
- file_path: the location of the physical file.
- partition: the partition the file belongs to.
- lower_bounds: the minimum values of the sorted fields in this file; here, my minimum values of days and province_id.
- upper_bounds: the maximum values of the sorted fields in this file; here, my maximum values of days and province_id.
Whether a file_path needs to be read is determined from the partition and from the columns' upper and lower bounds. After the data is sorted, the files' column information is also recorded in the metadata; the query plan locates files from the manifest without storing this information in the Hive metastore, which reduces the pressure on Hive metadata and improves query efficiency.
Using the sorting feature of Iceberg 0.11, with day as the partition, the manifest files record the sort rule, which improves retrieval efficiency when querying data. It achieves the retrieval benefits of Hive partitioning while avoiding the pressure caused by excessive Hive metadata.
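For reference, these per-file statistics (including partition information and min/max bounds) can be inspected through Iceberg's metadata tables; a hedged example in Spark SQL (the table name is a placeholder, and metadata-table support varies by engine and version):
-- Inspect the per-file min/max statistics recorded in the manifests.
SELECT file_path, record_count, lower_bounds, upper_bounds
FROM iceberg_catalog.db.sample_table.files;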
Building an enterprise-level real-time data lake based on Flink + Iceberg
At present, Apache Iceberg 0.10.0 supports both Flink streaming and batch writes into the lake, and also supports Flink batch jobs querying the data in the Iceberg data lake.
As we know, the design principle of the Flink Iceberg sink is that Iceberg uses optimistic locking to commit transactions. That is, when two writers commit a change transaction to Iceberg at the same time, the later one keeps retrying: it waits until the first one commits successfully, then re-reads the metadata and commits its own transaction. With this in mind, it is not appropriate to have multiple concurrent operators commit transactions, which would lead to a large number of transaction conflicts and retries.
Therefore, the Flink write path is divided into two operators. One, IcebergStreamWriter, writes records into the corresponding Avro, Parquet or ORC files, generates the corresponding Iceberg DataFile, and sends it to the downstream operator. The other, IcebergFilesCommitter, collects all the DataFile files and commits the transaction to Apache Iceberg when a checkpoint arrives, so data is committed at checkpoint time.
After understanding the design of the Flink sink operators, the next important question is: how to design the state of these two operators correctly?
First of all, the design of IcebergStreamWriter is relatively simple: its main task is to convert records into DataFiles, and there is no complex state to design. IcebergFilesCommitter maintains a list of DataFile files per checkpointId (a map of the form Map<Long, List<DataFile>>), so that even if the commit for one checkpoint fails, its DataFile files are still kept in state and the data can still be committed to the Iceberg table by a subsequent checkpoint.
Integrating Iceberg 0.11 with Spark 3.0
1. Install and compile Iceberg 0.11
Here I downloaded Iceberg 0.11.1. Gradle needs to be installed in advance to compile Iceberg; Gradle 5.4 is used here.
wget https://downloads.gradle.org/distributions/gradle-5.4-bin.zip
unzip -d /opt/gradle gradle-5.4-bin.zip
vim /etc/profile
# GRADLE
export GRADLE_HOME=/opt/gradle/gradle-5.4
export PATH=$PATH:$GRADLE_HOME/bin
source /etc/profile
To compile Iceberg, download the source code from GitHub; the download step is skipped here and we compile directly:
cd iceberg-apache-iceberg-0.11.1
gradle build -x test
2. Using the compiled Iceberg with Spark SQL
2.1 After the compilation succeeds, go into the spark3-runtime build directory and take out the JAR package we need:
cd spark3-runtime/build/libs
ll
iceberg-spark3-runtime-0.11.1.jar is the plug-in package we need.
2.2 Placing the Plug-in package in the Spark directory
cd $SPARK_HOME/jars
cp iceberg-apache-iceberg-0.11.1/spark3-runtime/build/libs/iceberg-spark3-runtime-0.11.1.jar .
2.3 Modifying Spark Configurations
Mode 1: Use a Hadoop catalog for metadata
In spark-defaults.conf:
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=hadoop
mapreduce.output.fileoutputformat.outputdir=/tmp
spark.sql.catalog.iceberg.warehouse=hdfs://mycluster/iceberg/warehouse
Mode 2: Share metadata with the Hive metastore
In spark-defaults.conf:
spark.sql.catalog.hive_iceberg = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_iceberg.type = hive
spark.sql.catalog.hive_iceberg.uri = thrift://node182:9083
The following examples are based on Mode 2 (sharing metadata with the Hive metastore).
1. The Hadoop and Hive client configuration files (core-site.xml, mapred-site.xml, hive-site.xml, yarn-site.xml, etc.) are required in the conf directory. 2. Add the following statements to spark-defaults.conf:
spark.sql.catalog.iceberg = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type = hive
spark.sql.catalog.iceberg.uri = thrift://node182:9083
The spark.sql.catalog.iceberg.uri refers to the hive.metastore.uris configured in hive-site.xml. In spark.sql.catalog.iceberg, iceberg is the namespace, and we will now create the database under this namespace.
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://172.16.129.182:9083</value>
  <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
2.4 Start Spark
1) Start the ThriftServer service of Spark
sh start-thriftserver.sh --master yarn
2) Connect with Beeline
bin/beeline
!connect jdbc:hive2://node182:25001 spark spark
The port number for the connection is read from the hive-site.xml configuration:
<property>
<name>hive.server2.thrift.port</name>
<value>25001</value>
<description>Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'.</description>
</property>
2.5 Creating the Iceberg Source Table
1) Create database
create database iceberg.jzhou_test;
To see the current namespace, use the following command
show current namespace;
2) Create the iceberg source table
use iceberg.jzhou_test;
create table iceberg_spark(id int, name string) using iceberg;
You can change the underlying file format, which is Parquet by default; I want to change it to ORC, which can be done in two ways.
Method 1:
ALTER TABLE iceberg_spark SET TBLPROPERTIES('write.format.default' = 'orc');
Method 2:
create table iceberg_spark(id int, name string) using iceberg TBLPROPERTIES ('write.format.default' = 'orc');
3) Insert data and view the table's data and metadata on HDFS
The HDFS directory where the metadata resides can be obtained from the hive-site.xml configuration:
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
View the data and metadata on HDFS.
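For example, a minimal hedged sketch (the values are placeholders) that writes a row into the table created above and reads it back before checking the warehouse directory on HDFS:
-- Insert a sample row into the Iceberg table and read it back.
INSERT INTO iceberg_spark VALUES (1, 'test');
SELECT * FROM iceberg_spark;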
Conclusion
At present, Iceberg is iterating rapidly, and more and more large companies are contributing to it, including foreign companies such as Netflix, Apple, Adobe and Expedia, as well as domestic companies such as Tencent, Alibaba and NetEase. A good technical architecture will eventually be recognized by more people. With increasing promotion in China and the investment of domestic developers in the project, Iceberg has a promising future in China.