It’s been three weeks since Flink 1.11 was officially released, and one of the features that really caught my eye is Hive Streaming. Zeppelin 0.9-preview2 was also released recently, so I put together a walkthrough of Flink Hive Streaming on Zeppelin. This article covers the following parts:

  • The significance of Hive Streaming
  • Checkpoint & Dependency
  • Write Kafka
  • Hive Streaming Sink
  • Hive Streaming Source
  • Hive Temporal Table

The significance of Hive Streaming

Some readers may wonder why Hive Streaming is featured so prominently in Flink 1.11. What does it bring us? In fact, in the big data field there have long been two architectures, Lambda and Kappa:

  • Lambda architecture: stream and batch are separated. Static data is synchronized to the Hive data warehouse by scheduled jobs, while real-time data is both synchronized to Hive and consumed by the real-time computing engine. This leads to several problems:

  • Inconsistent data caliber between the two pipelines

  • The output latency of offline computation is too high

  • Redundant data storage

  • Kappa architecture: everything is produced by real-time computation, and historical data is recomputed by rewinding the consumption offsets of the message queue. This also has plenty of problems, because there is no one-size-fits-all architecture:

  • Message middleware cannot retain all historical data; it stores data in rows and takes up too much space

  • Recomputing historical data in real time is inadequate

  • Ad-hoc analysis cannot be performed

To solve these problems, the industry came up with the real-time data warehouse, which addresses most of the pain points but still falls short in places. What about computations involving historical data, for example? What if I want to do ad-hoc analysis? So real-time data warehouses and offline data warehouses end up coexisting in the industry, which brings even more problems: multiple models, inconsistent data output, recomputing historical data, and so on.

Hive Streaming solves these problems! No more multiple models; no more worrying about inconsistent metrics from maintaining both a real-time SQL and an offline SQL for the same historical data; and ad-hoc analysis works too. How? Just read the table that Hive Streaming writes to!

Let’s start with parameter configuration, then stream-write to Hive, then stream-read the Hive table, and finally join against a Hive dimension table. After walking through the whole process, you should have a much deeper understanding of Hive Streaming and appreciate what it can do.

Checkpoint & Dependency

Since files only change from in-progress to finished after a checkpoint completes, we need to configure checkpointing properly. In Zeppelin, the checkpoint configuration is easy.

%flink.conf

# checkpoint configuration
pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION

# dependent jar package configuration
flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0

Since we need to read data from Kafka, we add Kafka dependencies.

Write Kafka

Our data comes from the Tianchi dataset and is stored on local disk in CSV format, so we need to write it to Kafka first.

First, build the CSV source table and the Kafka sink table:

%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS source_csv;
CREATE TABLE source_csv (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',
 'format' = 'csv'
 
 )
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string,
ts AS localtimestamp,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'theme_click_log',
 'properties.bootstrap.servers' = '10.70.98.1:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)

Since the registered table is used for both writing and reading, I defined the watermark when creating it. And because the timestamps in the source data are quite old, I simply use the current time minus 5 seconds as the watermark.

As you may have noticed, I specified the default SQL dialect at the beginning of the statement. Why is that? Are there other dialects? Don’t worry, I’ll get to it.

In fact, Flink could already integrate with Hive in earlier versions, including creating tables in the Hive metastore, but much of the syntax was incompatible with Hive, and the tables created could not be viewed from Hive; the root cause was dialect incompatibility. In Flink 1.11, you can create Hive tables with DDL and query them from Hive. Flink supports multiple SQL dialects and defaults to the default dialect; to create tables that Hive can read, use the Hive dialect. For details, see the link below.

Hive dialects:

ci.apache.org/projects/fl…
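As a minimal illustration, switching dialects in Zeppelin is just a SET statement; the DDL paragraphs later in this article do exactly this:

%flink.ssql
-- statements after this line are parsed with Hive syntax
SET table.sql-dialect=hive;
-- ... Hive-style DDL goes here ...
-- switch back to Flink's default dialect
SET table.sql-dialect=default;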

The data is then read from CSV and written to Kafka.

%flink.ssql(type=update)

insert into kafka_table select * from source_csv ;

Now take a look at Kafka to check whether the data is flowing in:

Okay, now let’s write to Hive.

Hive Streaming Sink

Set up the Hive sink table, and remember to switch the dialect to Hive, otherwise there will be problems.

%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (

 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 min',
 'sink.partition-commit.policy.kind'='metastore,success-file'

);

Let me briefly explain the parameters:

  • partition.time-extractor.timestamp-pattern: the partition time extractor pattern; it must be consistent with the partition fields in the DDL;
  • sink.partition-commit.trigger: the partition commit trigger type, either process-time or partition-time. With process-time, neither the pattern above nor a watermark is required: a partition is committed once the current time exceeds the partition creation time plus sink.partition-commit.delay. With partition-time, a watermark must be defined on the source table: a partition is committed once watermark > the time extracted from the partition plus sink.partition-commit.delay (see the sketch after this list);
  • sink.partition-commit.delay: the commit delay;
  • sink.partition-commit.policy.kind: how to notify downstream after a successful commit. metastore tells the Hive metastore so that Hive can see your latest partition data, and success-file writes a marker file into the partition. If you need to merge small files, you can also plug in a custom policy by implementing the PartitionCommitPolicy interface.
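For comparison, here is a minimal sketch of the same kind of sink table using the simpler process-time trigger, which needs neither the timestamp pattern nor a watermark; the table name hive_table_proctime is purely illustrative, and only the TBLPROPERTIES differ from the DDL above:

%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table_proctime;
-- with process-time, a partition is committed once
-- current time > partition creation time + sink.partition-commit.delay
CREATE TABLE hive_table_proctime (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (
 'sink.partition-commit.trigger'='process-time',
 'sink.partition-commit.delay'='1 min',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);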

Let’s insert data into the Hive Table we just created:

%flink.ssql

insert into hive_table select  user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table

Now let’s wait a moment and grab a cup of coffee ☕️.

Then let’s look at HDFS and see what’s under the path.

You could also check it with a Hive query, but I’ll keep you in suspense here and read the data with Hive Streaming first.

Hive Streaming Source


A limitation of the Hive Streaming Source is that it cannot read new files added to partitions that have already been consumed. Simply put, once a partition has been read, it will not be read again. This may seem restrictive, but if you think about it, it fits the nature of streaming.

As usual, let me explain what the parameters mean:

  • streaming-source.enable: whether to enable streaming read mode;
  • streaming-source.monitor-interval: the interval at which new files or partitions are discovered;
  • streaming-source.consume-order: create-time or partition-time. create-time refers to the time the file or folder was created in HDFS, not the partition creation time; partition-time refers to the time encoded in the partition. For non-partitioned tables, only create-time can be used. The official documentation is a little vague here and may lead you to believe that new files added under an already-consumed partition can be picked up; after testing and reading the source code, I found that they cannot;
  • streaming-source.consume-start-offset: the partition from which to start reading.

Enough talk, let’s pull some data and take a look ~
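Here is a minimal sketch of such a streaming read; the monitor interval and start offset values are purely illustrative and should be adjusted to your own partitions:

%flink.ssql(type=update)
-- Table Hints only work after dynamic table options are enabled
SET table.dynamic-table-options.enabled=true;

select * from hive_table
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.monitor-interval'='1 min',
            'streaming-source.consume-start-offset'='2020-07-20') */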

Remember that the SET statement must be included, otherwise Table Hints cannot be used.

Hive Temporal Table

After seeing the Streaming Source and Streaming Sink, let’s finally try Hive as a dimension table.

In fact, using a Hive dimension table is very simple: as long as the table exists in Hive, it can be used as a dimension table, and its parameters can be overridden with Table Hints (see the sketch after the parameter note below).

  • lookup.join.cache.ttl: the cache duration. Note that a Hive dimension table caches all of its data in TM memory; if the TTL is too short, the data will be reloaded frequently, which hurts performance.
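The following is a minimal sketch of such a dimension table join; it assumes the probe side has a processing-time attribute (called p here, e.g. declared as p AS PROCTIME(), which the kafka_table DDL above does not include), and the join key and cache TTL are purely illustrative:

%flink.ssql(type=update)
SET table.dynamic-table-options.enabled=true;

-- LEFT JOIN the Kafka stream against the Hive table as a dimension (temporal) table,
-- overriding the lookup cache TTL with a Table Hint
select a.user_id, a.item_id, a.clk_cnt, b.theme_id
from kafka_table a
left join hive_table /*+ OPTIONS('lookup.join.cache.ttl'='5 min') */
  for system_time as of a.p as b
on a.user_id = b.user_id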

Since it is a LEFT JOIN, rows with no match in the dimension table are filled with NULL. Take another look at the DAG:

Looking at the highlighted box, you can see that this is a dimension table join using LookupJoin.

If you omit FOR SYSTEM_TIME AS OF a.p from the SQL, the DAG looks like this instead:

This is no longer a dimension table join, but a regular join between the stream and the batch (Hive) table.

Final Thoughts

The improvements to Hive Streaming mean that the last barrier to unifying streaming and batch has been broken: we can run OLAP analysis on historical data and also emit results in real time, which is undoubtedly good news for ETL developers. In the days to come, more and more enterprises will surely complete the construction of their real-time data warehouses.


The Zeppelin note used in this article can be downloaded here:

github.com/lonelyGhost…

Finally, I’d like to introduce the Flink on Zeppelin DingTalk group, where you can discuss any questions; Apache Zeppelin PMC member Jian Feng is in the group, so you can ask him directly ~

About the author:

Di Jie, senior data expert at Mogujie, is responsible for Mogujie’s real-time computing platform. He currently focuses on Flink on Zeppelin and is an Apache Zeppelin contributor.