Apache Flink has provided Hive integration since version 1.9.0. Users can use Flink to access Hive metadata and to read and write Hive tables. This article introduces the feature in terms of the project's design architecture, latest progress, and usage instructions.
Flink on Hive Introduction
SQL is an important application scenario in the big data field. To improve the Flink ecosystem and explore Flink's potential in batch processing, we decided to enhance FlinkSQL so that users can complete more tasks with Flink.
Hive is the earliest SQL engine in the field of big data, with rich functions and a wide user base. Later SQL engines, such as Spark SQL and Impala, integrate with Hive to make it easier for users to use existing data warehouses and migrate jobs. So we think it’s important for FlinkSQL to provide the ability to interact with Hive as well.
Design architecture
Integration with Hive mainly involves access to metadata and actual table data, so we’ll describe the architecture of the project in those terms.
1. Metadata
To access metadata in external systems, Flink provides the concept of ExternalCatalog. However, the current definition of ExternalCatalog is incomplete and essentially unusable, so we propose a new set of Catalog interfaces to replace it. The new Catalog supports multiple kinds of metadata objects, such as databases, tables, and partitions; it allows multiple Catalog instances to be maintained in a user session, enabling simultaneous access to multiple external systems; and it is pluggable, allowing users to provide custom implementations. The following figure shows the overall architecture of the new Catalog API.
When you create a TableEnvironment, a CatalogManager is created to manage the different Catalog instances. TableEnvironment provides metadata services for Table API and SQL Client users through Catalog.
Currently there are two implementations of Catalog: GenericInMemoryCatalog and HiveCatalog. GenericInMemoryCatalog maintains Flink's original metadata management mechanism and stores all metadata in memory, while HiveCatalog connects to a Hive Metastore instance to provide persistent metadata. To use Flink to interact with Hive, you need to configure a HiveCatalog and use it to access metadata in Hive. HiveCatalog can also be used to manage Flink's own metadata; in this scenario, HiveCatalog only uses the Hive Metastore as persistent storage, and the metadata written to the Metastore may not be in a format that Hive itself can consume. A single HiveCatalog instance can support both modes, so there is no need to create separate instances for managing Hive and Flink metadata.
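As a rough sketch of this setup (the catalog names and the conf-dir path here are illustrative, and the example assumes a reachable Hive Metastore), the following code registers both Catalog implementations in one TableEnvironment and inspects their metadata through the common Catalog interface:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create a TableEnvironment (here with the Blink planner in batch mode).
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// One in-memory Catalog and one HiveCatalog, managed side by side by the CatalogManager.
Catalog inMemoryCatalog = new GenericInMemoryCatalog("flink_memory");
Catalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive_conf_dir", "2.3.4");
tableEnv.registerCatalog("flink_memory", inMemoryCatalog);
tableEnv.registerCatalog("myhive", hiveCatalog);

// Both expose the same metadata interface; only the storage differs.
System.out.println(inMemoryCatalog.listDatabases()); // kept in memory
System.out.println(hiveCatalog.listDatabases());     // read from the Hive Metastore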
In addition, we designed HiveShim to support different versions of Hive Metastore. Hive versions 2.3.4 and 1.2.1 are supported.
2. Table data
We provide the Hive Data Connector to read and write Hive table data. The Hive Data Connector reuses Hive's Input/Output Format and SerDe classes as much as possible. On the one hand this reduces code duplication; more importantly, it maintains maximum compatibility with Hive, meaning data written by Flink can be read by Hive and vice versa.
Similar to HiveCatalog, Hive Data Connector currently supports Hive versions 2.3.4 and 1.2.1.
Project progress
Flink integration with Hive will be released as a trial feature in version 1.9.0. Users can interact with Hive using the Table API or SQL Client mode. Listed below are the features already supported in 1.9.0:
- Simple DDL to read Hive metadata, such as show databases, show tables, and describe table.
- Modifying Hive metadata through the Catalog API, such as creating and dropping tables (see the sketch after this list).
- Read Hive data from partitioned and non-partitioned tables.
- Write Hive data to non-partitioned tables.
- Supports file formats such as Text, ORC, Parquet, and SequenceFile.
- Invoking UDFs created in Hive.
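To make the Catalog API item above concrete, here is a hedged sketch of creating and then dropping a table directly through a HiveCatalog; the table name flink_created, the schema, and the conf-dir path are illustrative, and whether the resulting table is stored as a Hive-compatible table or as a generic Flink table depends on its table properties.

import java.util.HashMap;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Connect to the Hive Metastore described by the given conf dir.
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive_conf_dir", "2.3.4");
hiveCatalog.open();

// Describe the table to create: two string columns, no extra properties.
TableSchema schema = TableSchema.builder()
    .field("key", DataTypes.STRING())
    .field("value", DataTypes.STRING())
    .build();
CatalogTable table = new CatalogTableImpl(schema, new HashMap<>(), "table created via the Catalog API");

// Create and then drop the table in the "default" database.
// (Handling of the checked exceptions thrown by createTable/dropTable is omitted for brevity.)
ObjectPath path = new ObjectPath("default", "flink_created");
hiveCatalog.createTable(path, table, false); // ignoreIfExists = false
hiveCatalog.dropTable(path, true);           // ignoreIfNotExists = true

hiveCatalog.close();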
Since it’s a trial feature, there are still some areas that are not perfect. Here are some of the features that are missing in 1.9.0:
- INSERT OVERWRITE is not supported.
- Writing partition tables is not supported.
- ACID tables are not supported.
- Bucket tables are not supported.
- Views are not supported.
- Some data types are not supported, including Decimal, Char, Varchar, Date, Time, Timestamp, Interval, and Union.
How to use it
1. Add dependencies
To use Flink's integration with Hive, users first need to add the corresponding dependencies. If you are using SQL Client, add the dependent JARs to Flink's lib directory; if you use the Table API, add the corresponding dependencies to your project (for example, in pom.xml).
As mentioned above, Hive versions 2.3.4 and 1.2.1 are currently supported. The following table lists the required dependencies for different Hive versions.
flink-shaded-hadoop-2-uber includes Hive's dependencies on Hadoop. If you do not use the package provided by Flink, you can instead add the Hadoop packages used in your cluster; make sure the Hadoop version you add is compatible with your Hive version (Hive 2.3.4 depends on Hadoop 2.7.2; Hive 1.2.1 depends on Hadoop 2.6.0).
For the Hive dependencies (hive-exec and hive-metastore), you can also use the JARs provided by the Hive installation in your cluster. For details, see the section on supported Hive versions below.
2. Configure HiveCatalog
To interact with Hive, you must use HiveCatalog. The following describes how to configure HiveCatalog.
3. SQL Client
When using SQL Client, users need to specify the desired Catalog in sql-client-defaults.yaml. One or more Catalog instances can be specified in the catalogs list of sql-client-defaults.yaml. The following example shows how to specify a HiveCatalog:
catalogs:
  # A typical catalog definition looks like:
  - name: myhive
    type: hive
    hive-conf-dir: /path/to/hive_conf_dir
    hive-version: 2.3.4
Here, name is the name the user assigns to each Catalog instance; the Catalog name and database name form the metadata namespace in FlinkSQL, so each Catalog name needs to be unique. type indicates the Catalog type; for HiveCatalog, type must be hive. hive-conf-dir is used to read Hive configuration files, and you can set it to the Hive configuration directory of your cluster. hive-version specifies the Hive version, which can be either 2.3.4 or 1.2.1.
Once the HiveCatalog is specified, the user can start SQL Client and verify that the HiveCatalog has been loaded correctly by running the following commands.
Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
show catalogs lists all loaded Catalog instances. Note that in addition to the Catalogs configured by the user in sql-client-defaults.yaml, FlinkSQL automatically loads a GenericInMemoryCatalog instance as the built-in Catalog, whose default name is default_catalog.
use catalog sets the current Catalog of the user session. When a user references metadata objects (such as databases and tables) in a SQL statement without specifying a Catalog name, FlinkSQL resolves them in the current Catalog.
4. Table API
The following code shows how to create HiveCatalog using the TableAPI and register it with the TableEnvironment.
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive_conf_dir";
String version = "2.3.4";

TableEnvironment tableEnv = ...; // create TableEnvironment

HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
After registering HiveCatalog with the TableEnvironment, you can access the metadata in HiveCatalog when submitting SQL through the TableEnvironment. Similar to SQL Client, TableEnvironment provides a useCatalog interface that lets users set the current Catalog.
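As a small illustration of how name resolution interacts with the current Catalog, the following sketch mixes unqualified and fully qualified table references. It continues from the snippet above and assumes the Blink planner and a table named src in the default database of myhive; the names are illustrative.

// With myhive set as the current catalog and "default" as the current database,
// an unqualified name is resolved there...
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
Table unqualified = tableEnv.sqlQuery("select * from src");

// ...while a fully qualified name works regardless of the current catalog.
Table qualified = tableEnv.sqlQuery("select * from myhive.`default`.src");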
5. Read and write Hive tables
After setting HiveCatalog, you can use SQL Client or Table API to read and write tables in Hive.
6. SQL Client
If you already have a table named src in Hive, you can use SQL statements like the following to read and write it.
Flink SQL> describe src;
root
 |-- key: STRING
 |-- value: STRING

Flink SQL> select * from src;
key      value
100      val_100
298      val_298
9        val_9
341      val_341
498      val_498
146      val_146
458      val_458
362      val_362
186      val_186
...      ...

Flink SQL> insert into src values ('newKey', 'newVal');
7. Table API
Similarly, the Table mentioned above can be read and written using the Table API. The following code shows how to do this.
TableEnvironment tableEnv = ...; // create TableEnvironment
tableEnv.registerCatalog("myhive", hiveCatalog);
// set myhive as current catalog
tableEnv.useCatalog("myhive");
Table src = tableEnv.sqlQuery("select * from src");
// write src into a sink or do further analysis
// ...
tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
tableEnv.execute("insert into src");
8. Support different Hive versions
Flink 1.9.0 supports Hive versions 2.3.4 and 1.2.1, and these are the only versions we have tested so far. When using SQL Client, if the user does not specify a Hive version in sql-client-defaults.yaml, we automatically detect the Hive version in the classpath. If the detected Hive version is not 2.3.4 or 1.2.1, an error is reported.
Thanks to Hive's compatibility guarantees, other minor versions are likely to work as well. Therefore, if you are running a Hive minor version close to one we support, you can specify a supported version and try out the integration features. For example, if you use Hive 2.3.3, you can specify Hive version 2.3.4 in the sql-client-defaults.yaml file or in code.
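In the Table API this would look roughly like the following sketch (the conf-dir path is illustrative, and the cluster is assumed to run Hive 2.3.3):

// The cluster runs Hive 2.3.3; we declare the closest supported version, 2.3.4.
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive_conf_dir", "2.3.4");
tableEnv.registerCatalog("myhive", hiveCatalog);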
9. Choice of execution mode and planner
In Flink 1.9.0, the Hive TableSink works only in batch mode. If you want to use the Hive TableSink, set the execution mode to batch.
Flink 1.9.0 adds a new Blink planner, which is more full-featured than the original planner, so we recommend using the Blink planner when integrating FlinkSQL with Hive. Going forward, some new features may only be supported by the Blink planner.
With SQL Client, you can specify the execution mode and planner in sql-client-defaults.yaml like this:
execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: batch
The corresponding Table API is written as follows:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Future plans
We will further improve the Hive integration in future releases of Flink, and expect it to be production-ready in version 1.10.0. Our plans for future releases include:
- More complete data type support
- Support for writing to partitioned tables, including static and dynamic partitions
- Support for INSERT OVERWRITE
- Support for views
- More complete DDL and DML support
- A Hive TableSink that works in streaming mode, so users can write streaming data to Hive
- Testing and support for more Hive versions
- Support for bucketed tables
- Performance testing and optimization
We welcome you to try out the Hive integration in Flink 1.9, and feel free to contact us if you have any questions.
Author: Ba Shu Zhen