[TOC]
Real-time data processing vs. real-time data
Flink and Spark Streaming are typically used for real-time processing of real-time data: the data arrives in real time and must be processed with low latency. At the other extreme, where the data is not real-time and the processing is not timely, we have the familiar T+1 data of an offline data warehouse.
The Apache Hudi scenario discussed in this article is real-time data with non-real-time processing: the goal is to mirror data from Mysql into big data platforms such as Hive in near real time.
Business scenario and technology selection
A traditional offline data warehouse usually provides T+1 data, which cannot meet the needs of day-to-day data analysis, while streaming computation is generally window-based and its window logic is relatively fixed. My company has a particular need: business analysts who are familiar with the data structures of the existing transactional databases want to run a lot of ad-hoc analysis that includes the current day's real-time data. They usually work against a Mysql replica, doing the analysis directly in SQL, but this often runs into the following obstacles
- When the data volume is large and the analysis logic is complex, it takes Mysql a long time to produce results
- Some cross-database analysis cannot be done at all
As a result, several technical frameworks have emerged to bridge the gap between OLTP and OLAP; TiDB, which supports both, is a typical example. Apache Hudi and Apache Kudu instead act as bridges between existing OLTP and OLAP technologies: they store data in OLTP-like data structures, support CRUD, and integrate with existing OLAP frameworks (e.g. Hive, Impala) for OLAP analysis
Apache Kudu requires a separate cluster deployment, whereas Apache Hudi does not: it can use an existing big data cluster, such as HDFS, to store its data files and then use Hive for data analysis, which makes it relatively well suited to resource-constrained environments
Overall approach with Apache Hudi
Hudi provides the concept of a Hudi table, which supports CRUD operations. We replay Mysql binlog data into a Hudi table and then use Hive to query and analyze the Hudi table. The data flow architecture is as follows
Hudi table data structure
A Hudi table's data files can be stored on the local file system of the operating system or on a distributed file system such as HDFS. For analysis performance and data reliability, HDFS is generally used. On HDFS, the files of a Hudi table fall into two categories
- Paths containing _partition_key hold the actual data files, stored by partition. The partition path key can be specified; I used _partition_key
- .hoodie: because CRUD operations are fragmented, each operation generates a file, and as these small files accumulate they seriously degrade HDFS performance. Hudi therefore has a file-merging mechanism, and the .hoodie folder stores the log files related to those file-merge operations
Data files
The actual Hudi data files are stored in the Parquet file format
The .hoodie files
Hudi calls the series of CRUD operations applied to a table over time its Timeline. A single operation in the Timeline is called an Instant, which contains the following information
- Instant Action: the type of operation, such as a data commit (COMMITS), a file compaction, or a file cleanup
- Instant Time: the time at which the operation occurred
- State: the state of the operation, REQUESTED, INFLIGHT, or COMPLETED
The .hoodie folder stores the state records of these operations
Hudi record Id
To implement CRUD, Hudi needs to be able to uniquely identify a record. To do this, Hudi combines the unique field of the dataset (the record key) with the record's partitionPath to form the record's unique key.
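To make this concrete, here is a minimal sketch, not the repo's actual code, showing how the record key field and the partition path field are configured on a Spark write; the field names (id, createDate), table name, and local path are assumptions, and the hudi-spark-bundle must be on the classpath.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object RecordKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("record-key-sketch")
      .master("local[*]")
      // Hudi requires Kryo serialization
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "2020-03-01", "alice")).toDF("id", "createDate", "name")

    df.write.format("org.apache.hudi")
      .option("hoodie.table.name", "record_key_demo")
      .option("hoodie.datasource.write.recordkey.field", "id")             // unique field of the dataset
      .option("hoodie.datasource.write.partitionpath.field", "createDate") // partition path of the record
      .option("hoodie.datasource.write.precombine.field", "id")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_record_key_demo")

    // Each stored row carries _hoodie_record_key and _hoodie_partition_path metadata
    // columns; together they are the unique key described above.
    spark.stop()
  }
}
```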
COW and MOR
Based on the concepts above, Hudi provides two table types, COW and MOR, which differ somewhat in write and query performance
Copy On Write Table
Hereinafter referred to as COW. As the name suggests, data is copied as it is written and the new data is added to the copy, so a read request reads a nearly complete copy, similar in spirit to Mysql's MVCC.
In the figure above, each color contains all of the data up to its point in time. Old copies are deleted once they exceed a configured limit. For this table type there is no compaction Instant, because the data is already compact when written.
- Advantage: a read only needs the single data file of the corresponding partition, which is efficient
- Disadvantage: each write must copy the original data and build a new data file from the copy, which is time-consuming; and because writes are slow, the data seen by a read request lags behind
Merge On Read Table
Hereinafter referred to as MOR. Newly inserted data is stored in delta logs, and the delta logs are periodically merged into the Parquet data files. When data is read, the delta logs are merged with the older data files to return complete data. Of course, like a COW table, a MOR table can ignore the delta logs and read only the most recent complete data files. The following figure illustrates the two ways MOR reads and writes data; a sketch of how the table type is chosen at write time follows the list below.
- Advantage: writes first go to the small delta log, so the write cost is low
- Disadvantage: the table needs periodic compaction, otherwise fragmented files pile up; read performance is worse because the delta logs must be merged with the old data files
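For reference, here is a minimal sketch of how the table type is chosen at write time through the standard Hudi datasource option; the table name and field names are assumptions carried over from elsewhere in this article, and everything else about the write stays the same.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object TableTypeSketch {
  // Write the same DataFrame as either a COW or a MOR table, depending on the flag.
  def writeHudi(df: DataFrame, basePath: String, useMor: Boolean): Unit = {
    val tableType = if (useMor) "MERGE_ON_READ" else "COPY_ON_WRITE"
    df.write.format("org.apache.hudi")
      .option("hoodie.table.name", "crm__order")                            // assumed table name
      .option("hoodie.datasource.write.table.type", tableType)              // COW vs MOR
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "createDate")
      .option("hoodie.datasource.write.precombine.field", "id")
      .mode(SaveMode.Append)
      .save(basePath)
  }
}
```

For this article's scenario, frequent upserts from the binlog plus periodic ad-hoc analysis through Hive, COW keeps reads simple while MOR keeps writes cheap; the trade-offs are exactly the advantages and disadvantages listed above.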
Hudi-based code implementation
I have put a Hudi-based wrapper on Github; the source address is github.com/wanqiufeng/…
Writing binlog data to the Hudi table
- The binlog-consumer branch uses Spark Streaming to consume binlog data from Kafka and write it to the Hudi table. The binlog is pulled into Kafka by Alibaba's Canal tool. The program entry point is CanalKafkaImport2Hudi, and it accepts a set of parameters that configure its execution behavior (a simplified sketch of this write path follows the demo below)
Parameter | Meaning | Required | Default |
---|---|---|---|
--base-save-path | Base HDFS path under which Hudi tables are stored, e.g. hdfs://192.168.16.181:8020/hudi_data/ | yes | none |
--mapping-mysql-db-name | Name of the Mysql database to process | yes | none |
--mapping-mysql-table-name | Name of the Mysql table to process | yes | none |
--store-table-name | Hudi table name | no | Generated automatically from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the Hudi table is named crm__order |
--real-save-path | Final HDFS path where the Hudi table is stored | no | Generated automatically as '--base-save-path' + '/' + '--store-table-name'; the default is recommended |
--primary-key | Field that uniquely identifies a record in the synchronized Mysql table | no | id |
--partition-key | Time field in the Mysql table used for partitioning; must be of type timestamp or datetime | yes | none |
--precombine-key | Used to configure Hudi's hoodie.datasource.write.precombine.field | no | id |
--kafka-server | Kafka cluster address | yes | none |
--kafka-topic | Kafka topic to consume | yes | none |
--kafka-group | Kafka consumer group | no | 'hudi' prefixed to the storage table name, e.g. hudi_crm__order |
--duration-seconds | Duration of the Spark Streaming micro-batch (the program is built on Spark Streaming) | no | 10 seconds |
A working demo is shown below
```
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
    --name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    --num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
    --conf spark.core.connection.ack.wait.timeout=300 \
    --conf spark.locality.wait=100 \
    --conf spark.streaming.backpressure.enabled=true \
    --conf spark.streaming.receiver.maxRate=500 \
    --conf spark.streaming.kafka.maxRatePerPartition=200 \
    --conf spark.ui.retainedJobs=10 \
    --conf spark.ui.retainedStages=10 \
    --conf spark.ui.retainedTasks=10 \
    --conf spark.worker.ui.retainedExecutors=10 \
    --conf spark.worker.ui.retainedDrivers=10 \
    --conf spark.sql.ui.retainedExecutions=10 \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures=20 \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar \
    --kafka-server local:9092 \
    --kafka-topic dt_streaming_canal_xxx \
    --base-save-path hdfs://192.168.2.1:8020/hudi_table/ \
    --mapping-mysql-db-name crm \
    --mapping-mysql-table-name order \
    --primary-key id \
    --partition-key createDate \
    --duration-seconds 1200
```
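The demo above runs the author's packaged job. As a rough illustration of what a job like this does internally, here is a minimal, simplified sketch; it is not the actual CanalKafkaImport2Hudi code, the topic, group, paths, and field names are simply taken from the demo parameters, and parsing Canal's message envelope (event type, row data array, and so on) is deliberately omitted.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinlogToHudiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("binlog-to-hudi-sketch")
      // Hudi requires Kryo serialization
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // Micro-batch duration, corresponding to --duration-seconds
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "local:9092",              // --kafka-server
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "hudi_crm__order")         // --kafka-group

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("dt_streaming_canal_xxx"), kafkaParams))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // A real Canal message wraps the row data and the event type (INSERT/UPDATE/DELETE);
        // here we pretend each message is already one flat JSON row.
        val rows = spark.read.json(spark.createDataset(rdd.map(_.value())))
        rows.write.format("org.apache.hudi")
          .option("hoodie.table.name", "crm__order")
          .option("hoodie.datasource.write.operation", "upsert")
          .option("hoodie.datasource.write.recordkey.field", "id")             // --primary-key
          .option("hoodie.datasource.write.partitionpath.field", "createDate") // --partition-key
          .option("hoodie.datasource.write.precombine.field", "id")            // --precombine-key
          .mode(SaveMode.Append)
          .save("hdfs://192.168.2.1:8020/hudi_table/crm__order")               // --real-save-path
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```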
Synchronize historical data and table metadata to Hive
The history_import_and_meta_sync branch provides operations for synchronizing historical data into the Hudi table and for synchronizing the Hudi table structure to Hive Meta
Synchronize historical data to the Hudi table
The idea here is
- Import the full Mysql data into a Hive table using a tool such as Sqoop
- Then import that data into the Hudi table using the HiveImport2HudiConfig tool in the branch code (a minimal sketch of this step follows the list)
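Below is a minimal sketch of that second step; it is not the actual HiveImport2HudiConfig code, and the Hive database/table, field names, and HDFS path are assumptions modeled on the demo later in this section. It simply reads the history table that Sqoop produced and bulk-loads it into the Hudi table.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveHistoryToHudiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-history-to-hudi-sketch")
      .enableHiveSupport()                               // needs hive-site.xml on the classpath
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // History table landed by Sqoop (--sync-hive-db-name / --sync-hive-table-name)
    val history = spark.table("hudi_temp.crm__order")

    history.write.format("org.apache.hudi")
      .option("hoodie.table.name", "crm__order")
      // bulk_insert is cheaper than upsert for a one-off full load
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.recordkey.field", "id")               // --primary-key
      .option("hoodie.datasource.write.partitionpath.field", "created_date") // --partition-key
      .option("hoodie.datasource.write.precombine.field", "id")
      .mode(SaveMode.Append)
      .save("hdfs://192.168.2.2:8020/hudi_table/crm__order")                 // --real-save-path

    spark.stop()
  }
}
```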
HiveImport2HudiConfig provides the following parameters to configure the program execution behavior
Parameter | Meaning | Required | Default |
---|---|---|---|
--base-save-path | Base HDFS path under which Hudi tables are stored, e.g. hdfs://192.168.16.181:8020/hudi_data/ | yes | none |
--mapping-mysql-db-name | Name of the Mysql database to process | yes | none |
--mapping-mysql-table-name | Name of the Mysql table to process | yes | none |
--store-table-name | Hudi table name | no | Generated automatically from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the Hudi table is named crm__order |
--real-save-path | Final HDFS path where the Hudi table is stored | no | Generated automatically as '--base-save-path' + '/' + '--store-table-name'; the default is recommended |
--primary-key | Field that uniquely identifies a record in the synchronized Hive history table | no | id |
--partition-key | Time field in the Hive history table used for partitioning; must be of type timestamp or datetime | yes | none |
--precombine-key | Used to configure Hudi's hoodie.datasource.write.precombine.field | no | id |
--sync-hive-db-name | Name of the Hive database holding the full historical data | yes | none |
--sync-hive-table-name | Name of the Hive table holding the full historical data | yes | none |
--hive-base-path | Root path of Hive's data files; see the Hive configuration | no | /user/hive/warehouse |
--hive-site-path | Path to the hive-site.xml configuration file | yes | none |
--tmp-data-path | Path for temporary files during execution. The default is /tmp; if the disk holding /tmp is too small the historical import may fail, in which case this parameter can point to a different path | no | the operating system's temporary directory |
A demo of running the program:
```
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
```
Synchronize the Hudi table structure to Hive Meta
The Hudi data structure and partitions need to be synchronized to Hive Meta as a Hive external table, so that Hive is aware of the Hudi data and can query and analyze it with SQL. Hudi can sync table metadata to Hive while it consumes the binlog and writes data, but that means reading and writing Hive Meta on every write to the Apache Hudi table, which may put heavy pressure on Hive. So I developed a separate tool, HiveMetaSyncConfig, to synchronize Hudi table metadata to Hive. Since the program currently only supports daily partitions, the sync tool can be run once a day. Its parameters are as follows:
Parameter | Meaning | Required | Default |
---|---|---|---|
--hive-db-name | Hive database to which the Hudi table is synchronized | yes | none |
--hive-table-name | Hive table to which the Hudi table is synchronized | yes | none |
--hive-jdbc-url | JDBC URL of the Hive meta, e.g. jdbc:hive2://192.168.16.181:10000 | yes | none |
--hive-user-name | Hive connection user name | no | hive |
--hive-pwd | Hive connection password | no | hive |
--hudi-table-path | HDFS path where the Hudi table's files reside | yes | none |
--hive-site-path | Path to Hive's hive-site.xml | yes | none |
A demo of running the program:
```
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
```
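For comparison, Hudi also supports syncing table metadata to Hive at write time through its standard datasource options; that is exactly the per-write Hive Meta traffic this article avoids by running a daily sync instead. A minimal sketch, not part of the author's tooling, with database, table, JDBC URL, and path values assumed from the demo above:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object WriteTimeHiveSyncSketch {
  def writeWithHiveSync(df: DataFrame): Unit = {
    df.write.format("org.apache.hudi")
      .option("hoodie.table.name", "crm__order")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "createDate")
      .option("hoodie.datasource.write.precombine.field", "id")
      // Write-time Hive sync: Hudi creates/updates the external table and its partitions
      // in the Hive metastore after each commit.
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.database", "streaming")
      .option("hoodie.datasource.hive_sync.table", "crm__order")
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://192.168.16.181:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "createDate")
      .mode(SaveMode.Append)
      .save("hdfs://192.168.16.181:8020/hudi_table/crm__order")
  }
}
```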
Some pitfalls encountered
Hive Configuration
Some Hive clusters have hive.input.format set to the default org.apache.hadoop.hive.ql.io.CombineHiveInputFormat. This causes the Hive external table mounted on the Hudi data to read all of Hudi's Parquet files, so query results contain duplicates. The input format needs to be changed to org.apache.hadoop.hive.ql.io.HiveInputFormat. To avoid a cluster-wide change that would needlessly affect other offline Hive SQL, it is recommended to set it only for the current Hive session: set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Some tuning of Spark Streaming
Since writing the binlog to the Hudi table is implemented with Spark Streaming, here are some Spark and Spark Streaming level configurations that make the whole program run more stably
configuration | meaning |
---|---|
spark.streaming.backpressure.enabled=true | Lets Spark Streaming adjust the consumption rate based on the previous batch, so the program does not crash from consuming too much at once |
spark.ui.retainedJobs=10, spark.ui.retainedStages=10, spark.ui.retainedTasks=10, spark.worker.ui.retainedExecutors=10, spark.worker.ui.retainedDrivers=10, spark.sql.ui.retainedExecutions=10 | By default, Spark keeps the history of stages and tasks in the driver's memory while the program runs; if the driver's memory is too small, this can crash the driver. These settings reduce how much history is retained and therefore how much memory is used |
spark.yarn.maxAppAttempts=4 | How many times the driver is restarted after it crashes |
spark.yarn.am.attemptFailuresValidityInterval=1h | If the driver crashes only once a week, we would rather it be restarted every time; this setting resets the attempt counter after the given interval, so maxAppAttempts is not used up by four crashes spread far apart in time |
spark.yarn.max.executor.failures=20 | Executors may fail, and the cluster automatically allocates new ones; this sets the maximum number of executor failures allowed before the program exits with "Max number of executor failures (400) reached" |
spark.yarn.executor.failuresValidityInterval=1h | Interval after which the executor-failure count is reset |
spark.task.maxFailures=8 | Number of allowed task failures |
Future improvements
- Support non-partitioned tables and tables not partitioned by date; currently only date-partitioned tables are supported
- Support more data types; currently, for program stability, all Mysql fields are stored in Hudi as String
References
hudi.apache.org/
Welcome to follow my personal account "North by Northwest UP", where I record coding life, industry thinking, and technology commentary