All of this walkthrough will be performed on the Flink SQL CLI, involving only PLAIN SQL text, without a single line of Java/Scala code, and without the need for an IDE.

Last Thursday, I shared “Demo: Building Streaming Applications Based on Flink SQL” live in the Dingdinggroup of Flink Chinese community, and the live content was biased towards practical demonstration. This article is a summary of the live content, and some improvements have been made, such as using Docker Compose installation for all components except Flink to simplify the preparation process. You can also learn along with this video. Complete can watch video sharing: www.bilibili.com/video/av905…

Flink 1.10.0 has just been released, releasing a number of exciting new features. The Flink SQL module, in particular, is developing at a very fast pace, so this article, from a practical point of view, leads you to explore how to quickly build streaming applications using Flink SQL.

Based on Kafka, MySQL, Elasticsearch and Kibana, Flink SQL is used to construct a real-time analysis application of e-commerce user behavior. All of this walkthrough will be performed on the Flink SQL CLI, involving only PLAIN SQL text, without a single line of Java/Scala code, and without the need for an IDE. The final effect picture of this practical exercise:

To prepare

A Linux or MacOS computer with Docker and Java8.

Start the container with Docker Compose

The components that this demo relies on are all programmed into a container, so they can be launched with one click from Docker-compose. You can download the docker-comemage. yml file either automatically with the wget command or manually.

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
Copy the code

Docker Compose contains containers for:

  • DataGen:Data generator. When the container is started, it automatically starts generating user behavior data and sends it to the Kafka cluster. By default, 1000 pieces of data are generated every second for about 3 hours. You can also change itdocker-compose.ymlThe datagenspeedupParameter to adjust the generation rate (restart docker Compose to take effect).
  • MySQL:MySQL 5.7 integration, and pre-created category tables (category), pre-filled the mapping relationship between subcategories and top classes, and then used as a dimension table.
  • Kafka: Primarily used as a data source. The DataGen component automatically fills the container with data.
  • Zookeeper: Kafka container dependency.
  • Elasticsearch: Stores Flink SQL output data.
  • Kibana: Visualizes data in Elasticsearch.

Before starting the container, it is recommended to modify the Docker configuration and adjust the resources to 4GB and 4 cores. To start all containers, just run the following command in the docker-comemage. yml directory.

docker-compose up -d
Copy the code

This command automatically starts all containers defined in the Docker Compose configuration in detached mode. You can use Docker PS to see if the above five containers are started properly. You can also visit http://localhost:5601/ to see if Kibana is working properly.

You can also stop all containers with the following command:

docker-compose down
Copy the code

Download and install the Flink local cluster

We recommend that users manually download and install Flink instead of automatically starting Flink through Docker. This makes it easier to understand Flink’s components, dependencies, and scripts.

  1. Download the Flink 1.10.0 installation package and extract the Flink (extract directory – 1.10.0) : www.apache.org/dist/flink/…

  2. Go to flink-1.10.0 directory: CD flink-1.10.0

  3. Run the following command to download the dependent JAR package and copy it to the lib/ directory, or manually download and copy it. This is because we need to rely on each connector implementation at runtime.

    Copy the code

Wget – p. / lib/repo1.maven.org/maven2/org/… | wget – p. / lib/repo1.maven.org/maven2/org/… | wget – p. / lib/repo1.maven.org/maven2/org/… | wget – p. / lib/repo1.maven.org/maven2/org/… | wget – p. / lib/repo1.maven.org/maven2/mysq…

! [](https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2020/2/27/17086efb1beff28e~tplv-t2oaga2asx-image.image 4). Will ` conf/flink - the conf. Yaml ` ` in taskmanager. NumberOfTaskSlots ` modified into 10, because we will be running multiple tasks at the same time. 5. Run the./bin/start-cluster.sh command to start the cluster. If it runs successfully, you can access the Flink Web UI at http://localhost:8081. And you can see that there are 10 Slots available. ! [2.jpg](https://ucc.alicdn.com/pic/developer-ecology/ff8fa73598584c83aadea3414ecc4b98.jpg) 6. Run 'bin/ sqL-client. sh embedded' to start the SQL CLI. You will see the squirrel welcome screen below. ! [3.png](https://ucc.alicdn.com/pic/developer-ecology/35a3fab21cbe4fe4bda635c7964b34a6.png)Create Kafka table with DDLThe Datagen container writes data continuously to Kafka's 'user_Behavior' topic after startup. Data contains user behaviors on November 27, 2017 (behaviors include click, buy, add buy and like). Each line represents a user behavior, which is composed of user ID, commodity ID, commodity category ID, behavior type and time in JSON format. The original data set from [ali YunTianChi public data sets] (https://tianchi.aliyun.com/dataset/dataDetail?dataId=649), each write-up. You can run the following command in the 'docker-comemess. yml' directory to see the top 10 data generated in the Kafka cluster.Copy the code

docker-compose exec kafka bash -c ‘kafka-console-consumer.sh –topic user_behavior –bootstrap-server kafka:9094 –from-beginning –max-messages 10’


Copy the code

{“user_id”: “952483”, “item_id”:”310884″, “category_id”: “4580532”, “behavior”: “pv”, “ts”: “2017-11-27T00:00:00Z”} {“user_id”: “794777”, “item_id”:”5119439″, “category_id”: “982926”, “behavior”: “pv”, “ts”: “2017-11-27T00:00:00Z”} …

Now that we have the data source, we can use DDL to create and connect topics in Kafka. The DDL is executed in the Flink SQL CLI.Copy the code

CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), WATERMARK FOR ts as ts-interval ‘5’ SECOND proctime as proctime () WITH (‘connector.type’ = ‘kafka’, — use kafka connector ‘connector.version’ = ‘universal’, — kafka version, Connector. topic’ = ‘user_behavior’, — kafka topic ‘connector.startup-mode’ = ‘earliest-offset’, — read from its starting offset ‘connector. The properties. The zookeeper. Connect’ = ‘localhost: 2181’, — the zookeeper address ‘connector. The properties. The bootstrap. The servers’ =’ localhost: 9092 ‘, — kafka broker address ‘format.type’ = ‘json’ — data source format is JSON);

In addition to declaring five fields according to the format of the data above, we also declare a virtual column that generates processing time through computed column syntax and the built-in 'PROCTIME()' function. We also used the WATERMARK syntax to declare the WATERMARK policy on the TS field (which tolerates 5 seconds out of order), making the TS field an event time column as well. Time attributes and DDL syntax can be read in the official documentation for more information: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html - DDL: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-tableOn the SQL CLI, you can run the 'show tables' command to create a Kafka table. ` and ` describe user_behavior; 'to view the currently registered tables, as well as the details of the tables. You can also run 'SELECT * FROM user_behavior' directly in SQL CLI; 'Preview the data (press' q 'to exit). Next, we'll take a closer look at Flink SQL through three field scenarios.## Count hourly trading volume

Create Elasticsearch table with DDLWe first create an ES result table in SQL CLI. According to the requirements of the scenario, we mainly need to save two data: hour and volume.Copy the code

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( ‘connector.type’ = ‘elasticsearch’, Elasticsearch connector ‘connector. Version ‘= ‘6’, — ElasticSearch version, 6 supports es 6+ and 7+ versions ‘connector.hosts’ = ‘http://localhost:9200’, — elasticSearch address ‘connector.index’ = ‘buy_cnt_per_hour’, ‘connector. documenttype ‘= ‘user_behavior’, — elasticSearch type, ‘connector.bulk-flush. Max-actions’ = ‘1’, ‘format. Type ‘= ‘json’, Json ‘update-mode’ = ‘append’);

We don't need to create the 'buy_cnt_per_hour' index in Elasticsearch, Flink Job will create it automatically.Submit the Query # # #The number of transactions per hour is how many there are per hour"buy"User behavior. Hence the TUMBLE window function, which cuts the window in one hour. Then each window counts separately"buy"The number of alpha, and this can be filtered out first"buy"Then 'COUNT(*)' is implemented. ```sql INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL'1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
Copy the code

Here we use the HOUR built-in function to extract the value of the HOUR of the day from a TIMESTAMP column. INSERT INTO is used to continuously INSERT query results INTO the es result table defined above (you can think of the ES result table as the materialized view of Query). In addition to read the documentation for more about the content of the window aggregation: ci.apache.org/projects/fl…

After running the above query in the Flink SQL CLI, you can see the submitted task in the Flink Web UI, which is a streaming task and therefore runs forever.

Visualize results using Kibana

We’ve started the Kibana container with Docker Compose and Kibana can be accessed at http://localhost:5601. First, we need to configure an index pattern. Click “Management” on the left toolbar to find “Index Patterns”. Click “Create Index Pattern” and Create Index Pattern by entering the full Index name “buy_cnT_per_hour”. Once created, Kibana knows about our index and we can start exploring the data.

Click the “Discovery” button on the left toolbar and Kibana will list the contents of the newly created index.

Next, let’s create a Dashboard to display the various visualizations. Click “Dashboard” on the left side of the page to create a Dashboard called “User Behavior Log Analysis”. Then click “Create New” to Create a New view, select the “Area” Area map, select the “buy_CNT_per_hour” index, draw the volume Area map as shown in the screenshot below (left), and save it as “Volume per hour”.

You can see that early morning is the low point of the day.

Collects statistics on the number of independent users every 10 minutes every day

Another interesting visualization is the cumulative number of unique users (UVs) at each moment of the day, meaning that the number of UVs at each moment represents the total number of UVs from 0 to the current moment, so the curve must be monotonically increasing.

We still start by creating an Elasticsearch table in the SQL CLI to store the summary data of the results. There are two main fields: time and cumulative UV number.

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch'.'connector.version' = '6'.'connector.hosts' = 'http://localhost:9200'.'connector.index' = 'cumulative_uv'.'connector.document-type' = 'user_behavior'.'format.type' = 'json'.'update-mode' = 'upsert'
);
Copy the code

To achieve this curve, we can first calculate the current minute of each piece of data and the current cumulative UV (the number of unique users from 0 to the current behavior) through OVER WINDOW. Uv statistics are done using the built-in COUNT(DISTINCT user_id), which is optimized for COUNT DISTINCT in Flink SQL so that it can be used safely.

CREATE VIEW uv_per_10min AS
SELECT 
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) | |'0') OVER w AS time_str, 
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
Copy the code

Here we use the SUBSTR and DATE_FORMAT and | | built-in functions, will be transformed into a TIMESTAMP fields 10 minutes unit time string, such as: therefore, silla. More content about OVER the WINDOW can be reference documentation: ci.apache.org/projects/fl…

We also use the CREATE VIEW syntax to register a Query as a logical VIEW, which can be easily referenced in subsequent queries, making it easier to unpick complex queries. Note that creating a logical view does not trigger the execution of the job and the results of the view do not land, so it is very light to use with no overhead. Uv_per_10min Each input data generates one output data, so the storage pressure is high. Elasticsearch can be aggregated in minutes at uv_per_10min, so only one point will be stored in Elasticsearch every 10 minutes, making the visual rendering of Elasticsearch and Kibana much less difficult.

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
Copy the code

After submitting the above query, create the index pattern of Cumulative_uv in Kibana, then create a “Line” Line chart in Dashboard, select the index of Cumulative_uv, Draw the cumulative independent user number curve as shown in the following screenshot (left) and save the curve.

Top category rankings

The last interesting visualization is the category leaderboard to see which categories are the pillar categories. However, because the category classification in the source data is too detailed (about 5000 categories), it is not meaningful for the ranking, so we hope to reduce it to the top category. Therefore, the author preprepared the mapping data of subcategories and top classes in the mysql container to be used as dimension table.

Create a MySQL table on the SQL CLI for subsequent dimension table queries.

CREATE TABLE category_DIM (sub_category_id BIGINT, -- subcategory parent_category_id BIGINT -- top category) WITH ('connector.type' = 'jdbc'.'connector.url' = 'jdbc:mysql://localhost:3306/flink'.'connector.table' = 'category'.'connector.driver' = 'com.mysql.jdbc.Driver'.'connector.username' = 'root'.'connector.password' = '123456'.'connector.lookup.cache.max-rows' = '5000'.'connector.lookup.cache.ttl' = '10min'
);
Copy the code

Create a new Elasticsearch table to store category statistics.

CREATE TABLE top_category (category_name STRING, -- categoryname buy_cnT BIGINT -- sales) WITH ('connector.type' = 'elasticsearch'.'connector.version' = '6'.'connector.hosts' = 'http://localhost:9200'.'connector.index' = 'top_category'.'connector.document-type' = 'user_behavior'.'format.type' = 'json'.'update-mode' = 'upsert'
);
Copy the code

In the first step, we complete the category name through dimension table association. We still use CREATE VIEW to register the query as a VIEW, simplifying the logic. Dimension tables associated using temporal join syntax, can view the documentation for more: ci.apache.org/projects/fl…

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, 
  CASE C.parent_category_id
    WHEN 1 THEN 'Clothing, shoes and bags'
    WHEN 2 THEN 'Home Decoration'
    WHEN 3 THEN 'goods'
    WHEN 4 THEN 'beauty'
    WHEN 5 THEN 'mother'
    WHEN 6 THEN '3 c digital'
    WHEN 7 THEN 'Sports outdoors'
    WHEN 8 THEN 'food'
    ELSE 'other'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
Copy the code

At last, count the number of buy events by category name and write them into Elasticsearch.

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
Copy the code

After submitting the above query, create a Top_category index pattern in Kibana, then create a “Horizontal Bar” Bar chart in Dashboard, select the top_category index, Draw the category leaderboard as shown in the following screenshot (left) and save it.

It can be seen that the turnover of clothing shoes and bags is far ahead of other categories.

So far, we have completed three field cases and their visualizations. Now you can go back to the Dashboard page and drag and drop the views to make our Dashboard look more formal and intuitive (see the renderings at the beginning of this article). Of course, Kibana also provides very rich graphics and visualization options, and there is a lot of interesting information to be found in the user behavior data. Interested readers can use Flink SQL to analyze the data in more dimensions, and use Kibana to display more visualizations, and observe the real-time changes of the graph data.

At the end

In this article, we have shown how to use Flink SQL to integrate Kafka, MySQL, Elasticsearch, and Kibana to quickly build a real-time analysis application. The whole process is done without a single line of Java/Scala code, using PLAIN SQL text. Flink SQL is easy to use and powerful, including easy connection to various external systems, native support for event timing and out-of-order data processing, dimension table association, rich built-in functions, and more. Hope you will enjoy our practice and enjoy the fun and knowledge!