Author: Jason

When did Apache Spark start supporting Hive integration? Anyone who has used Spark will tell you it was a long time ago.

When did Apache Flink start supporting Hive integration? Readers may be surprised to learn that it has only been supported in recent versions, and the support is still fairly limited.

There is no point in a direct comparison: the two communities have different goals, and Flink has invested heavily in true real-time stream processing. Apache Hive, meanwhile, has become the focal point of the data warehouse ecosystem. It is not only a SQL engine for big data analytics and ETL, but also a data management platform, which is why engines such as Spark, Flink, Impala, and Presto all actively support integration with Hive.

Indeed, for those who need Flink to read and write Hive data, Apache Flink only began offering Hive integration in version 1.9.0. The good news is that the Flink community has put a lot of work into the Hive integration and it is progressing well: Flink 1.10.0 RC1 was recently released, so interested readers can try out and verify the features.

Architecture design

First, based on publicly available community information and blog posts, the author gives a general overview of the architecture design of Flink's Hive integration.

Apache Flink's integration with Hive covers two main aspects: metadata access and actual table data access.

Metadata

Flink initially introduced the concept of ExternalCatalog to access metadata from external systems. However, the definition of ExternalCatalog was very incomplete and essentially unusable. Flink 1.10 officially removes the ExternalCatalog API (FLINK-13697), which includes:

  • ExternalCatalog (and all dependent classes, such as ExternalTable)
  • SchematicDescriptor, MetadataDescriptor, and StatisticsDescriptor

In response to the ExternalCatalog issue, the Flink community proposed a new Catalog API to replace the existing ExternalCatalog. The new Catalog implements the following features:

  • Supports metadata objects such as databases, tables, and partitions
  • Multiple Catalog instances can be maintained within a user Session, enabling simultaneous access to multiple external systems
  • Catalog plugs into Flink in a pluggable way, allowing users to provide custom implementations

The following diagram shows the overall architecture of the new Catalog API:

When a TableEnvironment is created, a CatalogManager is created along with it to manage the different Catalog instances. The TableEnvironment provides metadata services to Table API and SQL Client users through these Catalogs.

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf"  // a local path
val version         = "2.3.4"

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")

Currently there are two implementations of Catalog, GenericInMemoryCatalog and HiveCatalog. GenericInMemoryCatalog maintains the original Flink metadata management mechanism and stores all metadata in memory. HiveCatalog connects to an instance of Hive Metastore to provide metadata persistence. To use Flink to interact with Hive, you need to configure a HiveCatalog and use HiveCatalog to access metadata in Hive.

In addition, HiveCatalog can also be used to manage Flink's own metadata. In this scenario, HiveCatalog only uses Hive Metastore as persistent storage, and the metadata written to Hive Metastore is not necessarily in a format that Hive itself can read. A single HiveCatalog instance supports both modes, so there is no need to create separate instances for managing Hive metadata and Flink metadata.

In addition, HiveShim is designed to support Metastores of different Hive versions. For the Hive versions supported, see the official documentation.

Table data

Flink provides the Hive Data Connector to read and write Hive table data. The Hive Data Connector reuses Hive's Input/Output Format and SerDe classes as much as possible. This reduces code duplication and, more importantly, maintains maximum compatibility with Hive: data written by Flink can be read by Hive, and vice versa.

Integrating Hive features

Flink's Hive integration was released as a trial feature in version 1.9.0 and has some limitations, but the upcoming stable Flink 1.10 release will improve the integration and make it usable in enterprise scenarios.

To give readers a preview of Flink 1.10's Hive integration, the author compiled and tested Flink 1.10.0 RC1 against Cloudera CDH.

Environment information

CDH version: CDH 5.16.2
Flink version: release-1.10.0-rc1

Note that an RC version of Flink is used here for testing only; it is not recommended for production. (Cloudera Data Platform now officially ships Flink as its stream processing product, which makes it easy for users to adopt.)

The CDH environment has Sentry and Kerberos enabled.

Download and compile Flink

$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz
$ tar zxvf release-1.10.0-rc1.tar.gz
$ cd flink-release-1.10.0-rc1/
$ mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

During compilation, the flink-hadoop-fs module fails with an error similar to the following:

[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases (repo.hortonworks.com/content/rep…): Remote host closed connection during handshake: SSL peer shut down incorrectly

The problem is that flink-shaded-hadoop-2 cannot be found in Maven: there is no build of flink-shaded-hadoop-2 for this CDH version in Maven Central. We therefore need to build and install the flink-shaded-hadoop-2 artifact that Flink depends on ourselves.

■ Solve the flink-shaded-hadoop-2 problem

  • Get the flink-shaded source code
git clone https://github.com/apache/flink-shaded.git
  • Switch to the required version branch

The flink-shaded-hadoop-2 version reported missing above is 9.0, so check out the release-9.0 branch:

git checkout release-9.0
  • Configure the CDH Maven repository

Modify pom.xml in the flink-shaded project to add the CDH Maven repository; otherwise the CDH-related packages cannot be found at compile time.

Add the following content inside the <profiles> section:

<profile>
  <id>vendor-repos</id>
  <activation>
    <property>
      <name>vendor-repos</name>
    </property>
  </activation>
  <!-- Add vendor maven repositories -->
  <repositories>
    <!-- Cloudera -->
    <repository>
      <id>cloudera-releases</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
</profile>
  • Compile flink-shaded

Start compiling:

mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

It is recommended to build behind a network proxy if possible. If you run into network connection problems, retry the build or switch the repository addresses of the dependencies.

After the build succeeds, flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar is installed into the local Maven repository, as shown in the log:

Installing /Users/…/source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/…/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
Installing /Users/…/source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/…/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom

Recompile Flink

mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

The build takes a while, so readers can do other things in parallel in the meantime.

During compilation, you will most likely see error messages similar to the following:

[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard
[WARNING] npm WARN prepare removing existing node_modules/ before installation
[ERROR] WARN registry Unexpected warning for registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to registry.npmjs.org/mime/-/mime… failed, reason: read ECONNRESET
[ERROR] WARN registry Using stale package data from registry.npmjs.org/ due to a request error during revalidation.
[ERROR] WARN registry Unexpected warning for registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to registry.npmjs.org/optimist/-/… failed, reason: read ECONNRESET

As you can see, the flink-runtime-web module depends on the frontend-maven-plugin, which needs to download and install node, npm, and their dependencies.

If you are not building behind a proxy, you can modify the flink-runtime-web/pom.xml file to add nodeDownloadRoot and npmDownloadRoot settings pointing to a mirror:

<plugin>
  <groupId>com.github.eirslett</groupId>
  <artifactId>frontend-maven-plugin</artifactId>
  <version>1.6</version>
  <executions>
    <execution>
      <id>install node and npm</id>
      <goals>
        <goal>install-node-and-npm</goal>
      </goals>
      <configuration>
        <nodeDownloadRoot>https://registry.npm.taobao.org/dist/</nodeDownloadRoot>
        <npmDownloadRoot>https://registry.npmjs.org/npm/-/</npmDownloadRoot>
        <nodeVersion>v10.9.0</nodeVersion>
      </configuration>
    </execution>
    <execution>
      <id>npm install</id>
      <goals>
        <goal>npm</goal>
      </goals>
      <configuration>
        <arguments>ci --cache-max=0 --no-save</arguments>
        <environmentVariables>
          <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
        </environmentVariables>
      </configuration>
    </execution>
    <execution>
      <id>npm run build</id>
      <goals>
        <goal>npm</goal>
      </goals>
      <configuration>
        <arguments>run build</arguments>
      </configuration>
    </execution>
  </executions>
  <configuration>
    <workingDirectory>web-dashboard</workingDirectory>
  </configuration>
</plugin>

After a successful build, the Flink distribution is located in the flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin directory. Package it and upload it to the deployment node:

$ cd flink-dist/target/flink-1.10.0-bin
$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0

Deployment and Configuration

Flink is easy to deploy: just extract the package. You can optionally set up symlinks, environment variables, and so on, which will not be covered here.

The core configuration file of Flink is flink-conf.yaml. A typical configuration is as follows:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: hdfs:///user/flink110/recovery
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs:///user/flink110/checkpoints
state.savepoints.dir: hdfs:///user/flink110/savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
taskmanager.memory.preallocate: false
classloader.resolve-order: parent-first
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/flink_user/flink_user.keytab
security.kerberos.login.principal: flink_user
jobmanager.archive.fs.dir: hdfs:///user/flink110/completed-jobs
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs:///user/flink110/completed-jobs
historyserver.archive.fs.refresh-interval: 10000

The author only lists some common configuration parameters here; readers should adjust them to their own environment. The parameters are fairly self-explanatory, and later hands-on articles will explain them in more detail.

■ Dependencies for Hive integration

To use Flink’s integration with Hive, you need to add the following dependencies:

  • If you want to use the SQL Client, you need to copy the required dependency jars into Flink's lib directory
  • If you need to use the Table API, you need to add the corresponding dependencies to your project (such as pom.xml)
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.11-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>

The author mainly demonstrates the SQL Client. Since the CDH version used is 5.16.2 (Hadoop 2.6.0, Hive 1.1.0), the following jar packages need to be copied into the lib directory under the Flink deployment home directory:

  • Flink's Hive connector

flink-connector-hive_2.11-1.10.0.jar
flink-hadoop-compatibility_2.11-1.10.0.jar
flink-orc_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar
  • Hadoop dependency

flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
  • Hive dependency

hive-exec-1.1.0-cdh5.16.2.jar
hive-metastore-1.1.0-cdh5.16.2.jar
libfb303-0.9.3.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar

flink-shaded-hadoop-2-uber contains Hive's dependency on Hadoop. If you do not use the package provided by Flink, you can instead add the Hadoop packages used in your cluster, but you must make sure the added Hadoop version is compatible with the version Hive depends on.

The Hive dependencies (hive-exec and hive-metastore) can likewise use the jar packages shipped with Hive in your cluster; see the supported Hive versions for details.

In addition, the Hadoop, YARN, and Hive clients must be available on the Flink nodes.

■ HiveCatalog

Over the years, Hive Metastore has evolved into the de facto metadata center of the Hadoop ecosystem. Many companies run a single Hive Metastore service instance in production to manage all of their metadata, whether it is Hive metadata or not.

If both Hive and Flink are deployed, HiveCatalog can use Hive Metastore to manage Flink metadata.

If only Flink is deployed, HiveCatalog is the only persistent Catalog that Flink provides out of the box. Without a persistent Catalog, meta-objects such as Kafka table definitions have to be recreated in every session with Flink SQL CREATE DDL, which wastes a lot of time. With HiveCatalog, users create tables and other meta-objects once and can easily reference and manage them across sessions.
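To make this concrete, here is a minimal Table API sketch of persisting a non-Hive table definition in a HiveCatalog. This is illustrative code rather than the article's own: the catalog parameters mirror the configuration used elsewhere in this article, the table name, schema, topic, and broker address are hypothetical placeholders, and the WITH options use Flink 1.10's legacy connector property keys, which may need adjusting to your connector and format versions.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// Register the HiveCatalog and make it the current catalog
// ("1.1.0" matches the Hive version of the CDH 5.16.2 environment used in this article)
tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "mydatabase", "/opt/hive-conf", "1.1.0"))
tableEnv.useCatalog("myhive")

// The DDL below registers a Kafka-backed table. Because the current catalog is a HiveCatalog,
// the definition is persisted in Hive Metastore as a generic (non-Hive) table, so later
// sessions can use it without repeating the DDL. All names and options are placeholders.
tableEnv.sqlUpdate(
  """
    |CREATE TABLE kafka_orders (
    |  order_id STRING,
    |  amount   DOUBLE
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'orders',
    |  'connector.properties.bootstrap.servers' = 'kafka1:9092',
    |  'connector.startup-mode' = 'earliest-offset',
    |  'format.type' = 'csv'
    |)
  """.stripMargin)

A later session (or the SQL Client) can then reference this table directly, for example with SELECT * FROM myhive.mydatabase.kafka_orders, without re-running the DDL.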

To use the SQL Client, users need to specify the Catalogs they want in sql-client-defaults.yaml; one or more Catalog instances can be specified in its catalogs list.

The following example shows how to specify a HiveCatalog:

execution:

    planner: blink
    type: streaming
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
    hive-version: 2.3.4

Where:

  • name is the name the user assigns to each Catalog instance. The catalog name and database name together form the metadata namespace in Flink SQL (a table can be fully qualified as myhive.mydatabase.mytable, for example), so each Catalog name must be unique.
  • type indicates the Catalog type; for HiveCatalog it must be set to hive.
  • hive-conf-dir is the directory from which the Hive configuration files (such as hive-site.xml) are read; set it to the Hive configuration directory of your cluster.
  • hive-version specifies the Hive version.

Once the HiveCatalog is specified, the user can start sql-client and run the following commands to verify that the HiveCatalog has been loaded correctly.

Flink SQL> show catalogs;
default_catalog
myhive

Flink SQL> use catalog myhive;

show catalogs lists all loaded Catalog instances. Note that besides the Catalogs configured in sql-client-defaults.yaml, Flink SQL also automatically loads a GenericInMemoryCatalog instance as the built-in Catalog, whose default name is default_catalog.

Read and write Hive tables

After the HiveCatalog is set up, you can read and write Hive tables through either the SQL Client or the Table API (a Table API sketch follows the SQL examples below).

If a table named mytable already exists in Hive, you can read and write it with the following SQL statements.

■ Read data

Flink SQL> show catalogs;
myhive
default_catalog

Flink SQL> use catalog myhive;

Flink SQL> show databases;
default

Flink SQL> show tables;
mytable

Flink SQL> describe mytable;
root

|-- name: name 
|-- type: STRING 
|-- name: value 
|-- type: DOUBLE

Flink SQL> SELECT * FROM mytable;

   name      value
__________ __________
   Tom        4.72
   John       8.0
   Tom        24.2
   Bob        3.14
   Bob        4.72
   Tom        34.9
   Mary       4.79
   Tiff       2.72
   Bill       4.33
   Mary       77.7

■ Write data

Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;

Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

# static partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# dynamic partition
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# static and dynamic partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
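The same reads and writes can also be issued from the Table API. Below is a minimal Scala sketch, assuming the myhive catalog is configured as above and mytable exists in Hive; mytable_copy is a hypothetical target table with the same schema, used only for illustration.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

// Register the HiveCatalog ("1.1.0" matches the Hive version of the CDH 5.16.2 environment)
tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "mydatabase", "/opt/hive-conf", "1.1.0"))
tableEnv.useCatalog("myhive")

// Read the Hive table into a Table object
val source = tableEnv.sqlQuery("SELECT * FROM mytable")

// Write it into another Hive table. Since the Hive connector reuses Hive's Output Format
// and SerDe, the data written here is readable from Hive itself.
source.insertInto("mytable_copy")

// In Flink 1.10, insertInto()/sqlUpdate() only buffer the statement; execute() submits the job
tableEnv.execute("flink-hive-read-write")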

Conclusion

In this article, the author first introduced the architecture design of Flink's Hive integration, then walked through compiling Flink from source and solving the problems encountered along the way, followed by the concrete steps for deploying and configuring the Flink environment and integrating Hive, and finally, following the official examples, performed read and write operations on Hive tables.

In a follow-up article, the author will explain how to use Flink SQL to work with Hive in a real production environment.

Reference:

  • ci.apache.org/projects/fl…
  • ververica.cn/developers/…