This article is compiled from the talk "The Data Lake Project of Iceberg and Object Storage", shared by Sun Wei, senior software development manager at Dell Technologies, at the Flink Meetup in Shanghai on April 17. The content is organized as follows:
- Introduction to Data Lakes and Iceberg
- The object store supports the Iceberg data lake
- Demonstration project
- Some thoughts on storage optimization
Data Lake and Iceberg
1. Data lake ecosystem
As the graph above shows, for a mature data lake ecosystem:
- First of all, it should have massive storage capability, such as object storage, public cloud storage, or HDFS;
- On top of this, it needs to support rich data types, including unstructured images and videos, semi-structured CSV, XML, and logs, and structured database tables;
- In addition, efficient and unified metadata management is required so that computing engines can easily index the various types of data for analysis;
- Finally, it needs to support a variety of computing engines, such as Flink, Spark, Hive, and Presto, so that it can plug into existing enterprise application architectures.
2. Application scenarios of structured data on a data lake
Above is a typical application scenario on a data lake.
The data sources can be quite varied, with different origins and formats: business data, logs, event-tracking (buried-point) data, IoT data, and so on. This data flows through a number of streams into the computing platform, which then needs a structured solution to organize the data onto a storage platform and make it available to back-end data applications for real-time or scheduled queries.
What characteristics does such a solution require?
- First, since there are many types of data sources, it needs to support a relatively rich organization of data schemas;
- Second, data is queried in real time while it is still being ingested, so ACID guarantees are needed to ensure that queries do not read dirty data that is in an intermediate, not-yet-committed state;
- Finally, data such as logs may need its format adjusted on the fly, for example by adding a column. Unlike a traditional data warehouse, where this might force all the data to be rewritten and re-ingested, a lightweight schema-evolution solution is needed.
Iceberg is positioned exactly to provide this functionality, connecting the computing platform above and the storage platform below.
3. A typical solution for structured data in a data lake
For organizing structured data, the typical solution is to borrow the organization of a traditional database.
As shown in the figure above, a namespace sits at the top to isolate groups of database tables; in the middle there are multiple tables, each of which can hold data with its own schema. A table needs to provide ACID guarantees and also support partial schema evolution.
4. The organizational structure of Iceberg table data
- **Snapshot Metadata:** table schema, partitions, partition spec, Manifest List path, and the current snapshot.
- **Manifest List:** Manifest File paths with their partition ranges and data file statistics.
- **Manifest File:** Data File paths and the upper and lower bounds of each column in those data files.
- **Data File:** the actual table content data, organized in Parquet, ORC, Avro, and other formats.
Let’s take a closer look at how Iceberg organizes its data. As the picture above shows:
- As can be seen on the right, everything starts from the Data Files, which store the table's content data, generally in formats such as Parquet, ORC, and Avro;
- One level up is the Manifest File, which records the path of each Data File and the upper and lower bounds of each of its columns, making it easy to filter files during queries;
- Above that is the Manifest List, which links to the multiple Manifest Files below it and records the partition range of each Manifest File, again to facilitate filtering during queries.

The Manifest List in fact represents the snapshot: it links to all the data of the current table and is the key to how Iceberg supports ACID. Once a snapshot exists, readers only see data that the snapshot references; data that is still being written is not referenced by any snapshot, so dirty data is never read. Multiple snapshots share earlier data files by sharing these Manifest Files.

- At the top is the Snapshot Metadata, which records the current and historical table schema changes, the partition configuration, the Manifest List paths of all snapshots, and which snapshot is the current one.

On top of this, Iceberg provides the namespace and table abstractions for complete data organization and management.
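To make the hierarchy concrete, the sketch below models the levels described above with illustrative Java records; these are simplified stand-ins, not the actual org.apache.iceberg classes.

```java
// Illustrative Java records modeling Iceberg's metadata hierarchy
// (simplified; not the real org.apache.iceberg classes).
import java.util.List;
import java.util.Map;

public class IcebergLayoutSketch {

    // Actual table content, stored as Parquet/ORC/Avro files.
    record DataFile(String path, String format, long recordCount,
                    Map<String, String> lowerBounds,    // per-column lower bound
                    Map<String, String> upperBounds) {} // per-column upper bound

    // Lists data files and keeps their per-column bounds for file-level pruning.
    record ManifestFile(String path, List<DataFile> dataFiles) {}

    // One entry per manifest file, with its partition range for partition-level pruning.
    record ManifestListEntry(ManifestFile manifest,
                             String partitionLower, String partitionUpper) {}

    // A snapshot is defined by its manifest list: the complete set of files
    // visible at one point in time.
    record Snapshot(long snapshotId, List<ManifestListEntry> manifestList) {}

    // Snapshot metadata: schema, partition spec, all snapshots, and the current one.
    record TableMetadata(String schemaJson, String partitionSpec,
                         List<Snapshot> snapshots, long currentSnapshotId) {}
}
```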
5. Iceberg write process
The figure above shows the flow of writing data into Iceberg, using Flink as the computing engine.
- First, Data Workers read the incoming data and parse it, and then hand records to Iceberg for storage.
- Like an ordinary database, Iceberg has predefined partitions; the records are written into their respective partitions, forming new data files.
- Flink has a checkpoint mechanism. When a checkpoint arrives, Flink finishes writing the current data files and generates the list of these files, which is submitted to the Commit Worker.
- The Commit Worker reads the current snapshot information, merges it with the newly generated file list to produce a new Manifest List and the corresponding table metadata files, and commits them. Once the commit completes, a new snapshot is formed.
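The sketch below models this write path conceptually (illustrative types only, not the real Flink or Iceberg APIs): data workers produce partitioned files, and at each checkpoint the file list is folded into a new snapshot by the commit worker.

```java
// Conceptual sketch of the write path: partitioned files accumulate between
// checkpoints, then a commit worker merges them into a new snapshot.
import java.util.ArrayList;
import java.util.List;

class WritePathSketch {

    record WrittenFile(String partition, String path) {}

    static class CommitWorker {
        private long currentSnapshotId = 0;
        private final List<WrittenFile> committedFiles = new ArrayList<>();

        // Called once per checkpoint with the files completed since the last one.
        synchronized long commit(List<WrittenFile> newFiles) {
            committedFiles.addAll(newFiles);   // merge with the current snapshot's file list
            currentSnapshotId++;               // the new snapshot becomes the table's current version
            return currentSnapshotId;
        }
    }

    public static void main(String[] args) {
        CommitWorker committer = new CommitWorker();
        // Files produced by data workers between two checkpoints.
        List<WrittenFile> checkpointFiles = List.of(
                new WrittenFile("dt=2021-04-17", "s3://bucket/tbl/dt=2021-04-17/part-0001.parquet"));
        long snapshotId = committer.commit(checkpointFiles);
        System.out.println("Committed snapshot " + snapshotId);
    }
}
```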
6. Iceberg query process
The figure above shows the Iceberg query process.
- First, the Flink Table Scan Worker performs a scan. The scan starts from the root of the tree: it finds the current snapshot, or a historical snapshot specified by the user, and takes out that snapshot's Manifest List file. Using the information recorded there, it filters out the Manifest Files that satisfy the query conditions;
- It then goes through the entries of those Manifest Files and, using the recorded column bounds, filters out the Data Files that are actually needed. These files are handed to the Record Reader Workers, which read the records that satisfy the conditions from the files and return them to the upper-level caller.
Notice that no list operation appears anywhere in the query process. Because Iceberg records the complete file tree in its metadata, every file is reached through a direct path rather than by listing a directory, so query performance never pays for time-consuming list operations. This is very friendly to object storage, where listing objects is a resource-intensive operation.
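The pruning walk can be sketched roughly as follows, with made-up types standing in for Iceberg's scan planning; note that every step follows stored paths and statistics, with no directory listing.

```java
// Illustrative sketch of scan planning: snapshot -> manifest list ->
// manifest files -> data files, pruning by stored statistics at each level.
import java.util.ArrayList;
import java.util.List;

class ScanSketch {
    record DataFile(String path, long minId, long maxId) {}   // per-column bounds (one column shown)
    record ManifestFile(String path, List<DataFile> files) {}
    record ManifestEntry(ManifestFile manifest, String partitionLower, String partitionUpper) {}
    record Snapshot(long id, List<ManifestEntry> manifestList) {}

    // Find data files that may contain rows with id in [lo, hi] for one partition.
    static List<DataFile> plan(Snapshot snapshot, String partition, long lo, long hi) {
        List<DataFile> result = new ArrayList<>();
        for (ManifestEntry entry : snapshot.manifestList()) {
            // Prune whole manifests by the partition range kept in the manifest list.
            if (partition.compareTo(entry.partitionLower()) < 0
                    || partition.compareTo(entry.partitionUpper()) > 0) continue;
            for (DataFile f : entry.manifest().files()) {
                // Prune individual data files by the column bounds kept in the manifest file.
                if (f.maxId() < lo || f.minId() > hi) continue;
                result.add(f);   // reachable by direct path; no directory listing needed
            }
        }
        return result;
    }
}
```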
7. Iceberg Catalog
Iceberg provides a well-abstracted Catalog to connect data storage and metadata management. Any storage that implements the Iceberg Catalog abstraction can be connected to Iceberg and used to build the data lake solution described above.
As you can see from the figure above, the Catalog provides several aspects of abstraction.
- It can define a series of file roles for Iceberg;
- Its File IO is customizable, including read, write, and delete;
- Its namespace and table operations (that is, metadata operations) can also be customized;
- Both table reads/scans and table commits can be customized through the Catalog.
This provides a lot of flexibility and makes it convenient to connect to various kinds of underlying storage.
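A rough sketch of these customization points is shown below; the interfaces are simplified stand-ins for illustration, not the real Iceberg Catalog or FileIO APIs.

```java
// Simplified stand-ins showing what a storage backend has to provide:
// a data plane (file IO) and a control plane (namespace/table metadata).
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

interface SketchFileIO {                       // data plane: file read/write/delete
    InputStream read(String path);
    OutputStream write(String path);
    void delete(String path);
}

interface SketchCatalog {                      // control plane: namespace and table (metadata) operations
    List<String> listNamespaces();
    List<String> listTables(String namespace);
    void createTable(String namespace, String table, String schemaJson);
    void dropTable(String namespace, String table);

    // Table-level hooks: scans read the current metadata, commits swap it atomically.
    String currentMetadataLocation(String namespace, String table);
    boolean commit(String namespace, String table,
                   String expectedMetadataLocation, String newMetadataLocation);
}
```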
Object storage supports Iceberg data lake
1. Current Iceberg Catalog implementations
The existing Iceberg Catalog implementations in the community can be divided into two parts: data IO and metadata management.
As shown in the figure above, there is in fact no Catalog implementation dedicated to object storage. S3A can in theory connect to object storage, but it does so with file system semantics rather than native object storage semantics, and simulating file operations brings extra overhead. What we want is for all data and metadata management to be handed to a single object store, rather than being split across separate designs.
2. Comparing object storage and HDFS
The question is, why use object storage when you have HDFS?
As shown below, we compare object storage and HDFS from various perspectives.
In conclusion, we believe that:
- Object storage has advantages in cluster scalability, small file friendliness, multi-site deployment, and low storage overhead.
- The benefits of HDFS are that it provides append uploads and atomic rename, which is what Iceberg needs.
The advantages of the two stores are briefly described below.
1) Comparison: cluster scalability
- HDFS keeps all metadata in a single Name Node, so it is limited by that single node's capability and has no scale-out capability for metadata.
- Object storage generally uses hashing to split the metadata into shards, which are managed by services on different nodes. Its metadata ceiling is therefore naturally higher, and in extreme cases it can even rehash, cutting the shards finer and spreading them across more nodes, to expand metadata capacity (a conceptual sketch follows below).
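A conceptual sketch of hash-based metadata sharding (purely illustrative; real object stores use more sophisticated placement schemes such as consistent hashing):

```java
// Each object's metadata is owned by the node its key hashes to, so metadata
// capacity grows by adding nodes (and rehashing into finer shards if needed).
import java.util.List;

class MetadataShardingSketch {
    static String ownerNode(String objectKey, List<String> nodes) {
        int shard = Math.floorMod(objectKey.hashCode(), nodes.size());  // hash -> shard -> owning node
        return nodes.get(shard);
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node-1", "node-2", "node-3");
        System.out.println(ownerNode("bucket/iceberg/data/part-0001.parquet", nodes));
    }
}
```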
2) Comparison: small file friendly
Small files are becoming more common and a pain point in big data applications these days.
- Due to its architecture, HDFS's handling of small files is limited by resources such as the Name Node's memory. Although HDFS provides the Archive mechanism to merge small files and reduce pressure on the Name Node, this adds extra complexity and is not native.
Similarly, the TPS for small files is limited by the processing power of that single Name Node. In object storage, metadata is distributed and managed across nodes, so traffic is spread well among them and a large number of small files can be stored without a single node becoming the bottleneck.
- In addition, many object stores today offer multiple media tiers and hierarchical acceleration, which can further improve small-file performance.
3) Comparison: multi-site deployment
- Object storage natively supports multi-site deployment:
  - a global namespace;
  - rich rule configuration.
- The multi-site deployment capability of object storage fits two-site, three-center multi-active architectures, while HDFS has no native multi-site deployment capability. Although some commercial HDFS versions add the ability to handle data across multiple sites, the sites remain independent systems, so they do not offer true global-namespace multi-active capability.
4) Comparison: low storage overhead
- To tolerate random hardware failures, a storage system usually protects data with a redundancy mechanism.
  - One approach is multi-replica: for example, data is stored in three copies on three separate nodes. The storage overhead is 3x, but it can tolerate the simultaneous failure of two copies without losing data.
  - The other approach is Erasure Coding, commonly referred to as EC. Taking 10+2 as an example, the data is cut into 10 data blocks, from which the algorithm computes 2 parity blocks, 12 blocks in total. These are then distributed across different nodes, giving a storage overhead of 1.2x. It can likewise tolerate the simultaneous failure of two blocks, since any 10 of the remaining blocks are enough to reconstruct all the data. EC thus reduces storage overhead while still tolerating failures.
- HDFS uses the three-replica mechanism by default; newer HDFS versions also support EC. From our research, HDFS performs EC per file, which puts small files at a natural disadvantage: if the file is smaller than a full stripe of data blocks, the cost of the two parity blocks cannot be amortized and the overhead is higher than it should be. In the extreme case where the file is the size of a single block, it is already equivalent to three replicas (the quick calculation below illustrates this).
In addition, once EC is enabled, HDFS can no longer support operations such as Append, hflush, and hsync, which greatly limits the scenarios where EC can be used. Object storage, on the other hand, supports EC natively; for small files it internally merges them into one large block before applying EC, so, based on the pre-configured policy, the storage overhead stays constant.
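A quick, purely illustrative calculation of the overhead figures mentioned above:

```java
// Storage overhead = total blocks stored / data blocks.
class OverheadSketch {
    static double replicationOverhead(int copies) { return copies; }          // e.g. 3 copies -> 3.0x
    static double ecOverhead(int dataBlocks, int parityBlocks) {
        return (dataBlocks + parityBlocks) / (double) dataBlocks;              // e.g. 10+2 -> 1.2x
    }

    public static void main(String[] args) {
        System.out.println(replicationOverhead(3));   // 3.0
        System.out.println(ecOverhead(10, 2));        // 1.2
        System.out.println(ecOverhead(1, 2));         // 3.0: a one-block file under per-file EC behaves like 3 replicas
    }
}
```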
3. Challenge of object storage: data appending and uploading
In the S3 protocol, the size of an object must be provided when it is uploaded.
Taking the S3 standard as an example, standard S3 object storage does not offer an append interface when connected to Iceberg: the protocol requires the file size to be provided when a file is uploaded. This makes it rather unfriendly to the kind of streaming File IO that comes in during ingestion.
1) Solution 1: S3 Catalog append upload – cache small files locally or in memory
For small files, the stream is written to a local cache or to memory as it comes in; once the file is completely written, it is uploaded to the object store in one piece.
2) Solution 2: S3 Catalog append upload – upload large files in parts with MPU
For large files, the S3 standard multipart upload (MPU) is used to upload the file in segments.
It is generally divided into several steps:
- The first step is to initialize the MPU and obtain an Upload ID; each part is then given that Upload ID and a part number, and the parts can be uploaded in parallel;
- After all parts are uploaded, a Complete operation is required. It notifies the system to assemble the parts sharing the same Upload ID into one large file, ordered by part number from small to large;
- To apply this mechanism to the append-upload scenario, the usual implementation is to cache the file locally as it is being written; whenever the cached data reaches the required part size, the MPU is initialized (if it has not been already) and that part starts uploading. The same is done for each subsequent part until the last one is uploaded, and then a Complete operation finishes the upload. A sketch of this flow is shown below.
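Below is a minimal sketch of this MPU flow using the AWS SDK for Java v2; bucket and key names are placeholders, and error handling, retries, and parallelism are omitted.

```java
// Minimal MPU flow: initialize, upload numbered parts, then complete.
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.util.ArrayList;
import java.util.List;

class MpuSketch {
    public static void main(String[] args) {
        S3Client s3 = S3Client.create();
        String bucket = "my-bucket", key = "iceberg/data/part-0001.parquet";  // placeholders

        // 1. Initialize the MPU and obtain an Upload ID.
        String uploadId = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()).uploadId();

        // 2. Upload parts (numbered from 1); parts may be uploaded in parallel.
        List<CompletedPart> parts = new ArrayList<>();
        byte[][] buffers = { new byte[5 * 1024 * 1024] };  // cached blocks that reached the minimum part size
        for (int i = 0; i < buffers.length; i++) {
            int partNumber = i + 1;
            UploadPartResponse resp = s3.uploadPart(
                    UploadPartRequest.builder().bucket(bucket).key(key)
                            .uploadId(uploadId).partNumber(partNumber).build(),
                    RequestBody.fromBytes(buffers[i]));
            parts.add(CompletedPart.builder().partNumber(partNumber).eTag(resp.eTag()).build());
        }

        // 3. Complete: parts with the same Upload ID are assembled in part-number order.
        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(key).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
    }
}
```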
MPU has advantages and disadvantages:
- The downside is that MPU has a maximum number of parts, which in S3 may be only 10,000. If large files are to be supported, the part size therefore cannot be too small, and for files smaller than a single part the previous cache-and-upload approach is still needed.
- The advantage of MPU is its ability to upload in parallel. With asynchronous uploads, once a part reaches the cache there is no need to wait for the previous part to finish uploading; when the ingestion at the front is fast enough, the asynchronous commits at the back become parallel operations. With this mechanism, MPU can provide higher upload throughput than a single stream.
4. Challenge of object storage: atomic commit
The next problem is atomic commit for object storage.
As mentioned earlier, the final commit in the data ingestion process is actually a multi-step linear transaction: it first reads the current snapshot version, then merges in this round's file list, and finally commits its own new version. The operation is similar to the classic "i = i + 1" in programming: it is not atomic, and the object storage standards do not provide an atomic primitive for it.
The figure above shows a scenario where metadata is committed concurrently.
- Commit Worker 1 obtains v006, merges its own files, and successfully commits v007.
- Meanwhile Commit Worker 2 also takes v006, merges its own files, and likewise tries to commit v007. At this point a mechanism is needed to tell it that v007 conflicts and cannot be written, and to make it retry. On retry, it takes the new v007, merges again, and commits v008.
This is a typical conflict scenario, and such a mechanism is necessary: if the conflict is not detected, the second commit of v007 will overwrite the first one, and all the data from that earlier commit will be lost.
As shown in the figure above, we can solve this problem by using a distributed locking mechanism.
- Commit Worker 1 obtains v006 and merges its files. Before committing, it acquires the lock and checks the current snapshot version; since it is still v006, v007 can be committed successfully, and the lock is released afterwards.
- Similarly, Commit Worker 2 takes v006 and merges, but it cannot acquire the lock until Commit Worker 1 releases it. Once it holds the lock and checks again, it finds that the current version is already v007, which conflicts with its own v007, so the operation fails and it retries.
This is how a lock solves the concurrent-commit problem.
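The sketch below models the lock-protected commit in-process (a stand-in for a real distributed lock service): the expected version is checked under the lock, and the caller retries on conflict.

```java
// Check the expected version under the lock, then either advance or retry.
import java.util.concurrent.locks.ReentrantLock;

class CommitLockSketch {
    private final ReentrantLock lock = new ReentrantLock();  // stands in for a distributed lock
    private volatile int currentVersion = 6;                 // v006

    /** Try to replace expectedVersion with newVersion; return true on success. */
    boolean commit(int expectedVersion, int newVersion) {
        lock.lock();
        try {
            if (currentVersion != expectedVersion) {
                return false;                // conflict: someone committed first, caller must retry
            }
            currentVersion = newVersion;     // safe to advance, e.g. v006 -> v007
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Caller side: re-read the current version, re-merge, and retry on conflict. */
    void commitWithRetry() {
        while (true) {
            int base = currentVersion;       // read the current snapshot version
            // ... merge this round's file list on top of `base` ...
            if (commit(base, base + 1)) {
                return;
            }
            // conflict: loop, re-reading the newer version and merging again
        }
    }
}
```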
5. Dell EMC ECS data append upload
The solution based on standard S3 object storage plus Iceberg still has some problems, such as performance loss or the need to deploy an additional lock service.
Dell EMC ECS is also an object store, and it offers a different answer to these questions: it is based on the standard S3 protocol but adds extensions, one of which supports appending data to an object during upload.
Its append upload differs from MPU in that there is no minimum block size; the blocks can be made smaller, and after they are uploaded the system stitches them together internally, so the result is still a valid file.
Append upload and MPU can thus each be suited to different scenarios.
MPU can accelerate uploads, while append upload, even if not as fast, offers sufficient performance and avoids MPU's initialization and completion steps; in terms of performance, the two can therefore be used in different scenarios.
6. Dell EMC ECS solution for concurrent commits
ECS object storage also provides If-Match semantics; a similar interface capability exists on both Microsoft's and Google's cloud storage.
- With If-Match, Commit Worker 1 obtains the file's ETag when it reads v006. When committing, the system checks whether the ETag supplied with the overwrite matches the actual ETag of the current file; if it matches, the overwrite is allowed and v007 is committed successfully.
- In the other case, Commit Worker 2 also obtains v006's ETag, but when it uploads its file it finds the ETag no longer matches the file currently in the system, so the request fails and a retry is triggered.
This implementation achieves the same effect as the locking mechanism, but without deploying an external lock service to guarantee atomic commits.
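Conceptually, the conditional overwrite looks like the sketch below, which uses the standard HTTP If-Match header; the endpoint, object path, and ETag are placeholders, and a real request to ECS or another S3-compatible store would also need authentication/signing.

```java
// Conditional overwrite: only succeeds if the object's ETag is unchanged.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class IfMatchSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String objectUrl = "https://ecs.example.com/bucket/iceberg/metadata/version-hint";  // placeholder
        String etagOfV006 = "\"abc123\"";   // ETag obtained when the current metadata was read

        HttpRequest put = HttpRequest.newBuilder(URI.create(objectUrl))
                .header("If-Match", etagOfV006)   // only overwrite if the object is unchanged
                .PUT(HttpRequest.BodyPublishers.ofString("v007 metadata location"))
                .build();

        HttpResponse<String> resp = client.send(put, HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() == 412) {
            // 412 Precondition Failed: another worker committed first -> re-read, re-merge, retry
        }
    }
}
```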
7. S3 Catalog – unified data storage
To recap, we have solved the append-upload problem in the data File IO and the atomic commit problem for the metadata tables.
With these problems solved, the management of both data and metadata can be handed over to the object storage, without deploying any additional metadata service, realizing the concept of unified data storage.
Demonstration project
As shown above, the demo uses Pravega, which can be understood simply as an alternative to Kafka, but optimized for performance.
In this example, we inject data into a Pravega stream; Flink then reads the data from Pravega, parses it, and stores it in Iceberg's organization. Iceberg uses the ECS Catalog to connect directly to the object store, with no other services deployed, and Flink is then used to read the data back.
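For reference, wiring such a pipeline up in Flink SQL from Java might look roughly like the sketch below; the catalog-impl class name, warehouse location, and the pravega_source table are assumptions, and the exact properties depend on the Iceberg, ECS, and Pravega connector versions used.

```java
// Rough sketch of the demo pipeline: register an ECS-backed Iceberg catalog,
// then copy records from the Pravega-backed source table into an Iceberg table.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class DemoSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register an Iceberg catalog that talks directly to ECS object storage.
        tEnv.executeSql(
            "CREATE CATALOG ecs_catalog WITH (" +
            "  'type' = 'iceberg'," +
            "  'catalog-impl' = 'org.apache.iceberg.dell.ecs.EcsCatalog'," +  // assumed class name
            "  'warehouse' = 'ecs://my-bucket/warehouse'" +                   // placeholder location
            ")");

        // Read from the Pravega-backed source table (assumed to be defined elsewhere)
        // and write into an Iceberg table managed by the ECS catalog.
        tEnv.executeSql(
            "INSERT INTO ecs_catalog.db.events " +
            "SELECT * FROM pravega_source");
    }
}
```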
Some thoughts on storage optimization
The figure above shows the data organization that Iceberg supports, where the table content is stored directly in Parquet files.
Our thinking is this: if the data lake sits in the same storage as the original source data, the generated Parquet files and the source files may contain a large amount of redundant data. Can we reduce the storage of this redundant information?
For example, in the most extreme case, the Parquet data file is not stored at all; Iceberg only records a reference to the source file. When a query comes in, a customizable File IO generates a Parquet-like format in memory, in real time, from the original file and hands it to the upper-layer application, achieving the same query result.
This approach, however, is limited to cases where storage cost matters but query performance does not. It is made possible by Iceberg's good abstractions: since both the file metadata and the File IO are abstracted, the source file can be presented to Iceberg as if it were a Parquet file.
Thinking further: can we optimize query performance and save storage space at the same time?
For example, the frequently used columns of the source file, together with their statistics, could be extracted and stored with Iceberg. At query time, the source file and this extracted columnar data are used together to answer queries quickly, while the storage space for the rarely used columns is saved.
This is still a preliminary idea. If it can be implemented, Iceberg would be able to index not only structured Parquet files but also semi-structured and unstructured data, answering upper-level query tasks through on-the-fly computation and becoming a more complete Data Catalog.