I. Core challenges of getting data into the lake

A real-time data ingestion link can be divided into three parts: the data source, the data pipeline, and the data lake (data warehouse). This article focuses on these three parts.

1. Case #1: Data transmission is interrupted by a bug in the program

  • First, when data is transmitted from the source through the data pipeline into the data lake (data warehouse), the job is very likely to hit a bug and stop halfway through the transfer, leaving only part of the data synchronized and affecting the business;
  • The second problem is how to restart the job when this happens and guarantee that the data is neither duplicated nor lost but completely synchronized to the data lake (warehouse).

2. Case #2: Data changes are too painful

  • Data changes

    Data changes put great pressure and challenges on the entire link. Take a table that defines two fields, ID and NAME, as an example. At some point the business team asks for an address field to be added so that user value can be mined more effectively.

    First, we need to add an Address column to the Source table and propagate the change through the link to Kafka, then modify the job and restart it. In other words, the entire link has to be reworked: new columns are added, the job is modified and restarted, and finally all existing data in the data lake (warehouse) is updated to carry the new column. This process is not only time-consuming, it also raises the question of how to isolate the change so that it does not affect analysis jobs that are reading the data while the change is in progress.

  • Partition changes

    As shown in the figure below, the tables in the data warehouse are partitioned by “month”. If we now want to change the partitioning to “day”, we may have to update all the data across many systems and repartition it with the new strategy, which is very time-consuming.

3. Case #3: Increasingly slow near-real-time reporting

When the business needs more near-real-time reports, the data import period has to shrink from days to hours or even minutes, which causes a series of problems.

The first problem, as shown in the figure above, is that the number of files grows at a visible rate, which puts increasing pressure on external systems. The pressure shows up in two ways:

  • The first pressure is that analysis jobs become slower and slower to start, and the Hive Metastore faces scaling problems, as shown in the following figure.

    • As the number of small files grows, the centralized Metastore becomes an increasingly serious bottleneck and analysis jobs start more and more slowly, because the metadata of every small file has to be scanned when a job starts.

    • Second, because the Metastore is a centralized system, it easily runs into scaling problems. With Hive, for example, you may have to find ways to scale the backing MySQL database, which brings high maintenance costs and overhead.

  • The second pressure is that scanning and analysis operations are getting slower.

    As the number of small files increases, scanning during analysis becomes slower and slower. The root cause is that the large number of small files forces scan jobs to switch frequently between many DataNodes.

4. Case #4: Analyzing CDC data in real time is difficult

Looking across the various systems in the Hadoop ecosystem, the whole link needs to be fast, correct, stable, and highly concurrent, which is not easy to achieve.

  • First, from the perspective of the source: if you want to synchronize MySQL data to the data lake for analysis, you face the problem that MySQL already holds stock (full) data while incremental data keeps being generated. How do you synchronize both full and incremental data to the data lake so that no row is duplicated or lost?

  • In addition, assuming the switch between full and incremental synchronization at the source is solved, how do you ensure that the binlog rows of CDC data reach the downstream when an exception occurs during synchronization, such as an upstream schema change that interrupts the job?

  • Building the whole link involves switching between full and incremental synchronization at the source, wiring up the intermediate data flow, and writing into the data lake (data warehouse). Building such a link requires a lot of code, and the development threshold is relatively high.

  • Finally, and critically, in the open source ecosystem it is very difficult to find a system that supports efficient, highly concurrent analysis of this kind of constantly changing CDC data.

5. Summary: core challenges of getting data into the lake

  • Data synchronization tasks get interrupted

    • The impact of writes on analysis cannot be effectively isolated;
    • Synchronization tasks do not guarantee exactly-once semantics.
  • End-to-end data changes

    • DDL changes require complex updates and upgrades across the link;
    • Existing (stock) data in the lake/warehouse is difficult to modify.
  • Near-real-time reporting gets slower and slower

    • Frequent writes produce a large number of small files;
    • Metadata comes under heavy pressure and jobs start slowly;
    • A large number of small files slows down data scanning.
  • CDC data cannot be analyzed in near real time

    • Switching from full to incremental synchronization is difficult;
    • Building the link end to end requires code development with a high threshold;
    • The open source ecosystem lacks an efficient storage system for changing data.

II. Introduction to Apache Iceberg

1. Netflix: summary of the pain points of moving Hive to the cloud

The most important reason Netflix built Iceberg was to solve the pain points of moving Hive to the cloud, which fall into the following three areas:

1.1 Pain point 1: data changes and rollback to historical versions are difficult

  1. No ACID semantics are provided, so when data changes it is difficult to isolate the impact on analysis tasks. Typical operations are INSERT OVERWRITE, modifying data partitions, and modifying the schema;

  2. Concurrent data changes cannot be handled and lead to conflicts;

  3. There is no effective way to roll back to a historical version.

1.2 Pain point 2: difficulty in replacing HDFS with S3

  1. The data access interface depends directly on the HDFS API;

  2. It relies on the atomicity of the RENAME operation, which is hard to provide with the same semantics on object stores such as S3;

  3. Listing relies heavily on file directories, which is inefficient on object storage systems.

1.3 Pain point 3: Too many details

  1. Different file formats behave inconsistently during schema changes, and they do not even support data types consistently;

  2. The Metastore only maintains partition-level statistics, which leads to non-trivial task-planning overhead; the Hive Metastore is also difficult to scale;

  3. Partition pruning cannot be done on non-partition fields.

2. Apache Iceberg core features

  • General, standardized design

    • Fully decoupled from the compute engine
    • Standardized schema
    • Open data formats
    • Support for Java and Python
  • Complete table semantics

    • Schema definition and evolution
    • Flexible partitioning strategies
    • ACID semantics
    • Snapshot semantics
  • Rich data management

    • Unified storage for streaming and batch
    • Extensible metadata (META) design
    • Batch updates and CDC
    • Support for file encryption
  • Cost-effectiveness

    • Compute pushdown design
    • Low-cost metadata management
    • Vectorization
    • Lightweight indexes
3. Apache Iceberg File Layout

At the top is the standard Iceberg table format (TableFormat) structure. Its core is divided into two parts, Data and Metadata, both of which are maintained on S3 or HDFS.
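
To make these two parts concrete, the sketch below shows roughly how a typical Iceberg table directory is laid out on HDFS or S3; the table, partition, and file names are purely illustrative.

    user_events/                     -- table root on S3 or HDFS
      metadata/
        v1.metadata.json             -- table metadata: schema, partition spec, snapshot list
        v2.metadata.json
        snap-0-....avro              -- manifest list for one snapshot
        ...-m0.avro                  -- manifest: data file paths plus per-file statistics
      data/
        dt=2020-12-01/
          00000-0-....parquet        -- the actual data files
        dt=2020-12-02/
          00000-0-....parquet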

4. Apache Iceberg Snapshot View

The figure above shows the general write and read flow of Iceberg.

You can see there are three layers:

  • The top (yellow) layer is the snapshots;

  • The middle (blue) layer is the manifests;

  • At the bottom are the data files.

Each write produces a batch of data files, one or more manifests, and a new snapshot.

For example, the first write generates snapshot snap-0, the second write generates snapshot snap-1, and so on. The metadata, however, is maintained incrementally: each write only appends to what is already there.

In this way, Iceberg lets users run batch analysis over the whole table on a single piece of storage, and also run incremental analysis between snapshots on that same storage. This is also why Iceberg can support both streaming and batch reads and writes.
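
As a rough illustration of this snapshot model, here is a minimal sketch using the Iceberg Java API; the warehouse path and table name are hypothetical, and the incremental scan API assumes a recent Iceberg release. It lists the table's snapshots and then plans a scan over only the files appended after the oldest one.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.io.CloseableIterable;

    public class SnapshotScanSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical warehouse path and table name; any Iceberg catalog works the same way.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs:///warehouse");
            Table table = catalog.loadTable(TableIdentifier.of("db", "user_events"));

            // Every committed write adds a snapshot (snap-0, snap-1, ...).
            long oldestSnapshotId = -1L;
            for (Snapshot s : table.snapshots()) {
                if (oldestSnapshotId < 0) {
                    oldestSnapshotId = s.snapshotId();
                }
                System.out.printf("snapshot %d committed at %d%n", s.snapshotId(), s.timestampMillis());
            }

            // Incremental analysis: plan only the files appended after the oldest snapshot,
            // instead of re-scanning the whole table.
            try (CloseableIterable<FileScanTask> tasks = table.newIncrementalAppendScan()
                    .fromSnapshotExclusive(oldestSnapshotId)
                    .toSnapshot(table.currentSnapshot().snapshotId())
                    .planFiles()) {
                tasks.forEach(task -> System.out.println("scan file: " + task.file().path()));
            }
        }
    }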

5. Companies that have chosen Apache Iceberg

The figure above shows some of the companies currently using Apache Iceberg. The domestic (Chinese) examples are familiar to most readers, so here is a brief overview of how companies abroad use it.

  • Netflix now has hundreds of petabytes of data on top of Apache Iceberg, and its daily data increment through Flink is hundreds of terabytes.

  • Adobe’s daily data increment is on the order of terabytes, and its total data size is in the tens of petabytes.

  • AWS uses Iceberg as the base of its data lake.

  • Cloudera builds its entire public cloud platform on top of Iceberg. The trend of on-premises HDFS/Hadoop deployments is weakening, while the move to the cloud keeps growing, and Iceberg plays a key role in the cloud stage of Cloudera’s data architecture.

  • Apple has two teams that use it:

    • First, the whole iCloud data platform is built on Iceberg.
    • The second is the AI voice service Siri, which also builds its whole data ecosystem on Flink and Iceberg.

III. How Flink and Iceberg solve these problems

Back to the most important part: how Flink and Iceberg solve the series of problems described in the first part.

1. Case #1: Data transmission is interrupted by a bug in the program

First, Flink is used to run the synchronization link and can guarantee exactly-once semantics: when a job fails, it recovers strictly to keep the data consistent.

Second, Iceberg provides rigorous ACID semantics, which helps users easily isolate the adverse impact of writes on analysis tasks.
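
A minimal sketch of the Flink side, assuming the standard Flink Iceberg sink is attached downstream: data files are committed to the Iceberg table as a new snapshot only when a checkpoint completes, so enabling checkpointing is what ties the link to exactly-once (the interval below is an arbitrary example value).

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExactlyOnceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The Flink Iceberg sink commits its data files as a new Iceberg snapshot only
            // when a checkpoint completes, so checkpointing must be enabled for exactly-once.
            // 60 seconds is just an example interval.
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

            // ... build the source -> Iceberg sink pipeline here, then:
            // env.execute("sync-to-iceberg");
        }
    }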

2. Case #2: Data changes are too painful

As shown in the figure above, Flink and Iceberg together solve the problem of data changes.

Flink can capture the upstream schema-change event and synchronize it downstream; the downstream Flink job then simply forwards the data on to storage, and Iceberg can change the schema instantly.

When a schema DDL is applied, Iceberg simply maintains multiple versions of the schema: the old data files are not touched at all, and new data is written with the new schema, achieving a one-click schema change with full isolation.
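
For the Address example from the first part, a minimal sketch of such a schema change through the Iceberg Java API might look as follows (the warehouse path and table name are hypothetical); adding the column is a pure metadata operation, so existing data files are not rewritten.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.types.Types;

    public class SchemaEvolutionSketch {
        public static void main(String[] args) {
            // Hypothetical warehouse path and table name.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs:///warehouse");
            Table table = catalog.loadTable(TableIdentifier.of("db", "users"));

            // Add the new optional column as a metadata-only change: old snapshots keep the
            // old schema, new data is written with the new one.
            table.updateSchema()
                .addColumn("address", Types.StringType.get())
                .commit();
        }
    }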

Another example is the problem of partition changes, which Iceberg handles as shown in the figure above.

If you want to change the partitioning from “month” (the yellow data blocks above) to “day”, you can change the partition spec with one click: the original data remains unchanged, and all new data is partitioned by “day”, with ACID semantics isolating the change.
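
A minimal sketch of such a partition change with the Iceberg Java API, assuming a hypothetical table partitioned by month(event_time): only the partition spec is updated, and data written under the old spec stays where it is.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public class PartitionEvolutionSketch {
        public static void main(String[] args) {
            // Hypothetical warehouse path, table name, and timestamp column.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs:///warehouse");
            Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

            // Replace month-based partitioning with day-based partitioning. Only the spec
            // changes: files written under the old spec are left untouched.
            table.updateSpec()
                .removeField(Expressions.month("event_time"))
                .addField(Expressions.day("event_time"))
                .commit();
        }
    }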

3. Case #3: Increasingly slow near-real-time reporting

The third problem is the pressure that small files put on the Metastore.

First, on the Metastore side, Iceberg stores its metadata directly in the file system together with the data and maintains it as metadata files. This removes the centralized Metastore entirely and relies only on the file system to scale, so scalability is good.

The other problem is that the growing number of small files makes data scanning slower and slower. Flink and Iceberg offer several solutions to this problem:

  • The first solution is to reduce small files at write time by shuffling the data by bucket before it is written; since each bucket’s data is then written by a single writer, the number of files is controlled and small files are naturally reduced.

  • The second option is to run batch jobs that periodically compact small files (a sketch of this option follows the list).

  • The third and smartest solution is to merge small files automatically and incrementally.
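
As a rough illustration of the second option above, the sketch below runs the rewrite-data-files action shipped with the Iceberg Flink module as a periodic batch job; the warehouse path, table name, and target file size are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.actions.Actions;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public class CompactionSketch {
        public static void main(String[] args) {
            // Hypothetical warehouse path and table name.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs:///warehouse");
            Table table = catalog.loadTable(TableIdentifier.of("db", "user_events"));

            // Rewrite small files into roughly 128 MB files. The result is committed as a
            // normal Iceberg snapshot, so readers keep seeing consistent data while it runs.
            Actions.forTable(table)
                .rewriteDataFiles()
                .targetSizeInBytes(128L * 1024 * 1024)
                .execute();
        }
    }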

4. Case #4: Analyzing CDC data in real time is difficult

  • First, there is the problem of full and incremental data synchronization. The community already has the Flink CDC connector, which automatically handles the seamless switch from full to incremental synchronization.

  • The second problem is how to make sure the binlog rows are synchronized to the lake even when exceptions occur during synchronization.

    For this, Flink recognizes the different event types at the engine level, and with its exactly-once semantics it can automatically recover from and handle failures when they occur.

  • The third problem is that building the whole link requires a lot of code and the development threshold is too high.

    With the Flink plus data lake approach, you only need to define a source table and a sink table and write one INSERT INTO; the whole link is then up and running without any business code (see the sketch after this list).

  • Finally, there is the question of how the storage layer supports near-real-time analysis of CDC data.
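
The sketch below illustrates the “source table + sink table + INSERT INTO” link mentioned above, using the Flink CDC connector ('mysql-cdc') for MySQL and the Iceberg Flink connector as the sink. Host names, credentials, and catalog settings are placeholders; handling row-level updates and deletes in Iceberg additionally requires the v2 table format and upsert mode, which are omitted here.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class CdcToIcebergSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000L); // Iceberg commits happen on checkpoints
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Source table: full + incremental MySQL changelog via the Flink CDC connector.
            tEnv.executeSql(
                "CREATE TABLE mysql_users (" +
                "  id BIGINT, name STRING, address STRING, PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'mysql-host', 'port' = '3306'," +
                "  'username' = 'user', 'password' = 'pass'," +
                "  'database-name' = 'db', 'table-name' = 'users')");

            // Sink table: an Iceberg table in a Hadoop catalog.
            tEnv.executeSql(
                "CREATE TABLE iceberg_users (" +
                "  id BIGINT, name STRING, address STRING, PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'iceberg'," +
                "  'catalog-name' = 'hadoop_catalog'," +
                "  'catalog-type' = 'hadoop'," +
                "  'warehouse' = 'hdfs:///warehouse')");

            // One statement wires the whole link together; no business code is needed.
            tEnv.executeSql("INSERT INTO iceberg_users SELECT * FROM mysql_users");
        }
    }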

IV. Community Roadmap

The figure above shows Iceberg’s roadmap: it released only one version in 2019, but released three versions in 2020 and became a top-level Apache project with the 0.9.0 release.

The figure above shows the Flink and Iceberg roadmap, which can be divided into four stages.

  • The first stage is for Flink to establish a connection with Iceberg.
  • The second stage is Iceberg replacing Hive. Many companies have already put this scenario into production for their own use cases.
  • The third stage is to solve more complex technical problems through Flink and Iceberg.
  • The fourth stage is to evolve this from a pure technical solution into a more complete product solution.