Processing mutable data has always been a major difficulty for big data systems, especially real-time systems. After investigating various options, we chose a CDC-to-Hudi data ingestion scheme, which currently delivers minute-level data freshness in our production environment. We hope the practice described in this article can inspire your own production work. The contents include:
- Background
- CDC and Data Lake
- Technical challenges
- Results
- Future plans
- Conclusion
1. Background
As a customer data platform (CDP), Linkflow provides enterprises with a closed operational loop from customer data collection and analysis to execution. Every day a large amount of data is collected through first-party collection endpoints (SDK) and third-party data sources such as WeChat and Weibo, then cleaned, computed, consolidated, and written to storage. Users can analyze the persisted data through flexible reports or tags, and the results serve as the data source of the MA (Marketing Automation) system, enabling precise marketing to specific audiences.
In Linkflow, data is divided into immutable data and mutable data, and all of it needs to be analyzed. About a dozen tables are involved, and the immutable data is large, on the order of billions of rows. In a traditional big data system, immutable data would be fact data and mutable data would be dimension data. In our actual business, however, users' natural attributes, order amounts, and order statuses are all updatable, and the volume of such data is considerable, reaching hundreds of millions of rows in our system. Mutable data has always been managed in the relational database MySQL: on the one hand, data maintenance is convenient; on the other hand, integration with business systems is easy.
But the problems are also obvious:
- Data fragmentation: because online DDL on large MySQL tables is risky, new sub-tables are often added to hold new business attributes as business complexity grows. In other words, a complete user record ends up scattered across multiple tables, which is very unfriendly to queries.
- No multidimensional queries: relational databases are not good at multidimensional queries, and indexing every field is impractical, so a storage component that can back an OLAP query engine is needed to support multidimensional analysis scenarios. Given the possibility of scaling independently in the future, we also prioritized an architecture that separates computing from storage.
2. CDC and Data Lake
CDC (Change Data Capture) is a software design pattern used to identify and track changed data so that action can be taken on it. We actually had experience two years ago using Canal to replicate MySQL data to heterogeneous storage, but at the time we did not realize it could be used to integrate with big data storage this way. We ran into some performance issues with Canal, and its open source community was largely unmaintained, so before launching the new architecture we investigated Maxwell and Debezium. We then noticed Flink-CDC-Connectors [1], an open source project from Ververica (the company behind Flink), which embeds Debezium as the binlog synchronization engine inside a Flink task, so binlog messages can be easily filtered, validated, consolidated, and formatted in the stream job with excellent performance. Considering that in the future we can do dual-stream joins directly with behavioral data, and even simple risk control through CEP, we finally chose the Debezium-in-Flink CDC scheme.
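As an illustration only, here is a minimal sketch of embedding Debezium into a Flink job via flink-cdc-connectors 1.x (the `MySQLSource` builder and `StringDebeziumDeserializationSchema` come from that library; hostnames, credentials, and table names below are placeholders, not our real configuration):

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlCdcJob {
    public static void main(String[] args) throws Exception {
        // Debezium runs inside the Flink task and emits binlog changes as strings.
        SourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("mysql-host")          // placeholder
                .port(3306)
                .databaseList("linkflow_db")     // placeholder database
                .tableList("linkflow_db.users")  // placeholder table
                .username("cdc_user")
                .password("cdc_password")
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source)
           // filtering / validation / formatting of binlog messages would go here
           .print();
        env.execute("mysql-cdc-demo");
    }
}
```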
Because there are many data subjects in MySQL, we also do data routing in the stream task: data belonging to different subjects is routed to different Kafka topics, with Kafka serving as the ODS layer. This brings several advantages. First, we can clearly observe every change of the mutable data. Second, we can replay the data, since the accumulated result of successive changes is the final state. A sketch of such routing is shown below.
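A minimal sketch of per-record topic routing with Flink's Kafka producer, assuming the upstream operator has already attached a target topic name to each record (the record shape and topic-naming rule are illustrative, not our exact implementation):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// A change event that already knows which ODS topic it belongs to.
class RoutedChange {
    String targetTopic;   // e.g. "ods.linkflow_db.users" (illustrative naming)
    String payload;       // formatted binlog message

    RoutedChange(String targetTopic, String payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }
}

class RoutingKafkaSink {
    static FlinkKafkaProducer<RoutedChange> build(Properties kafkaProps) {
        KafkaSerializationSchema<RoutedChange> schema =
                (element, timestamp) -> new ProducerRecord<>(
                        element.targetTopic,                                  // topic chosen per record
                        element.payload.getBytes(StandardCharsets.UTF_8));
        return new FlinkKafkaProducer<>(
                "ods.default",                                                // fallback topic
                schema,
                kafkaProps,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
}
```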
The next thing to consider is where the data should reside. Combined with the "separation of computing and storage" principle mentioned above, this is exactly one of the advantages a data lake offers. Data lakes are typically built on file-system-like storage (object storage or traditional HDFS), which is what we expected. After comparing several data lake solutions, we chose Apache Hudi for the following reasons:
- Hudi provides an upsert capability on HDFS, which feels much like a relational database, is very friendly to updatable data, and also matches MySQL binlog semantics.
- Incremental queries make it easy to obtain the data that changed in the last 30 minutes or the last day. This is very friendly to offline tasks that can be computed incrementally: instead of recomputing the full dataset, only the changed data needs to be processed, greatly saving machine resources and time (see the incremental read sketch after this list).
- Metadata can be synchronized to Hive in near real time, creating the conditions for querying the data right after it lands in the lake.
- It provides both COW and MOR table types, each optimized for different scenarios.
- The Hudi community is open and iterates quickly. During its incubation it was integrated by AWS EMR, and later by Alibaba Cloud DLA Data Lake Analytics [2], Alibaba Cloud EMR [3], and Tencent Cloud EMR [4]. It has good prospects, and more and more companies in China are building data lakes on Hudi.
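A minimal sketch of such an incremental read with the Spark DataSource API (the base path and begin instant below are placeholders; the option keys are the standard Hudi read options):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalReadDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-incremental-read")
                .getOrCreate();

        // Only pull records that changed after the given commit instant.
        Dataset<Row> changed = spark.read()
                .format("hudi")
                .option("hoodie.datasource.query.type", "incremental")
                .option("hoodie.datasource.read.begin.instanttime", "20210309000000") // placeholder instant
                .load("hdfs://hudipath/event");                                       // placeholder base path

        changed.createOrReplaceTempView("event_delta");
        spark.sql("SELECT count(*) FROM event_delta").show();
    }
}
```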
After integrating Hudi, our architecture evolved to look like this:
For the data tables we chose the COW (copy-on-write) type, mainly because our workload reads much more than it writes and we want queries to be as fast as possible, whereas the MOR (merge-on-read) strategy is somewhat weaker in query performance. In addition, our data latency requirement is not at the second level, so we finally settled on COW.
At the top layer we use Presto as the analysis engine, providing ad-hoc query capability. Since we were using Hudi 0.6.0, whose Flink integration had not yet been released, we had to adopt a Flink + Spark dual-engine strategy, using Spark Streaming to write data from Kafka to Hudi.
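A minimal sketch of the write side with the Spark DataSource API, assuming each micro-batch has already been converted to a DataFrame (the table name, key fields, and path are placeholders; the option keys are standard Hudi write options):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HudiUpsertDemo {
    // Upsert one micro-batch of change records into a COW table.
    static void upsertBatch(Dataset<Row> batch) {
        batch.write()
                .format("hudi")
                .option("hoodie.table.name", "user_profile")                 // placeholder table
                .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
                .option("hoodie.datasource.write.operation", "upsert")
                .option("hoodie.datasource.write.recordkey.field", "id")     // row key
                .option("hoodie.datasource.write.precombine.field", "ts")    // newest ts wins
                .option("hoodie.datasource.write.partitionpath.field", "dt") // placeholder partition column
                .mode(SaveMode.Append)
                .save("hdfs://hudipath/user_profile");                       // placeholder base path
    }
}
```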
3. Technical challenges
After the PoC, we settled on the architecture design shown in the figure above, but quite a few challenges arose during the actual implementation.
3.1 CDC operation mode customization
■ Full mode
One of Debezium's strengths is the unification of batch and streaming: the snapshot phase scans the full tables and replays the data as messages in the same format as the binlog incremental log, so users can process both full and incremental data with the same code. In our business practice, however, if there are many historical tables or the tables are large, the snapshot phase can take a very long time, and if the process is interrupted unexpectedly, the next run has to rescan from the first table. If a complete snapshot takes several days, "retrying" at that scale is unacceptable, so we needed something like a resume-from-breakpoint mechanism. Searching the Debezium documentation, we found the snapshot.include.collection.list parameter:
"An optional, comma-separated list of regular expressions that match names of schemas specified in table.include.list for which you want to take the snapshot."
Therefore, after a snapshot is interrupted, the remaining tables to be scanned can be passed in with this parameter, achieving a "relay" capability. Note, however, that no matter how many retries happen in the snapshot phase, the incremental binlog position must be the one recorded by the first snapshot, otherwise data will be lost. This creates another problem: if we keep interrupting and relaying until the snapshot completes, Debezium will automatically start incremental synchronization from the binlog position of the last snapshot, not the first one, which is not what we want. We need the task to terminate immediately after the snapshot ends.
We did not find such a capability in the Debezium documentation, but while browsing the source code we saw that there is a way:
```java
// MySqlConnectorConfig.SnapshotMode
/**
 * Perform a snapshot and then stop before attempting to read the binlog.
 */
INITIAL_ONLY("initial_only", true);

// MySqlConnectorTask.java
if (taskContext.isInitialSnapshotOnly()) {
    logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
    chainedReaderBuilder.addReader(new BlockingReader("blocker",
            "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
    chainedReaderBuilder
            .completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
}
```
In initial_only mode, Debezium uses a BlockingReader instead of a BinlogReader after the snapshot, blocking the thread and stopping incremental consumption.
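A hedged sketch of how these two knobs could be combined when resuming an interrupted snapshot, passing extra Debezium properties through the flink-cdc-connectors builder (the `debeziumProperties` pass-through and the table names are assumptions for illustration, not our exact job):

```java
import java.util.Properties;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SnapshotRelayDemo {
    static SourceFunction<String> buildRelaySource() {
        Properties debeziumProps = new Properties();
        // Only snapshot, then stop instead of switching to binlog reading.
        debeziumProps.setProperty("snapshot.mode", "initial_only");
        // Resume the "relay": only the tables that were not finished last time.
        debeziumProps.setProperty("snapshot.include.collection.list",
                "linkflow_db.orders,linkflow_db.order_items");        // placeholder tables

        return MySQLSource.<String>builder()
                .hostname("mysql-host")                               // placeholder
                .port(3306)
                .databaseList("linkflow_db")
                .tableList("linkflow_db.orders", "linkflow_db.order_items")
                .username("cdc_user")
                .password("cdc_password")
                .debeziumProperties(debeziumProps)                    // assumed pass-through to Debezium
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
    }
}
```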
■ Incremental mode
If the task stops automatically after the snapshot ends, it has to be restarted manually to continue incremental synchronization, so the incremental mode must support starting from a specified MySQL binlog file and position. Debezium has a schema_only_recovery mode for this, and the parameters can be set manually:
```java
DebeziumOffset specificOffset = new DebeziumOffset();
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos);
specificOffset.setSourceOffset(sourceOffset);
```
Since the version of ververica/flink-cdc-connectors we used at the time was 1.2.0, which did not expose Debezium's schema_only_recovery mode, we modified the relevant source code. This is now supported in version 1.3.0, where the start position can be passed to the MySQLSource builder as a startup option.
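A hedged sketch of what that looks like with the 1.3.0 builder API, assuming the `StartupOptions.specificOffset(file, pos)` factory (the binlog file name and position below are placeholders):

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IncrementalFromOffsetDemo {
    static SourceFunction<String> buildIncrementalSource() {
        return MySQLSource.<String>builder()
                .hostname("mysql-host")                                   // placeholder
                .port(3306)
                .databaseList("linkflow_db")
                .tableList("linkflow_db.users")
                .username("cdc_user")
                .password("cdc_password")
                // Resume binlog consumption from the position recorded by the first snapshot.
                .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4567)) // placeholders
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
    }
}
```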
3.2 Patch Update
Here it is necessary to explain the difference between a full (overwrite) update and a partial update, which corresponds to RESTful semantics: PUT is a full update, requiring the caller to provide a complete resource object; in theory, if PUT is used but a complete object is not provided, the missing fields should be set to null. PATCH corresponds to a partial update: the caller only provides the fields to be updated rather than the complete resource object, which also saves bandwidth.
In Hudi, only full overwrite updates are supported by default. For our business, however, the data reported by the collection endpoints cannot always contain the complete business object. For example, when a user's age increases, the reported message may contain only that one field:
```json
{
  "id": 123,
  "ts": 1435290195610,
  "data": {
    "age": 25
  }
}
```
This requires looking up the existing record with rowkey = 123 and merging it with the fields to be updated before writing, merging only the non-empty incoming fields. By default, Hudi's OverwriteWithLatestAvroPayload.combineAndGetUpdateValue method "simply overwrites storage with latest delta record".
For forward compatibility, our data engineer Karl added an OverwriteNonDefaultsWithLatestAvroPayload class that overrides combineAndGetUpdateValue to handle this, and contributed it back to the community as [HUDI-1255] Add new Payload (OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage [5]. There are in fact many similar requirements in the community, such as [HUDI-1160] Support update partial fields for CoW table [6], and we look forward to more developers making this capability more complete.
Of course there are limits: if you really want to set a field to null, OverwriteNonDefaultsWithLatestAvroPayload cannot express that.
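The idea can be sketched roughly as follows. This is a simplified illustration of the "merge non-empty incoming fields" approach, not the exact class contributed in HUDI-1255 (which compares against Avro schema defaults rather than plain nulls and provides the additional constructors Hudi instantiates by reflection):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class MergeNonNullFieldsPayload extends OverwriteWithLatestAvroPayload {

    public MergeNonNullFieldsPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
            throws IOException {
        Option<IndexedRecord> incoming = getInsertValue(schema);
        if (!incoming.isPresent()) {
            return Option.empty();
        }
        GenericRecord incomingRecord = (GenericRecord) incoming.get();
        GenericRecord storedRecord = (GenericRecord) currentValue;

        // Start from the stored record and only copy over fields the caller actually sent.
        GenericRecord merged = new GenericData.Record(schema);
        for (Schema.Field field : schema.getFields()) {
            Object incomingFieldValue = incomingRecord.get(field.name());
            merged.put(field.name(),
                    incomingFieldValue != null ? incomingFieldValue : storedRecord.get(field.name()));
        }
        return Option.of(merged);
    }
}
```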
We have also extended the community's Compaction strategy by adding a time-based trigger, so that compaction can be scheduled not only by the number of incremental commits but also by elapsed time. See [HUDI-1381] Schedule compaction based on time elapsed [7], which provides more flexibility for performing compaction within a specified time window.
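As an illustration of what that trigger looks like as configuration (the option keys follow the time-elapsed strategy added by HUDI-1381; treat the exact key names and values as assumptions to verify against your Hudi version):

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionTriggerConfigDemo {
    // Writer options for a MOR table that compacts every hour instead of every N delta commits.
    static Map<String, String> timeBasedCompactionOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put("hoodie.datasource.write.table.type", "MERGE_ON_READ");
        opts.put("hoodie.compact.inline", "true");
        opts.put("hoodie.compact.inline.trigger.strategy", "TIME_ELAPSED");  // assumed strategy name
        opts.put("hoodie.compact.inline.max.delta.seconds", "3600");         // assumed key, 1 hour
        return opts;
    }
}
```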
3.3 Merging data with the same rowkey within a batch
One characteristic of CDC is that it captures data changes in real time; for example, the status of an order may change several times within a few minutes. Combined with Spark Streaming's micro-batch processing, there is a high probability that a single time window contains multiple records for the same rowkey at different points in time. Therefore, we merge records with the same rowkey inside the streaming task, somewhat like Hudi using its Bloom index to judge whether a rowkey already exists. Special attention must be paid to ordering: the merge must strictly follow the ts timestamp, otherwise an older version could overwrite a newer one. A sketch of this batch-local merge is shown below.
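A minimal sketch of merging one micro-batch in plain Java, assuming each change record carries an id, a ts, and only the changed fields (the record shape is illustrative, not our exact data model):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangeRecord {
    long id;                      // rowkey
    long ts;                      // change timestamp from the binlog
    Map<String, Object> fields;   // only the fields that changed

    ChangeRecord(long id, long ts, Map<String, Object> fields) {
        this.id = id;
        this.ts = ts;
        this.fields = fields;
    }
}

class BatchMerger {
    // Collapse all changes for the same rowkey into one record, applying them in ts order
    // so that older changes can never overwrite newer ones.
    static List<ChangeRecord> mergeByRowKey(List<ChangeRecord> batch) {
        Map<Long, ChangeRecord> merged = new HashMap<>();
        batch.stream()
                .sorted(Comparator.comparingLong(r -> r.ts))
                .forEachOrdered(r -> merged.merge(r.id, r, (older, newer) -> {
                    older.fields.putAll(newer.fields);   // later fields win
                    older.ts = newer.ts;                 // keep the latest ts as the precombine key
                    return older;
                }));
        return new ArrayList<>(merged.values());
    }
}
```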
3.4 Schema evolution
Schema evolution is a must because of business growth and flexibility requirements. Hudi happens to take this into account, as we can learn from the Hudi wiki [8]:
What's Hudi's schema evolution story?
Hudi uses Avro as the internal canonical representation for records, primarily due to its nice schema compatibility & evolution [9] properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in DeltaStreamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date.
Since the Avro format itself supports schema evolution, Hudi naturally supports it as well.
Schema evolution can be broadly divided into four types:
- Backward compatible: old data can be read with the new schema, and default values are used when a field has no value. This is also the mode Hudi relies on.
- Forward compatible: new data can be read with the old schema; Avro simply ignores the newly added fields.
- Full compatible: supports both forward and backward compatibility. To stay fully compatible, only add fields that have default values and only remove fields that have default values.
- No compatibility checking: usually needed when a field's type has to be changed forcibly; this requires a full data migration and is not recommended.
In production practice we can meet field-extension requirements by modifying the schema. However, some problems have also surfaced: too many fields can make a single file very large (over 128 MB), and writes become slow; in extreme cases, writing files with more than 1,000 columns takes hours. In the future we will look for optimization solutions, such as field recycling or vertically splitting tables, to reduce the number of fields in a single file.
3.5 Exceptions caused by simultaneous query and write
When we used Presto to query the Hive table, the Hudi metadata file could sometimes not be found, which led to an NPE inside Hudi:
```
Error checking path :hdfs://hudipath/.hoodie_partition_metadata, under folder: hdfs://hudipath/event/202102; nested exception is java.sql.SQLException: Query failed (#20210309_031334_04606_fipir)
```
Based on this message, we suspected that the metadata information was being modified during the query. After reaching out to the community, we changed hoodiePathCache in HoodieROTablePathFilter to a thread-safe ConcurrentHashMap, repackaged the relevant jars (including hudi-common.jar), replaced them in the presto/plugin/hive-hadoop2 directory, and restarted Presto. No NPE has been observed since.
4. Results
Let's review our original vision for the data lake when designing the architecture:
- Support for mutable data.
- Support for schema evolution.
- Separation of computing and storage, with support for multiple query engines.
- Support for incremental views and time travel.
Hudi has essentially delivered all of these features. After the new architecture was completed, data latency and offline processing performance improved significantly compared with the previous system, reflected in the following aspects:
- The real-time write path is simplified. Updates used to be cumbersome; now developers no longer need to care whether an operation is an insert or an update, which greatly reduces their mental burden.
- The time from real-time data entering the lake to becoming queryable is shortened. Although we adopted the COW table mode, actual tests show that the ingestion-to-queryable latency is not bad, basically at the minute level.
- Offline processing performance improved. Based on Hudi's incremental view, daily offline tasks can easily fetch only the data changed in the past 24 hours and process a much smaller volume of data, resulting in shorter processing times.
5. Future plans
5.1 Flink integration
As mentioned earlier, the "forced" dual-engine strategy was quite frustrating in practice, since operations and development approaches could not be unified. We have therefore been closely following the progress of Hudi's official Flink integration. There is a new RFC-24: Hoodie Flink Writer Proposal [10], and Hudi 0.8.0 has deeply integrated Flink capabilities. We expect the performance of the Flink-integrated version to improve greatly, so that the processing engine can be unified on Flink and the dual-engine mode retired.
5.2 Concurrent writes
Before 0.8.0, Hudi did not support concurrent writes to the same table in order to guarantee the consistency of metadata. In practice, however, a lot of data in the data lake is not only real-time data; some of it needs to be produced by offline computation. If some fields in a table come directly from CDC while others are computed by offline tasks, concurrent writes are required.
We currently work around this in two ways:
- Vertical table splitting: the two sets of files are kept separate; CDC data is written through Spark Streaming, while offline computation results are written to other files, avoiding concurrent writes.
- Simulating offline results as CDC messages: when the table cannot be split for query-performance reasons, the offline computation results are written to Kafka as simulated CDC messages and then written to Hudi through Spark Streaming. The obvious drawback is that it takes a long time for offline task results to be reflected in the final storage.
The recently released Hudi 0.8.0 already supports a concurrent write mode based on optimistic locking, and its file-level conflict detection can satisfy concurrent write requirements well. We will test it later.
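For reference, a hedged sketch of the 0.8.0 optimistic-concurrency writer options we would expect to test (the ZooKeeper lock provider, addresses, and lock key are placeholders; verify the exact keys against the 0.8.0 documentation):

```java
import java.util.HashMap;
import java.util.Map;

public class ConcurrentWriteConfigDemo {
    // Writer options enabling optimistic concurrency control with a ZooKeeper-based lock.
    static Map<String, String> occWriterOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        opts.put("hoodie.cleaner.policy.failed.writes", "LAZY");
        opts.put("hoodie.write.lock.provider",
                "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        opts.put("hoodie.write.lock.zookeeper.url", "zk-host");          // placeholder
        opts.put("hoodie.write.lock.zookeeper.port", "2181");
        opts.put("hoodie.write.lock.zookeeper.lock_key", "user_profile"); // placeholder
        opts.put("hoodie.write.lock.zookeeper.base_path", "/hudi_locks"); // placeholder
        return opts;
    }
}
```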
5.3 Performance Optimization
The problems mentioned above, such as large files and frequent GC, mainly come from two places.
■ Index
Because we currently use HoodieGlobalBloomIndex, both building and querying the index take a long time. The documentation describes three index implementations:
How does the Hudi indexing work & what are its benefits?
The indexing component is a key part of the Hudi writing and it maps a given recordKey to a fileGroup inside Hudi consistently. This enables faster identification of the file groups that are affected/dirtied by a given write operation.
Hudi supports a few options for indexing as below:
- HoodieBloomIndex (default): uses a bloom filter and ranges information placed in the footer of parquet/base files (and soon log files as well).
- HoodieGlobalBloomIndex: the default indexing only enforces uniqueness of a key inside a single partition, i.e. the user is expected to know the partition under which a given record key is stored. This helps the indexing scale very well for even very large datasets [11]. However, in some cases, it might be necessary instead to do the de-duping/enforce uniqueness across all partitions, and the global bloom index does exactly that. If this is used, incoming records are compared to files across the entire dataset to ensure a recordKey is only present in one partition.
- HBaseIndex: Apache HBase is a key value store, typically found in close proximity to HDFS. You can also store the index inside HBase, which could be handy if you are already operating HBase.
You can also implement your own index by subclassing the HoodieIndex class and configuring the index class name in the configs. After discussing with the community, we prefer to use HBaseIndex or a similar key-value store to manage the index.
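A hedged sketch of what switching to the HBase index would look like as writer options (the ZooKeeper quorum and HBase table name are placeholders; the keys are the standard hoodie.index.hbase.* options):

```java
import java.util.HashMap;
import java.util.Map;

public class HBaseIndexConfigDemo {
    // Writer options that replace the global bloom index with an HBase-backed index.
    static Map<String, String> hbaseIndexOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put("hoodie.index.type", "HBASE");
        opts.put("hoodie.index.hbase.zkquorum", "zk-host1,zk-host2");   // placeholder quorum
        opts.put("hoodie.index.hbase.zkport", "2181");
        opts.put("hoodie.index.hbase.table", "hudi_record_index");      // placeholder HBase table
        return opts;
    }
}
```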
■ Upsert
Besides the problems with larger files, the slowness of upsert is also related to the nature of CDC: the update range of mutable data is inherently unpredictable. In the extreme case where 1,000 records to be updated belong to 1,000 different files, it is hard to improve update performance through code optimization alone, and we can only add CPU resources to increase processing parallelism. We will approach this from several directions:
- Parameter adjustment to see if there is a way to balance the number and size of files.
- Try to use MOR mode for some business tables. MOR writes data to log files before merging it into Parquet, which theoretically reduces the frequency of overwriting Parquet files.
- Discuss business trade-offs in exchange for better write speeds.
6. Conclusion
There is still a lot of work to be done in the future. We will also actively participate in community building and try out new features to bring users a better data service experience. Finally, we would like to thank the developers and community maintainers of Flink CDC Connectors and Apache Hudi.
Author | Dean, Principal Architect, Linkflow
Reference links:
[1] flink-cdc-connectors: github.com/ververica/f…
[2] Alibaba Cloud DLA Data Lake Analytics: help.aliyun.com/document_de…
[3] Alibaba Cloud EMR: help.aliyun.com/document_de…
[4] Tencent Cloud EMR: cloud.tencent.com/document/pr…
[5] [HUDI-1255] Add new Payload (OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage: github.com/apache/hudi…
[6] [HUDI-1160] Support update partial fields for CoW table: github.com/apache/hudi…
[7] [HUDI-1381] Schedule compaction based on time elapsed: github.com/apache/hudi…
[8] Hudi wiki: cwiki.apache.org/confluence/…
[9] schema compatibility & evolution: docs.confluent.io/current/sch…
[10] RFC-24: Hoodie Flink Writer Proposal: cwiki.apache.org/confluence/…
[11] very large datasets: eng.uber.com/uber-big-da…