Flink has offered Hive integration since 1.9.0, and Flink 1.11 takes that integration a step further by bringing streaming scenarios to Hive.
This article covers the new Hive-related features in Flink 1.11 and how to use Flink to make a Hive data warehouse real-time, so as to achieve unified batch and stream processing. The main contents include:
· Background introduction of Flink and Hive integration
· New features in Flink 1.11
· Building a unified batch and streaming Hive data warehouse
I. Flink and Hive integration background
Why build Flink-Hive integration in the first place? The original motivation was that we wanted to tap into Flink’s batch processing capabilities. Flink is already a very successful stream processing engine used by a large number of users, and in Flink’s design philosophy batch is a special case of streaming; this means that if Flink is good at streaming, its architecture can support batch scenarios as well. SQL is an important entry point for batch scenarios, because data analysts are generally more accustomed to writing SQL than DataStream or DataSet programs.
Hive is the de facto standard SQL engine of the Hadoop ecosystem, and most user environments build their data warehouses with it. Newer SQL engines, such as Spark SQL and Impala, all provide Hive integration. To connect conveniently with users’ existing environments, we consider Hive integration an indispensable feature for Flink as well.
So in Flink 1.9 we started to provide integration with Hive; in 1.9 this feature was released as a preview. With Flink 1.10, the Hive integration became production-ready. When Flink 1.10 was released, we also ran a 10 TB TPC-DS benchmark comparing Flink with Hive on MapReduce; the results are as follows:
The blue bars show the time spent by Flink and the orange bars the time spent by Hive on MapReduce. The overall result is that Flink was roughly seven times faster than Hive on MapReduce, which verifies that Flink SQL can support batch computing scenarios well. The following describes the architecture of Flink’s Hive integration, which consists of the following layers:
· Access Hive metadata
· Read and write Hive table data
· Production Ready
1. Access the Hive metadata
Hive metadata is managed by Hive Metastore. This means that Flink needs to communicate with Hive Metastore. For better access to Hive metadata, Flink has proposed a completely new Catalog API.
This new interface is a generic design. It’s not just about connecting to Hive metadata, but it can theoretically connect to metadata from different external systems.
Within a Flink session, multiple catalogs can be created, each corresponding to one external system. Catalogs can be defined through the Flink Table API or, when using the SQL Client, in its YAML configuration file; they are loaded when the SQL Client creates the TableEnvironment, and the TableEnvironment manages these catalog instances through the CatalogManager. The SQL Client can then use the catalogs to access the external systems’ metadata when subsequent SQL statements are submitted.
The diagram above lists two implementations of Catalog. One is GenericInMemoryCatalog, which keeps all metadata in the Flink client’s memory. Its behavior is similar to that of Flink before the Catalog interface existed: the lifetime of the metadata is tied to the SQL Client session, and when the session ends, the metadata created in it is lost.
The other is HiveCatalog, which is dedicated to interworking with Hive. HiveCatalog connects to a Hive Metastore instance and communicates with it to read and write metadata. Because the Metastore APIs of different Hive versions may be incompatible, a HiveShim layer was added between HiveCatalog and the Hive Metastore to support multiple Hive versions.
HiveCatalog not only lets Flink access Hive’s own metadata, it also gives Flink the ability to persist metadata: HiveCatalog can store both Hive metadata and Flink metadata. For example, a Kafka table created in Flink can also be stored in HiveCatalog. Before HiveCatalog, Flink had no way to persist its metadata.
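As a minimal sketch of how this fits together (assuming Flink 1.11 with the Hive connector on the classpath; the catalog name, Hive conf directory, and Kafka table are illustrative placeholders), registering a HiveCatalog and persisting a Kafka table definition through it might look like this:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // "myhive" is an arbitrary catalog name; "/opt/hive-conf" is a placeholder
        // for the directory containing hive-site.xml.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // Tables created from now on are persisted in the Hive Metastore via
        // HiveCatalog -- including non-Hive tables such as this Kafka source.
        tableEnv.executeSql(
                "CREATE TABLE kafka_orders (" +
                "  user_id STRING," +
                "  order_amount DOUBLE" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'format' = 'json'" +
                ")");
    }
}
```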
2. Read and write Hive table data
With access to Hive metadata in place, the other important piece is reading and writing Hive table data. Hive tables are stored in a Hadoop file system, typically HDFS, but possibly another file system; in theory, any file system that implements Hadoop’s FileSystem interface can store Hive tables.
In Flink:
· HiveTableSource is used to read data
· HiveTableSink is used to write data
A key design principle is to reuse Hive’s original InputFormat/OutputFormat, SerDe, and so on for reading and writing Hive data. This has two main advantages: reuse reduces the development effort, and it preserves write compatibility with Hive as far as possible. The goal is that data written by Flink must be readable by Hive, and conversely, Flink can read data written by Hive.
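For illustration, once the catalog is registered, reading and writing Hive data is plain Flink SQL. This is only a sketch; the database and table names (some_db.some_table, some_db.staging_table) are hypothetical:

```java
import org.apache.flink.table.api.TableEnvironment;

public class HiveReadWriteExample {
    // Assumes a TableEnvironment with a HiveCatalog already registered
    // (see the previous sketch) and existing Hive tables.
    public static void readAndWrite(TableEnvironment tableEnv) {
        // Reading Hive data: HiveTableSource reuses Hive's InputFormat/SerDe.
        tableEnv.executeSql("SELECT * FROM some_db.some_table").print();

        // Writing Hive data: HiveTableSink reuses Hive's OutputFormat/SerDe,
        // so the written data stays readable by Hive itself.
        tableEnv.executeSql(
                "INSERT INTO some_db.some_table SELECT * FROM some_db.staging_table");
    }
}
```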
3. Production Ready
Production readiness for the Hive integration was reached in Flink 1.10; it primarily means that the functionality is complete.
II. New features in Flink 1.11
Here’s a look at some of the new Hive connectivity features available in Flink 1.11.
1. Simplified dependency management
The first improvement is simplified dependency management for the Hive connector. A long-standing pain point of the Hive connector is that several JAR dependencies must be added, and the required JARs vary with the Hive version in use. For example:
The first figure shows the JAR packages needed when using Hive 1.0.0, and the second shows those needed with Hive 2.2.0. Note that both the number and the versions of the JARs differ across Hive versions, so users who do not read the documentation carefully easily end up with the wrong dependencies; missing or mismatched JARs then produce strange, hard-to-understand errors. This has been one of the most common problems users hit with the Hive connector.
To simplify dependency management and provide a better user experience, Flink 1.11 ships pre-bundled Hive dependency packages:
You can select a dependency package based on your Hive version.
If the Hive version you use is not one of these open-source versions, you can still follow the 1.10 approach and add the individual JARs yourself.
2. Hive Dialect enhancements
The Hive dialect was introduced in Flink 1.10, but few people used it because the feature was weak. Effectively, the only thing it controlled was whether partitioned tables could be created: with the Hive dialect configured, partitioned tables could be created in Flink SQL; without it, they could not.
Another limitation was that it offered no Hive syntax compatibility. Even with the Hive dialect configured, the DDL used to create partitioned tables was still Flink syntax rather than Hive syntax.
The Hive dialect has been enhanced in Flink 1.11. The goal is to give users of the Flink SQL Client an experience similar to the Hive CLI or Beeline: they can write Hive-specific syntax directly, which means Hive scripts can be migrated to Flink without modification.
In order to achieve this goal, the following improvements were made in Flink 1.11:
· The dialect is specified via a parameter with two values: default and hive. default is Flink’s own dialect and hive is the Hive dialect.
· It can be used from both the SQL Client and the Table API.
· The dialect can be switched dynamically at the statement level. For example, after a session is created, you can first set the dialect to default to write Flink statements, execute a few of them, and then switch the dialect to hive to write in Hive syntax, all without restarting the session.
· Common Hive DDL and basic DML are compatible.
· The overall experience is similar to the Hive CLI or Beeline.
3. Enable the Hive Dialect
In the SQL Client, you can set the initial dialect in the YAML configuration file, or switch the dialect dynamically after the SQL Client has started.
You can also use the Flink Table API to enable Hive Dialect:
As shown in the sketch below, you obtain the TableConfig from the TableEnvironment and set the dialect on it.
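A minimal sketch of enabling and switching the dialect through the Table API (the SQL Client equivalents are noted in the comments; catalog setup is assumed from earlier):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectExample {
    public static void main(String[] args) {
        // In the SQL Client, the same effect is achieved with the configuration
        // entry "table.sql-dialect: hive" in the YAML file, or dynamically with
        // "SET table.sql-dialect=hive;".
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Switch to the Hive dialect to write Hive-style statements...
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // ...and back to Flink's default dialect at any time, without
        // restarting the session.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    }
}
```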
4. Syntax supported by the Hive Dialect
The Hive dialect enhancements focus mainly on DDL. In 1.10, writing DDL to manipulate Hive metadata through Flink SQL was barely usable, so 1.11 concentrates on DDL to address this pain point.
The DDL currently supported covers databases, tables, partitions, views, and functions; a few representative statements are sketched below.
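As a sketch of what this looks like in practice (assuming a registered HiveCatalog; the database, table, and column names are made up for illustration):

```java
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectDdlExample {
    // Assumes tableEnv already has a HiveCatalog registered and selected.
    public static void runDdl(TableEnvironment tableEnv) {
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Hive-style database and table DDL.
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS demo_db");
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS demo_db.page_views (" +
                "  user_id STRING," +
                "  page STRING" +
                ") PARTITIONED BY (dt STRING) STORED AS ORC");

        // Partition manipulation in Hive syntax.
        tableEnv.executeSql(
                "ALTER TABLE demo_db.page_views ADD IF NOT EXISTS PARTITION (dt='2020-07-01')");
    }
}
```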
5. Write streaming data to Hive
Flink 1.11 also adds features that combine streaming scenarios with Hive; by using Flink and Hive together, it helps make the Hive data warehouse real-time.
Streaming writes to Hive are implemented on top of the Streaming File Sink. They are driven entirely by SQL and require no user code. Streaming writes support both partitioned and non-partitioned Hive tables. Since Hive data warehouses usually hold offline data and users have strong consistency requirements, exactly-once semantics are supported. The latency of streaming writes into Hive is on the order of 5-10 minutes; pushing latency lower produces more small files, which are unfriendly to HDFS, and a large number of small files can hurt HDFS performance. In that case you can run small-file compaction afterwards.
Streaming data writing to Hive requires several configurations:
For partitioned tables, set the partition commit delay. This parameter expresses how long a partition is expected to keep receiving data, for example a day or an hour.
The partition commit trigger specifies when a partition commit is triggered; process-time and partition-time triggers are supported in 1.11.
The partition commit policy specifies how a partition is committed. In Hive, a partition must be committed to the Metastore before it becomes visible, so the metastore policy is supported only for Hive tables. The success-file policy writes a marker file to tell downstream jobs that the partition’s data is ready. Users can also implement custom commit policies, and multiple policies can be combined, for example metastore together with success-file.
The implementation principle of streaming data writing to Hive is as follows:
There are two main parts. The first is StreamingFileWriter, which performs the actual writing and distinguishes between buckets. A bucket here is similar to a Hive partition, and each subtask writes data into different buckets. Each bucket written by a subtask may maintain three kinds of files at the same time: in-progress files are currently being written, pending files have been written but not yet committed, and finished files have been written and committed.
The second is StreamingFileCommitter, which runs after StreamingFileWriter and is used to commit partitions, so it is not needed for non-partitioned tables. When a partition is ready, StreamingFileWriter sends a commit message to StreamingFileCommitter telling it that the partition’s data is ready; StreamingFileCommitter then commits the partition according to the configured commit trigger and commit policy.
Here’s a concrete example:
The example creates a partitioned table named hive_table with two partition columns, dt and hr; dt is the date string and hr is the hour string. The commit trigger is set to partition-time, the commit delay to 1 hour, and the commit policy to metastore plus success-file.
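A sketch of that table definition and the corresponding streaming insert, modeled on the Flink 1.11 documentation (the Kafka source table and its columns are illustrative):

```java
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingWriteToHiveExample {
    // Assumes a streaming TableEnvironment with a HiveCatalog registered.
    public static void run(TableEnvironment tableEnv) {
        // The Hive sink table, declared in the Hive dialect with the partition
        // commit settings described above.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(
                "CREATE TABLE hive_table (" +
                "  user_id STRING," +
                "  order_amount DOUBLE" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
                "  'sink.partition-commit.trigger'='partition-time'," +
                "  'sink.partition-commit.delay'='1 h'," +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
                ")");

        // An illustrative Kafka source table, declared in the default dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql(
                "CREATE TABLE kafka_table (" +
                "  user_id STRING," +
                "  order_amount DOUBLE," +
                "  log_ts TIMESTAMP(3)," +
                "  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'format' = 'json'" +
                ")");

        // Continuously insert the stream into the partitioned Hive table.
        tableEnv.executeSql(
                "INSERT INTO hive_table " +
                "SELECT user_id, order_amount, " +
                "       DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') " +
                "FROM kafka_table");
    }
}
```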
6. Streaming consumption of Hive
In Flink 1.10, Hive data could only be read in batch mode; since 1.11, it can also be read in streaming mode.
Flink continuously monitors Hive tables and, when new data appears, consumes it incrementally.
Streaming consumption can be enabled for a Hive table through a table property, or, using the dynamic table options added in 1.11, you can specify at query time whether a particular Hive table should be read in streaming mode.
Streaming reads support both partitioned and non-partitioned tables. For non-partitioned tables, new files added to the table directory are read incrementally. For partitioned tables, Flink monitors the partition directories and the Metastore to detect new partitions; when a new partition appears, its data is read out. Note that reading a new partition is a one-shot operation: once a partition is added, all of its data is read at once, and the partition is not monitored afterwards. So if you want to consume a Hive partitioned table with Flink streaming, you should make sure a partition’s data is complete by the time the partition is added.
Streaming consumption of Hive also requires a few additional parameters. First, you must specify the consume order; because data is read incrementally, an order is needed, and create-time and partition-time are currently supported.
You can also specify a consume start offset, similar to specifying an offset when consuming Kafka, to indicate the point in time from which data should be consumed; Flink then reads only data after that point.
Finally, you can specify the monitor interval. New data is detected by scanning the file system, and scanning too frequently puts pressure on the file system, so this interval lets you control how often the scan happens.
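Putting these parameters together, here is a sketch that reads the hive_table from the previous example in streaming mode via the dynamic table options (OPTIONS hint) added in 1.11; the hint must first be enabled through table.dynamic-table-options.enabled. Alternatively, the same keys can be set as table properties on the Hive table itself.

```java
import org.apache.flink.table.api.TableEnvironment;

public class StreamingReadHiveExample {
    // Assumes a streaming TableEnvironment with a HiveCatalog registered and
    // the hive_table from the streaming-write sketch.
    public static void run(TableEnvironment tableEnv) {
        // Dynamic table options (OPTIONS hints) are disabled by default.
        tableEnv.getConfig().getConfiguration()
                .setString("table.dynamic-table-options.enabled", "true");

        // Enable streaming consumption for this query only, supplying the
        // consume order, start offset, and monitor interval through the hint.
        tableEnv.executeSql(
                "SELECT * FROM hive_table " +
                "/*+ OPTIONS(" +
                "'streaming-source.enable'='true'," +
                "'streaming-source.consume-order'='partition-time'," +
                "'streaming-source.consume-start-offset'='2020-07-01'," +
                "'streaming-source.monitor-interval'='1 min') */")
                .print();
    }
}
```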
Finally, let’s look at how streaming consumption works, starting with non-partitioned tables:
In the diagram, ContinuousFileMonitoringFunction continuously monitors the files under the table directory, interacting with the file system. When it finds that new files have been added, it generates splits for them and passes the splits to ContinuousFileReaderOperator. The reader operator then fetches the splits from the file system, actually reads the data, and forwards it downstream for processing.
Streaming consumption of partitioned tables is not very different. HiveContinuousMonitoringFunction also constantly scans the file system, but it scans for new partition directories. When it finds a new partition directory, it further checks the Metastore to see whether the partition has been committed there; if it has, the partition’s data can be consumed. It then generates splits for the partition’s data and passes them to ContinuousFileReaderOperator, which reads the data.
7. Joining Hive dimension tables
Another scenario for combining Hive with streaming data is joining against Hive dimension tables, for example joining the stream with an offline Hive dimension table while consuming streaming data.
Hive dimension-table joins use Flink’s temporal table syntax: the Hive table is treated as a temporal table and joined with the streaming table. For more information about temporal tables, please see Flink’s official documentation.
The implementation caches the entire Hive table in each subtask’s memory. If the Hive dimension table is larger than the memory available to a subtask, the job will fail.
Because the Hive dimension table may be updated while it is being joined, users can set a TTL for the table cache; when it expires, the subtask reloads the dimension table. Note that this does not suit dimension tables that are updated frequently, because frequent reloading puts heavy pressure on HDFS; the approach fits slowly changing dimension tables, and the cache TTL is usually set fairly long, typically at the hour level.
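As a sketch, the cache TTL can be set as a table property on the dimension table; the table and columns below are illustrative, and 'lookup.join.cache.ttl' is the property used by the Hive connector in 1.11:

```java
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class DimensionTableCacheExample {
    // Assumes a TableEnvironment with a HiveCatalog registered.
    public static void createDimensionTable(TableEnvironment tableEnv) {
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Each lookup-join subtask caches this entire table in memory and
        // reloads it after the TTL expires (set generously, e.g. 12 hours).
        tableEnv.executeSql(
                "CREATE TABLE latest_rates (" +
                "  currency STRING," +
                "  rate DOUBLE" +
                ") STORED AS ORC TBLPROPERTIES (" +
                "  'lookup.join.cache.ttl' = '12 h'" +
                ")");
    }
}
```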
This figure shows how the Hive dimension-table join works. Streaming Data is the stream side; LookupJoinRunner is the join operator, which extracts the join key from the streaming data and passes it to FileSystemLookupFunction.
FileSystemLookupFunction is a table function that interacts with the underlying file system, loads the Hive table, and then looks up the join key in it to determine which rows can be joined.
Here is an example of joining a Hive dimension table:
This is an example from Flink’s official website, where the streaming table is Orders and LatestRates is the Hive dimension table.
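A sketch of that join, following the shape of the official example (the Orders stream with a processing-time attribute proctime and the Hive table LatestRates are assumed to exist; the order_id column is illustrative):

```java
import org.apache.flink.table.api.TableEnvironment;

public class HiveTemporalJoinExample {
    // Assumes a streaming table Orders (with a proctime attribute) and a Hive
    // dimension table LatestRates are already defined in the catalog.
    public static void run(TableEnvironment tableEnv) {
        // The Hive table is used as a temporal table via FOR SYSTEM_TIME AS OF
        // and joined against the stream on the currency key.
        tableEnv.executeSql(
                "SELECT o.order_id, o.amount, o.currency, r.rate, o.amount * r.rate AS converted " +
                "FROM Orders AS o " +
                "JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r " +
                "ON o.currency = r.currency")
                .print();
    }
}
```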
III. A unified batch and streaming Hive data warehouse
As you can see from the above, a great deal of work went into integrating the Hive data warehouse with both batch and streaming in Flink 1.11. Flink is a stream-first engine, and the hope is to help users combine batch and streaming better, make the Hive data warehouse real-time, and let users mine the value of their data more conveniently.
Prior to Flink 1.11, the Flink-Hive integration supported only batch computation, i.e. offline scenarios. One problem with offline scenarios is high latency: batch jobs are usually orchestrated by a scheduling framework, so delays add up; the first job must finish before the second can run, and so on, and the end-to-end latency is the sum of all the jobs’ latencies.
With Hive streaming support in 1.11, the Hive data warehouse can be made real-time. For example, online data can be written into Hive in real time using Flink for ETL. Once data lands in Hive, a new Flink job can run real-time or near-real-time queries on it and return results quickly. At the same time, other Flink jobs can use the data written into the Hive warehouse as dimension tables, joining it with other online data to produce analysis results.
About the author:
Rui Li (known as “Tianli” at Alibaba) is a technical expert at Alibaba and a member of the Apache Hive PMC. Before joining Alibaba, he worked at Intel, IBM, and other companies, mainly on open source projects such as Hive, HDFS, and Spark.