Author: Dong Yu, who joined Qunar in 2021 and is mainly responsible for the operation, maintenance, and platform development of the Flink data platform.

This article introduces Qunar's practice of using the new features of Iceberg 0.11 with Flink for its real-time data warehouse and data transmission.

I. Background and pain points

1. Background

While using Flink for our real-time data warehouse and data transmission, we ran into some problems: Kafka data loss, the performance of the Flink + Hive near-real-time data warehouse, and so on. The new features of Iceberg 0.11 address these business scenarios; compared with Kafka, Iceberg has its own advantages in certain situations.

2. Original architecture scheme

The original architecture used Kafka to store real-time data, and Flink SQL or the Flink DataStream API then consumed the data and moved it onward. We developed an internal platform for SQL and DataStream submission, and real-time jobs are submitted through this platform.

3. The pain points

Kafka has high storage costs and holds a large amount of data, so under high pressure its data expiration time is short: if data is not consumed within a certain period, it expires and is lost. Flink supports near-real-time reading and writing on Hive, so to share the load on Kafka, we put data that is not very latency-sensitive into Hive and let Hive partition at the minute level. However, as the amount of metadata grows, the pressure on the Hive Metastore becomes more and more significant, queries become slower, and the pressure on the database that stores the Hive metadata also increases.

II. Iceberg architecture

1. Iceberg term parsing

  • Data files

Data files are the files in which an Iceberg table actually stores its data, typically located in the data directory and ending with ".parquet".

  • Manifest file

Each row of a manifest file is a detailed description of one data file, including the data file's status, file path, partition information, and column-level statistics (such as the maximum and minimum values of each column and the number of null values). Using this file, irrelevant data can be filtered out and retrieval speed improved.

  • Snapshot

A snapshot represents the state of a table at a point in time. Each snapshot lists all of the data files at that point in time. Data files are recorded in different manifest files, the manifest files are listed in a manifest list, and one manifest list represents one snapshot.
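Putting these terms together: one snapshot points to one manifest list, the manifest list points to manifest files, and each manifest file describes a group of data files. An illustrative on-disk layout (paths and file names are examples, not taken from a real environment):

warehouse/Iceberg_db/tbl1/
  metadata/
    v2.metadata.json        -- table metadata, points to the current snapshot
    snap-3821550....avro    -- manifest list, one per snapshot
    2f0b5ea1-m0.avro        -- manifest file, describes a group of data files
  data/
    days=2021-05-01/
      00000-0-xxxx.parquet  -- data file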

2. Iceberg query plan

A query plan is the process of finding the files needed for a query in a table.

  • Metadata filtering

The manifest file contains the partition data tuple and column-level statistics for each data file. During planning, query predicates are automatically converted to predicates on the partition data and applied first to filter data files. Next, column-level value counts, null counts, lower bounds, and upper bounds are used to eliminate files that cannot match the query predicate.
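As a concrete illustration, here is a minimal sketch using Iceberg's Java scan API (the table variable and the days column are assumptions for the example); the filter is applied during planning, first against partition tuples and then against per-column bounds, so non-matching files never appear in the returned tasks:

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

// plan a scan with a predicate; files whose partition tuple or
// min/max column bounds cannot match are pruned from the plan
CloseableIterable<FileScanTask> tasks = table.newScan()
        .filter(Expressions.equal("days", "2021-05-01"))
        .planFiles();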

  • Snapshot ID

Each snapshot ID is associated with a group of manifest files (a manifest list), and each group contains many manifest files.

  • Manifest files

Each manifest file records the metadata of the current data block, including the maximum and minimum values of the file's columns. Based on this metadata, the query is indexed to specific file blocks, so data can be retrieved faster.

III. Pain point 1: Kafka data loss

1. Pain point introduction

We usually choose Kafka for the real-time data warehouse and log transfer. Kafka's storage cost is high, and data has a retention period: once consumption falls behind and the retention time passes, the data expires and is lost before it is consumed.

2. Solutions

Put business data that does not require high real-time performance into the lake, for example data that can tolerate a 1-10 minute delay. Iceberg 0.11 supports real-time reading via SQL and can also retain historical data. This reduces the pressure on the online Kafka cluster and ensures that data can be read in near real time without being lost.

3. Why can Iceberg only achieve near-real-time ingestion into the lake?

① Iceberg commits transactions at file granularity, so it cannot commit a transaction every second or less; doing so would cause the number of files to swell;

② There is no online service node, so for high-throughput, low-latency real-time writes, purely real-time responses cannot be achieved;

③ Flink writes data in units of checkpoints. Physical data that has been written to Iceberg cannot be queried directly: the metadata file is written only when a checkpoint is triggered, and only then does the data change from invisible to visible. Each checkpoint takes a certain amount of time to execute.

4. Analysis of Flink ingestion into the lake

Component introduction

  • IcebergStreamWriter

It is mainly used to write records to the corresponding Avro, Parquet, or ORC files, generate a corresponding Iceberg DataFile, and send it to the downstream operator. Data is written in units of checkpoints, producing DataFiles; the companion IcebergFilesCommitter then collects all the DataFile files and commits the transaction to Apache Iceberg when the checkpoint arrives.

  • IcebergFilesCommitter

We maintain a list of DataFile files for each checkpointId, as a Map<Long, List<DataFile>>, so that even if the commit for one checkpoint fails, its DataFile files are still held in state, and the data can still be committed to the Iceberg table by a subsequent checkpoint.
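A conceptual sketch of that state, to show the idea (this is an illustration, not the actual Iceberg committer source):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.iceberg.DataFile;

class PendingCommits {
    // DataFiles pending commit, keyed by the checkpoint that produced them;
    // held in Flink operator state so a failed commit can be retried later
    private final NavigableMap<Long, List<DataFile>> pendingFiles = new TreeMap<>();

    void add(long checkpointId, DataFile file) {
        pendingFiles.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    void onCheckpointComplete(long checkpointId) {
        // commit every DataFile up to and including this checkpoint in a single
        // Iceberg transaction, then clear them; files left over from a previously
        // failed commit are still in the map and get committed here as well
        pendingFiles.headMap(checkpointId, true).clear();
    }
}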

5. Flink SQL Demo

The Flink + Iceberg real-time ingestion flow: consume Kafka data, write it to Iceberg, and read data from Iceberg in near real time.

5.1 Preliminary Work

  • Set execution.type = streaming

  • Enable table SQL hints so that the OPTIONS attribute can be used: set table.dynamic-table-options.enabled = true

  • Register the Iceberg catalog to manipulate the Iceberg table:

CREATE CATALOG Iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083'
);
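For jobs submitted programmatically rather than through the SQL Client, the same preliminary settings can be made in code. A minimal sketch, assuming a standard Flink streaming job (table.dynamic-table-options.enabled is a regular Flink configuration key):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().inStreamingMode().build());

// allow /*+ OPTIONS(...) */ table hints in SQL
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.dynamic-table-options.enabled", true);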

Write Kafka real-time data into the lake:

insert into Iceberg_catalog.Iceberg_db.tbl1  
            select * from Kafka_tbl;

Real-time streaming between lake tables, tbl1 -> tbl2:

insert into Iceberg_catalog.Iceberg_db.tbl2
    select * from Iceberg_catalog.Iceberg_db.tbl1
    /*+ OPTIONS('streaming'='true',
        'monitor-interval'='10s',
        'start-snapshot-id'='3821550127947089987') */;

5.2 Parameter Description

  • monitor-interval: the interval for continuously monitoring newly committed data files (default: 1s).

  • start-snapshot-id: read data starting from the specified snapshot ID. Each snapshot ID is associated with a group of manifest metadata files, and each metadata file maps to its own real data files.
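To find a snapshot ID to start from, a table's snapshots can be listed with Iceberg's Java API. A minimal sketch, assuming `table` is the Iceberg Table handle loaded from the catalog:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// print every snapshot ID and its commit time
for (Snapshot s : table.snapshots()) {
    System.out.println(s.snapshotId() + " committed at " + s.timestampMillis());
}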

6. Pitfall records

At first I wrote data to Iceberg from the SQL Client: the data directory kept being updated, but the metadata directory had no data, so queries returned nothing, because Iceberg needs the metadata to index the real data. The SQL Client does not enable checkpointing by default, so it must be enabled through the configuration file; otherwise the data directory is written but the metadata directory is not.

PS: Whether you enter the lake through SQL or through DataStream, checkpointing must be enabled.
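For a job built in code, enabling checkpointing is a one-liner; a minimal sketch (the 10-second interval is illustrative, and for the SQL Client the equivalent setting goes into its configuration file instead):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// without checkpoints, DataFiles land in the data directory but are never
// committed, so the metadata directory stays empty and queries return nothing
env.enableCheckpointing(10_000); // checkpoint (and Iceberg commit) every 10s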

7. Sample data

The following two figures show the effect of querying Iceberg in real time: how the data changes from one second to the next.

One second ago

Data refreshed after one second

IV. Pain point 2: Flink + Hive near real time becomes slower and slower

1. Pain point introduction

Although the Flink + Hive near-real-time architecture supports real-time read and write, as the number of tables and partitions grows, it faces the following problems:

  • Excessive metadata. Changing Hive partitions to hour/minute granularity improves data freshness, but the pressure on the Metastore is also obvious: excessive metadata slows query plan generation and affects the stability of other online services.

  • As metadata increases, so does the pressure on the database that stores the Hive metadata; after a while the database must be scaled up, for example with more storage space.

2. Solutions

Migrate the original Hive near-real-time tables to Iceberg. Why can Iceberg handle large amounts of metadata, while Hive easily becomes a bottleneck when its metadata grows?

Iceberg maintains metadata on a scalable distributed file system and has no centralized metadata system. Hive maintains partition-level metadata in the Metastore (too many partitions put huge pressure on MySQL), while the metadata within a partition is actually maintained as files (starting a job requires listing large numbers of files just to decide whether each needs to be scanned, which is time-consuming).

V. Optimization practice

1. Small file processing

Before Iceberg 0.11, small files were merged by periodically triggering the batch API. This could merge files, but a set of Actions code had to be maintained, and the merge was not real-time.

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.actions.Actions;

// batch-mode small-file compaction, triggered periodically
Table table = findTable(options, conf);
Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(10 * 1024) // 10KB, demo value
        .execute();
  • A new feature in Iceberg 0.11 supports streaming small-file merging: with a partition/bucket key, data is written by hash distribution and files are merged directly from the source. The advantage is that one task processes the data of one partition and commits its own DataFile files; for example, a task only handles the data of its corresponding partition. This avoids many small files being committed by multiple tasks, and no extra maintenance code is needed; you only need to specify the attribute write.distribution-mode when creating the table. The parameter is shared with other engines, such as Spark.
CREATE TABLE city_table ( 
     province BIGINT,
     city STRING
) PARTITIONED BY (province, city) WITH (
    'write.distribution-mode'='hash' 
);

2. Iceberg 0.11 sorting

2.1 Introduction to Sorting

Before Iceberg 0.11, Flink did not support the Iceberg sorting feature, so it could only be used with Spark in batch mode. Iceberg 0.11 adds support for sorting, which means we can also benefit from it in real time.

The essence of sorting is to make scans faster: after the data is aggregated and written in order by the sort key, from smallest to largest, max-min statistics can filter out a large amount of irrelevant data.
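For example (illustrative numbers), suppose the data is sorted by days and lands in three files:

file_1.parquet  days: 2021-05-01 .. 2021-05-03
file_2.parquet  days: 2021-05-04 .. 2021-05-06
file_3.parquet  days: 2021-05-07 .. 2021-05-09

A query with WHERE days = '2021-05-05' is compared against each file's max-min range and only file_2 is read; without sorting, every file could contain matching rows and all three would have to be scanned.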

2.2 Sorting demo

insert into Iceberg_table select days, province_id from Kafka_tbl order by days, province_id;

3. Manifest details after sorting

Parameter Description:

  • file_path: the location of the physical file.

  • partition: the partition to which the file belongs.

  • lower_bounds: the minimum values of the sort fields in this file; here, the minimums of days and province_id.

  • upper_bounds: the maximum values of the sort fields in this file; here, the maximums of days and province_id. Based on the partition information and the upper and lower bounds of the columns, Iceberg decides whether a file_path file needs to be read. After the data is sorted, the file's column information is also recorded in the metadata, and the query plan locates files from the manifest without storing the information in Hive metadata. This reduces the pressure on Hive metadata and improves query efficiency.
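These fields can be inspected through Iceberg's Java API. A minimal sketch, assuming `table` is the sorted Iceberg table loaded from the catalog, that prints the path, partition, and column bounds the query plan uses for each planned file:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask task : tasks) {
        DataFile f = task.file();
        // lowerBounds()/upperBounds() map column ids to serialized min/max values
        System.out.println(f.path()
            + " partition=" + f.partition()
            + " lower=" + f.lowerBounds()
            + " upper=" + f.upperBounds());
    }
}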

Using the sorting feature of Iceberg 0.11 with day as the partition, the sorting rule is recorded in the manifest file, which improves retrieval efficiency. This achieves the retrieval advantages of Hive partitions while avoiding the pressure caused by excessive Hive metadata.

VI. Conclusion

Compared with the previous version, Iceberg 0.11 adds many new and practical features. We summarize them as follows:

1. Flink + Iceberg sorting function

Before Iceberg 0.11, sorting was integrated with Spark but not with Flink; at that time we used Spark + Iceberg 0.10 to migrate a batch of Hive tables. The benefit on the BI side: BI originally built multi-level partitions to speed up Hive queries, which produced too many small files and too much metadata. During ingestion into the lake, we used Spark to sort on the conditions BI queries most frequently, combined with implicit partitioning, which ultimately improved BI retrieval speed without the small-file problem; Iceberg carries its own metadata, which also reduced the pressure on Hive metadata. Iceberg 0.11 supports sorting from Flink, which is a very useful feature: we can move the original Flink + Hive partitioning scheme to Iceberg sorting, achieving the effect of Hive partitions while reducing small files and improving query efficiency.

2. Read data in real time

Real-time data reading can be implemented through SQL. The advantage is that data that does not require high real-time performance, for example business that can accept a 1-10 minute delay, can be put into Iceberg; this not only reduces the pressure on Kafka, but also achieves near-real-time reading while retaining historical data.

3. Merge small files in real time

Before Iceberg 0.11, small-file merging had to be maintained through Iceberg's merge API, which required passing in table information and scheduling, and the merge ran in batch mode rather than in real time: in terms of code, it increased maintenance and development costs; in terms of timeliness, it was not real time. Iceberg 0.11 merges data from the source in real time in hash mode; you only need to specify the ('write.distribution-mode'='hash') attribute when creating the table in SQL, with no manual maintenance required.