Flink 1.10.0 has just been released, bringing a number of exciting new features. The Flink SQL module in particular is evolving very quickly, so this article takes a hands-on approach and walks you through quickly building a streaming application with Flink SQL.

Using Kafka, MySQL, Elasticsearch and Kibana, we will build a real-time analytics application for e-commerce user behavior with Flink SQL. The entire walkthrough is performed in the Flink SQL CLI and involves only plain SQL text, without a single line of Java/Scala code and without installing an IDE. The final result of this exercise looks like this:

Preparation

A Linux or MacOS computer with Docker and Java 8 installed.

Start the containers with Docker Compose

The components this demo relies on are all orchestrated as containers, so they can be started with a single docker-compose command. You can download the docker-compose.yml file automatically with wget or manually.

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml


The Docker Compose environment contains the following containers:

  • DataGen: the data generator. Once the container starts, it automatically generates user behavior data and sends it to the Kafka cluster. By default it generates 1000 records per second for about 3 hours. You can also change the speedup parameter of datagen in docker-compose.yml to adjust the generation rate (restart Docker Compose for it to take effect).
  • MySQL: MySQL 5.7 with a pre-created category table (category), pre-filled with the mapping between sub-categories and top-level categories; it is used later as a dimension table (a rough sketch of this table appears after this list).
  • Kafka: Primarily used as a data source. The DataGen component automatically fills the container with data.
  • Zookeeper: Kafka container dependency.
  • Elasticsearch: Stores Flink SQL output data.
  • Kibana: Visualizes data in Elasticsearch.
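
For orientation, the pre-created category table in MySQL looks roughly like the sketch below. This is a hypothetical reconstruction for illustration only; the column names match the JDBC DDL registered later in this article, but the exact DDL inside the container may differ.

-- Hypothetical sketch of the dimension table in the MySQL "flink" database
CREATE TABLE category (
    sub_category_id BIGINT PRIMARY KEY,  -- fine-grained category id, as found in the user behavior data
    parent_category_id BIGINT            -- top-level category id, later mapped to names via a CASE expression
);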

Before starting the containers, it is recommended to modify the Docker settings and increase the resources to 4 GB of memory and 4 cores. To start all containers, run the following command in the directory containing docker-compose.yml.

docker-compose up -d


This command automatically starts all containers defined in the Docker Compose configuration in detached mode. You can run docker ps to check whether the containers listed above started properly, and visit http://localhost:5601/ to check that Kibana is running.

You can also stop all containers with the following command:

docker-compose down


Download and install the Flink local cluster

We recommend that users manually download and install Flink instead of automatically starting Flink through Docker. This makes it easier to understand Flink’s components, dependencies, and scripts.

  1. Download the Flink 1.10.0 package and extract it (the extracted directory is flink-1.10.0): www.apache.org/dist/flink/…
  2. Enter the flink-1.10.0 directory: cd flink-1.10.0
  3. Run the following commands to download the dependent JAR packages and copy them into the lib/ directory, or download and copy them manually. We need them because the connector implementations are required at runtime.

wget -P ./lib/ repo1.maven.org/maven2/org/… | \
wget -P ./lib/ repo1.maven.org/maven2/org/… | \
wget -P ./lib/ repo1.maven.org/maven2/org/… | \
wget -P ./lib/ repo1.maven.org/maven2/org/… | \
wget -P ./lib/ repo1.maven.org/maven2/mysq…

  4. In conf/flink-conf.yaml, change taskmanager.numberOfTaskSlots to 10, because we will run multiple tasks at the same time.
  5. Run ./bin/start-cluster.sh to start the cluster.

    If it starts successfully, you can access the Flink Web UI at http://localhost:8081 and see that there are 10 slots available.

  6. Run bin/sql-client.sh embedded to start the SQL CLI. You will see the squirrel welcome screen shown below.

Create a Kafka table using DDL

After startup, the DataGen container continuously writes data to Kafka’s user_behavior topic. The data contains user behaviors from November 27, 2017 (the behaviors include click, buy, add-to-cart and like). Each line represents one user behavior as a JSON record composed of the user ID, item ID, item category ID, behavior type and time. The original dataset comes from the Alibaba Cloud Tianchi public dataset, for which we are grateful.

You can run the following command in the directory containing docker-compose.yml to see the first 10 records generated in the Kafka cluster.

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'


{"user_id": "952483"."item_id":"310884"."category_id": "4580532"."behavior": "pv"."ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777"."item_id":"5119439"."category_id": "982926"."behavior": "pv"."ts": "2017-11-27T00:00:00Z"}...Copy the code


Now that we have a data source, we can use DDL to create a table that connects to the Kafka topic. The DDL is executed in the Flink SQL CLI.

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- computed column that generates a processing-time attribute
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines the watermark on ts, making ts an event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version, universal supports 0.11 and above
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- the data format is json
);


In addition to declaring the five fields that match the data format above, we also declare a virtual column that generates the processing time via the computed-column syntax and the built-in PROCTIME() function. We also use the WATERMARK syntax to declare a watermark strategy on the ts field (tolerating 5 seconds of out-of-order data), which makes ts an event-time column as well. You can read the official documentation to learn more about time attributes and DDL syntax:

  • Time attributes: ci.apache.org/projects/fl…
  • DDL: ci.apache.org/projects/fl…

After creating the Kafka table in the SQL CLI, you can run SHOW TABLES; and DESCRIBE user_behavior; to view the currently registered tables and their details. You can also run SELECT * FROM user_behavior; directly in the SQL CLI to preview the data (press Q to exit).
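
For reference, a minimal sketch of these commands as typed at the SQL CLI prompt (output omitted):

SHOW TABLES;                  -- lists the registered tables, e.g. user_behavior
DESCRIBE user_behavior;       -- shows the schema, including the proctime column and the watermark
SELECT * FROM user_behavior;  -- opens a live result view of the stream; press Q to exit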

Next, we’ll take a closer look at Flink SQL through three practical scenarios.

Hourly trading volume

Create Elasticsearch table using DDL

We first create an Elasticsearch result table in the SQL CLI. For this scenario we mainly need to store two pieces of data: the hour and the trading volume.

CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',  -- use the elasticsearch connector
    'connector.version' = '6',  -- elasticsearch version, 6 supports ES 6+ and 7+
    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch index name, similar to a database table name
    'connector.document-type' = 'user_behavior',  -- elasticsearch type, similar to a database name
    'connector.bulk-flush.max-actions' = '1',  -- flush after every record
    'format.type' = 'json',  -- output data format is json
    'update-mode' = 'append'
);


There is no need to create the buy_cnt_per_hour index in Elasticsearch beforehand; the Flink job will create it automatically.

Submit a Query

The hourly trading volume is simply the number of “buy” user behaviors in each hour. We therefore use the TUMBLE window function to slice the data into one-hour windows, and each window then counts the number of “buy” events, which we do by filtering out the “buy” behaviors and applying COUNT(*).

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);


Here we use the built-in HOUR function to extract the hour of the day from a TIMESTAMP column. INSERT INTO continuously inserts the query results into the Elasticsearch result table defined above (you can think of the Elasticsearch table as a materialized view of the query). You can read the documentation to learn more about window aggregation: ci.apache.org/projects/fl…
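
If you want to see how the window functions behave before the results land in Elasticsearch, the following ad-hoc sketch can be run directly in the SQL CLI. TUMBLE_START and TUMBLE_END return the window boundaries; the timestamps in the comments are illustrative only.

-- Ad-hoc sketch: inspect window boundaries and the derived hour-of-day value
SELECT
  TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,        -- e.g. 2017-11-27 01:00:00.000
  TUMBLE_END(ts, INTERVAL '1' HOUR) AS window_end,            -- e.g. 2017-11-27 02:00:00.000
  HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day,   -- e.g. 1
  COUNT(*) AS buy_cnt
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);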

After submitting the INSERT INTO query in the Flink SQL CLI, you can see the submitted task in the Flink Web UI; it is a streaming task and therefore runs forever.

Visualize results using Kibana

We have already started the Kibana container with Docker Compose, and Kibana can be accessed at http://localhost:5601. First, we need to configure an index pattern: click “Management” in the left toolbar, find “Index Patterns”, click “Create Index Pattern”, and create the pattern by entering the full index name “buy_cnt_per_hour”. Once created, Kibana knows about our index and we can start exploring the data.

Click the “Discover” button in the left toolbar and Kibana will list the contents of the newly created index.

Next, let’s create a dashboard to display the various visualizations. Click “Dashboard” on the left side of the page to create a dashboard named “User Behavior Log Analysis”. Then click “Create New” to create a new view, select the “Area” chart, choose the “buy_cnt_per_hour” index, draw the trading-volume area chart as shown in the screenshot below (left), and save it as “Volume per hour”.

You can see that early morning is the low point of the day.

Cumulative number of unique users every 10 minutes

Another interesting visualization is the cumulative number of unique visitors (UV) over the day: the UV count at each point in time represents the total number of unique users from 00:00 up to that point, so the curve is always monotonically increasing.

We again start by creating an Elasticsearch table in the SQL CLI to store the result data. The table has two main fields: the time and the cumulative UV count.

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);


To produce this curve, we can first use an OVER WINDOW to compute, for each record, the current minute of the day and the cumulative UV up to that record (the number of unique users from 00:00 to the current row). UV is computed with the built-in COUNT(DISTINCT user_id); Flink SQL has specifically optimized COUNT DISTINCT internally, so it can be used with confidence.

CREATE VIEW uv_per_10min AS
SELECT 
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0') OVER w AS time_str, 
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);


Here we use the SUBSTR, DATE_FORMAT and || built-in functions to convert a TIMESTAMP field into a time string with 10-minute granularity, such as 12:10 or 12:20. More about OVER WINDOW can be found in the documentation: ci.apache.org/projects/fl…
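
To make the expression concrete, the comments below break it down for one illustrative timestamp; the final SELECT is a sketch you can run ad hoc against user_behavior.

-- Illustrative breakdown for ts = 2017-11-27 12:34:56 (sample value):
--   DATE_FORMAT(ts, 'HH:mm')   -> '12:34'
--   SUBSTR('12:34', 1, 4)      -> '12:3'
--   '12:3' || '0'              -> '12:30'
SELECT SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS time_str
FROM user_behavior;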

We also use the CREATE VIEW syntax to register the query as a logical view, which can easily be referenced in subsequent queries and helps break complex queries into manageable pieces. Note that creating a logical view does not trigger job execution and its results are not materialized, so it is very lightweight to use and adds no overhead. However, uv_per_10min produces one output row for every input row, which would put considerable pressure on storage. We can therefore aggregate again on the time string, so that only one point per 10 minutes is written to Elasticsearch, greatly reducing the rendering load on Elasticsearch and Kibana.

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;


After submitting the above query, create the cumulative_uv index pattern in Kibana, then create a “Line” chart in the dashboard, select the cumulative_uv index, draw the cumulative unique-user curve as shown in the screenshot below (left), and save it.

Top category rankings

The last interesting visualization is a category ranking, which shows which categories are the pillar categories. However, the categories in the source data are too fine-grained (about 5000 of them) to make a meaningful ranking, so we want to roll them up to the top-level categories. For this purpose, the mapping between sub-categories and top-level categories has been pre-loaded into the MySQL container, to be used as a dimension table.
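
If you want to sanity-check this mapping data before using it, you can query it from inside the MySQL container. This is an optional sketch; the database name flink and the table and column names are taken from the JDBC DDL below.

-- Optional sketch: inspect a few rows of the mapping table in the "flink" database
SELECT sub_category_id, parent_category_id
FROM category
LIMIT 10;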

Create a MySQL table on the SQL CLI for subsequent dimension table queries.

CREATE TABLE category_dim (
    sub_category_id BIGINT,    -- sub-category id
    parent_category_id BIGINT  -- top-level category id
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);


Create a new Elasticsearch table to store category statistics.

CREATE TABLE top_category (
    category_name STRING,  -- category name
    buy_cnt BIGINT         -- sales volume
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);


In the first step, we enrich each record with its category name through a dimension-table join. We again use CREATE VIEW to register the query as a view and keep the logic simple. The dimension table is joined using the temporal join syntax; see the documentation for details: ci.apache.org/projects/fl…

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, 
  CASE C.parent_category_id
    WHEN 1 THEN 'Clothing, shoes and bags'
    WHEN 2 THEN 'Home Decoration'
    WHEN 3 THEN 'goods'
    WHEN 4 THEN 'beauty'
    WHEN 5 THEN 'mother'
    WHEN 6 THEN '3 c digital'
    WHEN 7 THEN 'Sports outdoors'
    WHEN 8 THEN 'food'
    ELSE 'other'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;


Finally, we count the number of buy events grouped by category name and write the results into Elasticsearch.

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;


After submitting the above query, create the top_category index pattern in Kibana, then create a “Horizontal Bar” chart in the dashboard, select the top_category index, draw the category ranking as shown in the screenshot below (left), and save it.

You can see that the trading volume of clothing, shoes and bags is far ahead of the other categories.

So far, we have completed the three practical cases and their visualizations. You can now go back to the dashboard page and drag and drop the views to make the dashboard more orderly and intuitive (as in the screenshot at the beginning of this article). Of course, Kibana offers a very rich set of charts and visualization options, and there is plenty more interesting information to be found in the user behavior data. Interested readers can use Flink SQL to analyze the data along more dimensions, use Kibana to display more visualizations, and watch the charts update in real time.

Conclusion

In this article, we showed how to use Flink SQL to integrate Kafka, MySQL, Elasticsearch and Kibana and quickly build a real-time analytics application. The whole process was completed without a single line of Java/Scala code, using plain SQL text only. Flink SQL is easy to use and powerful: it connects easily to various external systems, natively supports event time and out-of-order data, handles dimension-table joins, ships with a rich set of built-in functions, and more. We hope you enjoyed the exercise and found it both fun and informative!


Read more: https://yqh.aliyun.com/detail/6404?utm_content=g_1000105579
