In the last article, we walked through the Flink Table API/SQL in the Flink ecosystem and saw how a Table API/SQL program is converted, step by step, into a running Flink job.

After reading it you should have a rough picture of the execution process; the deeper implementation details will be peeled back gradually in later source-code analysis articles.

This time, we start from zero and work through the background and core concepts of Flink SQL step by step, together with a complete example program. I hope it helps!

This article outline

1. Quick experience with Flink SQL

To quickly set up an environment for experiencing Flink SQL, we use Docker to install the basic components: ZooKeeper and Kafka. If you already have such an environment, you can skip this part.

To install Docker on CentOS 7, see this link; it is not covered in detail here: blog.csdn.net/qq_24434251…

1. Pull and run the ZooKeeper image

docker pull debezium/zookeeper
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

2. Pull and run the Kafka image

docker pull debezium/kafka
docker run -d -it --rm --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  --link zookeeper:zookeeper debezium/kafka

3. Go to the kafka container command line

docker exec -it kafka /bin/bash

4. Create a topic

/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.10:2181 --topic user_log --partitions 1 --replication-factor 1

5. Start the program in IDEA

The code is too long to post here; the complete program is on my GitHub: github.com/nicekk/Flin…

6. Write data

/kafka/bin/kafka-console-producer.sh --broker-list  192.168.56.10:9092 --topic user_log


Example data:

{"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"}
{"user_id":345,"item_id":345,"ts":"2021-01-05 23:04:00"}

7. Result Output:

2. Data type system

Before continuing to explain the use of Flink SQL, we need to talk about Flink’s data type system.

As a high-performance computing framework, Flink has to solve the problems of distributed computation, data transfer, and persistence.

Data transfer requires serialization and deserialization: serialization converts an in-memory object into a binary stream suitable for network transfer or persistence; deserialization converts that binary stream back into an in-memory object that the programming language can read and write directly.

Flink runs on the JVM, and during computation a large amount of data is kept in memory, which exposes problems such as the low storage density of Java objects.

To address these problems, the most common solution is explicit memory management: a custom memory pool allocates and reclaims memory, and serialized objects are stored in these memory segments.

Therefore, the more accurately Flink can infer data types, the earlier it can complete type checking, which helps Flink plan memory better and save storage space.

For example, consider the type Tuple3<Integer, Double, Person>, which contains three data types.

Person itself contains two fields, id and name.

Flink can infer all of these types and pick dedicated serializers for them: an IntSerializer for the Integer field, a DoubleSerializer for the Double field, and a POJO serializer for Person. The serialized objects can then be packed tightly together in memory, unlike Java's built-in serialization, which wastes far more space.
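
To make this concrete, here is a minimal Java sketch (class and variable names are my own, for illustration only, and it assumes a Flink 1.11/1.12-era API) showing how Flink describes Tuple3<Integer, Double, Person> through its type system and derives a composite serializer from it:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;

public class TypeInfoDemo {

    // A simple POJO with two public fields; Flink analyzes it and serializes each field separately.
    public static class Person {
        public int id;
        public String name;
    }

    public static void main(String[] args) {
        // Describe Tuple3<Integer, Double, Person> to Flink's type system.
        TypeInformation<Tuple3<Integer, Double, Person>> typeInfo =
                Types.TUPLE(Types.INT, Types.DOUBLE, Types.POJO(Person.class));

        // From the type information Flink derives a composite serializer
        // (an IntSerializer, a DoubleSerializer and a POJO serializer for Person),
        // so the three fields can be written compactly into managed memory.
        TypeSerializer<Tuple3<Integer, Double, Person>> serializer =
                typeInfo.createSerializer(new ExecutionConfig());

        System.out.println(typeInfo);
        System.out.println(serializer);
    }
}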

(Flink's data type system is a very large topic in its own right, and we will cover it in a separate article; here the point is simply to illustrate the importance of data types.)

3. How to execute SQL on an unbounded data stream

Executing SQL on bounded data sets is something most of us experience every day. A bounded data set is static: in offline mode, SQL can access the entire data set, and the query terminates once it has produced its result.

The data flow is infinite, meaning that the program has to run all the time, waiting for data to come in and be processed. How does this relate to SQL?

We will introduce two concepts here: Dynamic tables and Continuous Queries.

(1) Dynamic table

If you want to analyze a data flow using SQL, the first thing you do is change the flow to a table.

In the figure below, the left side is a stream of click events carrying the user name, the event time, and the clicked URL. The right side is a table with the same three fields.

The flow from the left to the table on the right is a logical mapping process that does not persist the data.

As the events on the left stream continue to arrive, the records on the right table are constantly updated.

A table that changes all the time is called a dynamic table.
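
To show what this mapping looks like in code, here is a minimal Java sketch (an assumed, self-contained setup, not the article's example program) that turns a small stream of click events into a table named clicks and then runs the continuous query from the next subsection on it:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamToDynamicTable {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // In a real job this stream would come from Kafka; two literal events stand in here.
        DataStream<Tuple2<String, String>> clickStream = env.fromElements(
                Tuple2.of("Mary", "./home"),
                Tuple2.of("Bob", "./cart"));

        // Logically map the stream onto a dynamic table with columns user and url.
        // Nothing is persisted: the table simply grows as events arrive on the stream.
        Table clicks = tableEnv.fromDataStream(clickStream, $("user"), $("url"));
        tableEnv.createTemporaryView("clicks", clicks);

        // The continuous query from the next subsection, run against the dynamic table.
        // ("user" is a reserved keyword in Flink SQL, hence the backticks.)
        tableEnv.executeSql("SELECT `user`, COUNT(url) AS cnt FROM clicks GROUP BY `user`")
                .print();
    }
}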

(2) Continuous query

A query on a dynamic table is called a continuous query.

Applying the SQL below to the dynamic table, as shown in the figure, produces a continuous query: the query never terminates, and a new result is produced for every incoming event.

As a result of the query, a new dynamic table is generated.

select user, count(url) as cnt
  from clicks
 group by user;

When the first event (Mary, ./home) arrives, the result is: [Mary, 1].

When (Bob, ./cart) arrives, a new row is appended to the dynamic table, and the result becomes: [Mary, 1], [Bob, 1].

When (Mary, ./prod?id=1) arrives, Mary's existing row is updated from 1 to 2, and the result becomes: [Mary, 2], [Bob, 1].

When (Liz, ./home) arrives, another row is appended, and the final result is: [Mary, 2], [Bob, 1], [Liz, 1].

In this way, we can use SQL to continuously query a dynamic table and generate new dynamic tables. (In fact, as we saw in the previous article, the SQL is eventually translated into a program that is executed.)

(3) Query restrictions

Since streams are infinite, we have to ask the question: can all queries be executed on streams?

The answer is no, for two main reasons: first, the state that has to be maintained can become very large; second, the cost of computing an update can be too high.

Because a continuous query runs forever, all previously emitted result rows must be kept so they can be updated later, and the data held in state keeps growing.

Moreover, sometimes a single incoming record forces most of the previously produced result rows to be recomputed and updated; such queries are not suitable for continuous execution.

The following SQL, which computes a ranking over each user's last action time, is an example: whenever a new event arrives, the ranks of many users may have to be recomputed and updated.

SELECT user, RANK() OVER (ORDER BY lastAction)
  FROM (
    SELECT user, MAX(cTime) AS lastAction
      FROM clicks
     GROUP BY user
  );

(4) Result output

Finally, Flink is a computing engine and does not store data itself. So how does it represent updates and write them to external storage? Here are two examples.

1. The target table is the console

Let's go back to the example at the beginning: since the target is the console, any kind of result can simply be printed.

-- Source table: connect to Kafka and consume from the latest offset
CREATE TABLE user_log (
  user_id bigint,
  item_id bigint,
  ts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)

-- The target table is the console, printed directly
CREATE TABLE user_log_result(
  user_id bigint,
  cnt bigint
) WITH (
  'connector' = 'print'
)

-- The query: a simple GROUP BY counting rows per user_id from the source table, written to the target table
insert into user_log_result select user_id, count(1) cnt from user_log group by user_id

{"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"}

3> +I(123,1)

{"user_id":123,"item_id":123,"ts":"2021-01-05 23:04:00"}

3> -U(123,1)

3> +U(123,2)

+I, -U and +U describe the changelog of a result row: +I marks a newly inserted row, -U retracts the previously emitted record (it has to be rolled back), and +U emits the updated record.

As you can see, the output is in the form of a Changelog generated for each row.

If the sink stage uses the DataStream API, you can convert the dynamic table into a stream and continue sinking it to downstream operators; if you stay in SQL, the result can be written directly to the downstream table.
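
As a sketch of the DataStream route (assuming the pre-1.13 retract-stream API, and that the Kafka connector and JSON format jars are on the classpath), the aggregated table can be converted into a DataStream whose Boolean flag corresponds to the changelog above: true for +I/+U records, false for -U retractions.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same Kafka source table as in the SQL above.
        tableEnv.executeSql(
                "CREATE TABLE user_log (" +
                "  user_id BIGINT," +
                "  item_id BIGINT," +
                "  ts TIMESTAMP" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_log'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'properties.bootstrap.servers' = '192.168.56.10:9092'," +
                "  'format' = 'json'," +
                "  'json.fail-on-missing-field' = 'false'" +
                ")");

        Table result = tableEnv.sqlQuery(
                "SELECT user_id, COUNT(1) AS cnt FROM user_log GROUP BY user_id");

        // true  -> an add record (+I or +U)
        // false -> a retraction (-U) of a previously emitted record
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tableEnv.toRetractStream(result, Row.class);

        // From here any DataStream sink can be attached downstream.
        retractStream.print();

        env.execute("retract-stream-example");
    }
}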

The full program is available on my GitHub: github.com/nicekk/Flin…

2. The target table is Kafka

-- Source table: connect to Kafka and consume from the latest offset
CREATE TABLE user_log (
  user_id bigint,
  item_id bigint,
  ts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)

-- The target table is Kafka
CREATE TABLE user_log_result (
  user_id bigint,
  cnt bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log_result',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json'
)

-- The query: a simple GROUP BY counting rows per user_id from the source table, written to the target table
insert into user_log_result select user_id, count(1) cnt from user_log group by user_id

If you run this program, the following error is reported:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.user_log_result' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user_id], select=[user_id, COUNT(*) AS cnt])

The reason is that the target table user_log_result (backed by Kafka) cannot consume update changes; a plain Kafka sink only supports appending new records.

If we switch to an append-only query, for example the plain projection below (no aggregation, so no update changes are produced; here item_id is simply written into the cnt column as a stand-in), the job runs fine. Of course, you can also change the target table to storage that supports updates, such as MySQL or HBase; a sketch of that option follows the query.

insert into user_log_result select user_id, item_id as cnt from user_log
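
And here is a hedged sketch of the second option: keeping the GROUP BY aggregation but writing to an updatable target via the Flink JDBC connector and MySQL. The database URL, table name and credentials are placeholders I made up, and the flink-connector-jdbc jar plus a MySQL driver are assumed to be on the classpath. Because the sink declares a primary key, the -U/+U changes are written as upserts.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySqlUpsertSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same Kafka source table as before.
        tableEnv.executeSql(
                "CREATE TABLE user_log (" +
                "  user_id BIGINT," +
                "  item_id BIGINT," +
                "  ts TIMESTAMP" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_log'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'properties.bootstrap.servers' = '192.168.56.10:9092'," +
                "  'format' = 'json'," +
                "  'json.fail-on-missing-field' = 'false'" +
                ")");

        // An updatable sink: with a declared primary key, the JDBC connector upserts
        // rows instead of only appending them, so it can consume update changes.
        // The connection settings below are placeholders, not a real environment.
        tableEnv.executeSql(
                "CREATE TABLE user_log_result_mysql (" +
                "  user_id BIGINT," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://192.168.56.10:3306/test'," +
                "  'table-name' = 'user_log_result'," +
                "  'username' = 'root'," +
                "  'password' = 'root'" +
                ")");

        // The aggregation that failed against the plain Kafka sink now runs,
        // because this sink accepts update changes.
        tableEnv.executeSql(
                "insert into user_log_result_mysql " +
                "select user_id, count(1) cnt from user_log group by user_id");
    }
}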

Again, the full program is on my GitHub: github.com/nicekk/Flin…

4. Time, INTERVAL and window calculation

Window computing is always the core of stream computing. A window splits an infinite stream into a finite data set, which can be computed.

When talking about windows, many concepts come up: event time, processing time, window start time, window end time, sliding windows, tumbling windows, window size, watermarks, and so on.

With recent versions of Flink SQL, all of these can be defined in DDL. Let's take them apart one by one.

1. INTERVAL

INTERVAL is not unique to Flink SQL; it exists in ANSI SQL. Let's use Oracle as an example.

First of all, you need an Oracle environment. Here we build one with Docker; see this link for a detailed tutorial: blog.csdn.net/qq_24434251…

INTERVAL represents a span of time. Let's create a table to get a feel for it:

create table INTERVAL_TAB
(
    DURATION INTERVAL DAY(2) TO SECOND(5)
);

In this table, the DURATION column stores an interval ranging from days down to seconds; the numbers in parentheses specify the precision.

insert into interval_tab (duration) values (interval '3 12:32' day(3) to minute);

The inserted data represents a period of time: 3 days, 12 hours and 32 minutes

This might not seem very useful at first. But consider: if I ask how many years you have worked at your company, you can answer right away; if I ask how many days, it gets complicated because of leap years and the varying length of February. Interval types make this kind of calculation much more convenient.

For example, you can easily calculate the date 100 days before today:

select sysdate, sysdate - interval '100' day(3) as "Current time - 100 days" from dual;

With INTERVAL, we can easily express the length of the window.

2. Window calculation

Tumbling window – using processing time

-- Source table: user_name is the user name, data is the action payload
CREATE TABLE user_actions (
  user_name string,
  data string,
  user_action_time as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)

-- Result table
CREATE TABLE user_action_result(
  window_start TIMESTAMP(3),
  cnt bigint
) WITH (
  'connector' = 'print'
)

-- Window calculation
INSERT INTO user_action_result
select * from (
  SELECT TUMBLE_START(user_action_time, INTERVAL '10' SECOND) window_start, COUNT(DISTINCT user_name) cnt
  FROM user_actions
  GROUP BY TUMBLE(user_action_time, INTERVAL '10' SECOND)
)

-- Test data
{"user_name":"zhangsan","data":"browse"}
{"user_name":"lisi","data":"browse"}

First, in the source table we use processing time: the field user_action_time is not part of the incoming data; it is a virtual column that Flink adds automatically to serve as the time attribute.

Then look at the query: GROUP BY TUMBLE(user_action_time, INTERVAL '10' SECOND) declares a tumbling window that uses user_action_time as the time field, with a window size of INTERVAL '10' SECOND, i.e. 10 seconds, using the INTERVAL syntax we just covered.

In the SELECT clause, TUMBLE_START(user_action_time, INTERVAL '10' SECOND) returns the start time of each window, and COUNT(DISTINCT user_name) counts the distinct user names within each window.

The full program is on my GitHub: github.com/nicekk/Flin…

Tumbling window – using event time

First, you still need to declare the use of EventTime in the execution environment:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
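
For context, here is a minimal sketch of that setup (assuming a pre-1.12 Flink version; since Flink 1.12 event time is the default and this call is deprecated):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventTimeSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Work with event time (taken from the data itself, see the WATERMARK clause
        // in the DDL below) instead of processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // The modified CREATE TABLE statement below would then be executed via
        // tableEnv.executeSql(...), followed by the same windowed query as before.
    }
}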

Modify the definition of the source table

CREATE TABLE user_actions (
  user_name string,
  data string,
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time as user_action_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)

WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND declares user_action_time as the event-time field and generates watermarks with a 5-second delay. Both the event time and the watermark are defined in a single line of DDL.

The complete program can be downloaded from my GitHub: github.com/nicekk/Flin…

5. Summary

If you had not touched Flink SQL before, I hope this article gave you a preliminary understanding of it. Now open IDEA and work through the examples yourself; you will gain a deeper understanding, which is exactly the purpose of this article.