This article is by Liu Jie. It introduces the structure of the SF Express data warehouse, some problems we ran into along the way, the practical details of using Hudi to optimize the state of the whole job, and our plans for the future. The main contents are:
- Data warehouse architecture
- Pitfalls in the Hudi code
- State optimization
- Future planning
SF Tech introduced Hudi as early as 2019, when it was based on Spark batch processing. In 2020, the company's requirements for real-time data grew, and the architecture was upgraded to ingest Binlog data into the lake via CDC, built on top of the then semi-finished community version of Hudi on Flink and continuously optimized. As the Hudi community developed rapidly, the company put forward new requirements for the warehouse this year and finally adopted Flink + Hudi to build real-time wide tables. There were many problems along the way, mainly two:
- The Hudi Master code had some bugs at the time;
- Wide tables involve multiple joins, and operations such as Top One make the state very large.
Fortunately, the community’s speed of repair and Hudi’s strong upsert capabilities enabled both issues to be effectively addressed.
1. Data warehouse architecture
Readers who are interested can refer to the Hudi on Flink practice that SF Express has shared previously.
2. Pitfalls in the Hudi code
Last year, our Hudi on Flink practice was based on Hudi 0.6, which is fairly old code. In order to embrace the community, we switched to the latest master code for this work. In a large-volume write scenario, we found a rather well-hidden data loss problem that took nearly two weeks to locate.
1. Overview of the core process of the Hudi StreamWriteFunction operator
After receiving data, StreamWriteFunction first buffers it, grouped by FileId. As data keeps flowing in, the buffer grows larger and larger, and a flush is performed once a threshold is reached. The threshold is controlled by two core parameters: write.batch.size, whose default is 64 MB, and write.task.max.size, whose default is 1 GB. A flush is triggered when a single group's buffered data reaches 64 MB or when the total buffered data reaches roughly 800 MB to 1 GB.
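For reference, both thresholds can be set as table options on the Hudi sink. The following Flink SQL DDL is a minimal sketch under assumed names: the table, schema, and path are hypothetical, and the values simply restate the thresholds mentioned above.

```sql
-- Minimal sketch: tuning the two buffer thresholds on a Hudi Flink sink table.
CREATE TABLE hudi_sink (
  id   BIGINT,
  name STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///tmp/hudi_sink',   -- hypothetical path
  'table.type' = 'COPY_ON_WRITE',
  'write.batch.size' = '64',          -- per-FileId group flush threshold, in MB
  'write.task.max.size' = '1024'      -- total buffer budget per write task, in MB
);
```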
A flush calls the client API to create a WriteHandle, which is then put into a Map cache. A handle can be understood as the copy-on-write of a single file.
If a FileId is written several times within the same checkpoint, later writes are based on the result of the previous copy-on-write, and the handle is a FlinkMergeAndReplaceHandle; whether a FileId has been written before is determined by the Map cache above.
When StreamWriteFunction executes snapshotState, it flushes all grouped data in memory in one go and then clears the client's handles.
2. Reproducing the scenario
Since Hudi itself has upsert capability, we initially assumed the Hudi sink would be fine in AT_LEAST_ONCE mode. In AT_LEAST_ONCE mode, Flink operators do not have to wait for barrier alignment and can process whatever data arrives first, which speeds up processing, so we set Flink's CheckpointingMode to AT_LEAST_ONCE in the Copy On Write scenario.
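For reference, this is roughly how that configuration looks in the Flink SQL client; a minimal sketch, with the checkpoint interval value chosen only for illustration.

```sql
-- Illustrative session settings: AT_LEAST_ONCE is the mode that triggered the issue.
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
```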
Upstream of the write function is BucketAssignFunction, the FileId assignment operator. Insert records A, B, C, and D belong to the same partition and are assigned to the same subtask of BucketAssignFunction, but they belong to two adjacent checkpoints.
When A enters BucketAssignFunction and finds no small file available, a new FileId f0 is created, and B is also assigned to f0 when it arrives. Meanwhile, because of AT_LEAST_ONCE mode, C and D may also be processed early and allocated to f0. In other words, in AT_LEAST_ONCE mode, the insert records A, B, C, and D from two different checkpoints all end up assigned to the same FileId, because C and D are processed ahead of time.
The write function may flush A, B, and C after receiving them, while D, which belongs to the next checkpoint, is left for the next flush. Once A, B, and C are written, the handle created for them is cleared. When the next checkpoint flushes D, D is also insert data, so a file is created directly for it. But because A, B, C, and D share the same FileId, the file created for D overwrites the file already written for A, B, and C, and the data of A, B, and C is lost.
3. Locating the problem
This problem was hard to locate because of its randomness: the lost data was different every time, and the issue rarely showed up with small data volumes. In the end we enabled Flink's Queryable State to find the lost records, located their FileId, discovered that the instant state of A, B, C, and D was I (insert), and parsed every version of that FileId to trace back and reconstruct what had happened.
3. State optimization
We turned the largest offline wide table in production into a real-time one. This wide table has many fields and involves left joins of several tables against the main table as well as some Top One computations, all of which keep state. Our data cycle is long and data must be retained for 180 days, so the state size was estimated to reach hundreds of terabytes, which would put enormous pressure on state persistence. However, these operations are easy to implement in Hudi.
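For context, a Top One computation in plain Flink SQL is typically a deduplication query like the hedged sketch below (the table and columns are hypothetical); its state has to keep the whole record for every key, which is exactly the cost we wanted to avoid.

```sql
-- Hypothetical Top One query: keep only the latest record per id.
SELECT id, name, age, sex
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
  FROM wide_source
)
WHERE rn = 1;
```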
1. Top One sink to Hudi
Hudi has a write.precombine.field configuration option that specifies a field used to deduplicate the data being flushed: among records with the same key, the one with the largest value of this field is kept.
In SQL, we fold the Top One ordering logic into a single field, set that field as Hudi's write.precombine.field, and also write it into state. When data with the same key arrives again, it is compared against the write.precombine.field value kept in state.
Flink's Top One state normally stores every field of the whole record, but we store only this one field, which greatly reduces the state size.
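The following is a rough sketch of the idea; the table names, columns, and the way two hypothetical ranking columns (priority and version) are folded into one sortable value are all assumptions for illustration.

```sql
-- Hypothetical sketch: fold the Top One ordering logic into one field and let
-- Hudi keep the "largest" record per key via write.precombine.field.
CREATE TABLE dwd_wide_hudi (
  id       BIGINT,
  name     STRING,
  sort_key BIGINT,                         -- single field encoding the Top One ordering
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///tmp/dwd_wide_hudi',    -- hypothetical path
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'sort_key'    -- the record with the largest sort_key wins
);

INSERT INTO dwd_wide_hudi
SELECT
  id,
  name,
  -- priority and version are hypothetical ranking columns folded into one value
  priority * 1000000000 + version AS sort_key
FROM ods_src;
```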
2. Multi-table Left Join sink to Hudi
2.1 Flink SQL join
Let's simplify the scenario to the following example: suppose we have a wide table t_p assembled from three tables.
```sql
insert into t_p
select
  t0.id, t0.name,
  t1.age,
  t2.sex
from t0
left join t1 on t0.id = t1.id
left join t2 on t0.id = t2.id
```
The Flink SQL join operator maintains state for both the left table and the right table, containing all fields of each table, and every additional join adds another piece of state. The result is state expansion; if the input of the join operator is an append-only stream, the expansion is even more pronounced.
2.2 Rewrite Join as Union All
In the case above, each left join only contributes a few fields, so we thought of rewriting the SQL with UNION ALL. In the UNION ALL, every field must be present in each branch, and the fields a branch does not provide are filled with NULL; we treat these NULL-filled fields as invalid. A sketch of the rewritten SQL is shown below.
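A hedged sketch of the rewrite, following the t_p example above; the column types used in the NULL casts (INT for age, STRING for name and sex) are assumptions.

```sql
-- Rough sketch of the UNION ALL rewrite: each branch pads the columns it does
-- not own with NULL, and those NULL-filled columns are treated as invalid fields.
insert into t_p
select id, name, cast(null as int) as age, cast(null as string) as sex from t0
union all
select id, cast(null as string) as name, age, cast(null as string) as sex from t1
union all
select id, cast(null as string) as name, cast(null as int) as age, sex from t2;
```

For this rewrite to behave like the original join, Hudi needs the ability to update records partially, field by field: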
- When data comes from t0, update only the id and name fields;
- Similarly, when data comes from t1, update only the age field;
- When data comes from t2, update only the sex field.
Unfortunately, Hudi's default implementation is a full-record overwrite, meaning that when t0 data arrives, age and sex are overwritten to null, and when t1 data arrives, name and sex are overwritten to null. This is clearly unacceptable, so we had to modify the Hudi sink.
2.3 Hudi Union All implementation
In COW mode, Hudi updates a record by copying and overwriting the old data. It seems that as long as we know which table a record comes from and which of its fields are valid, we can selectively overwrite only those fields during the copy. Things are a little trickier in the partition-change scenario, where moving data from one partition to another is implemented as a delete in the old partition plus an insert into the new one; this could lose field values that were previously partially updated. Hudi on Flink is a pipeline composed of several core operators:
- **RowDataToHoodieFunction:** converts incoming data into a HoodieRecord. The received data contains all fields; when converting to a HoodieRecord, we select only the valid fields.
- **BootstrapFunction:** on task recovery, reads the existing files and loads the index data; this operator does not transform the data.
- **BucketAssignFunction:** assigns a location to each record. A location has two parts: the partition directory and the FileId, which identifies the file the record will be written to. Once the target file is determined, records are sent to StreamWriteFunction grouped by FileId, and StreamWriteFunction writes them in batches per file.
The original logic of the BucketAssignFunction operator is shown in the figure below. When a record arrives, BucketAssignFunction checks state to see whether the record has been written before; if so, it looks up the corresponding location. If the partition has not changed, the current record is assigned to that same location. If no location is found in state, a new location is created, assigned to the current record, and written back to state.
The location stored in state tells us which file the current record should be updated in or written to. The partition-change scenario is a bit more complicated: if a record moves from the 2020 partition to the 2021 partition, a delete record is created whose location is the one held in state; this record lets the downstream perform the actual delete, and then a new location (in the 2021 partition) is created and sent downstream as an insert.
To implement Top One in Hudi, we also extended the state information with the Top One time field.
Insert scenario: suppose the three records {id:1, name:zs}, {id:1, age:20}, and {id:1, sex:man} are written to the same FileId. At flush time, a record with all fields empty, {id:null, name:null, age:null, sex:null}, is created first and then merged with the three records in turn. Note that this merge only takes the valid fields of each record, producing {id:1, name:zs, age:20, sex:man}, as shown in the diagram below.
The update scenario is similar to the insert scenario: if the old data is {id:1, name:zs, age:20, sex:man} and the two newly received records are {id:1, name:ls} and {id:1, age:30}, the old data is read from the file and merged with the new records in turn, using the same procedure as for insert, as shown in the diagram below.
In this way, UNION ALL achieves the effect of the left join while greatly reducing the state size.
4. Future planning
Parquet metadata collection: a Parquet file's footer contains the minimum and maximum values of each column. We plan to collect this information after each file is written and merge it with the metadata of the previous commit to generate a metadata file covering all files, so that predicates can be pushed down to filter out files at read time.
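As a hypothetical illustration of the intended benefit (table and column names are made up), a query like the one below could skip any file whose footer statistics show that its maximum amount never exceeds the predicate value.

```sql
-- With per-file min/max column statistics, the reader can prune files where
-- max(amount) <= 1000 before scanning any data.
SELECT order_id, amount
FROM hudi_orders
WHERE amount > 1000
  AND dt BETWEEN '2021-06-01' AND '2021-06-30';
```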
The company is committed to building a stream-batch unified SQL computing engine with Hudi as the underlying storage and Flink as the compute engine. Flink's batch integration with Hudi is still not very deep, and in the future we may use Flink to implement Hudi features such as clustering and to improve Hudi's batch capabilities on the Flink engine.
For more Flink-related technical questions, you can scan the QR code to join the community DingTalk group and be the first to receive the latest technical articles and community news; you are also welcome to follow the official WeChat account.