Abstract: The "Flink Getting Started" guide series starts from practical examples and walks readers step by step from zero knowledge of real-time computing and Flink to advanced Flink usage. This article is original content, intended only for technical exchange. The author's knowledge is limited, so corrections are welcome. Please credit the source when reprinting; any infringement will be corrected.

Flink interface

Flink supports three interfaces, that is, three ways of writing jobs:

  • SQL: the real-time computing product provides more comprehensive syntax support than open-source Flink, and about 95% of users currently solve their problems with SQL
  • Table API: semantically equivalent to SQL
  • DataStream API: a lower-level programming interface for cases SQL cannot express

This series of articles starts with streaming SQL. The focus is on helping users understand the differences between batch SQL and streaming SQL, so that they can quickly learn to write correct streaming SQL on Flink.

If you are interested in the implementation principles of Flink SQL, or want to study the Flink code, you can refer to the Apache Flink series – SQL Overview.

Have a question? Ask away.

Clear requirements

Following on from the previous chapter, this chapter computes one metric:

  1. The total transaction amount (GMV) across the whole network, accumulated from 0:00 of the current day
  • Upstream (SOURCE): Kafka
  • Downstream (SINK): MySQL

Start your first job

The original data

| ctime | category_id | shop_id | item_id | price |
| --- | --- | --- | --- | --- |
| 2018-12-04 15:44:54 | cat_01 | shop_01 | item_01 | 10 |
| 2018-12-04 15:45:46 | cat_02 | shop_02 | item_02 | 11.1 |
| 2018-12-04 15:46:11 | cat_01 | shop_03 | item_03 | 12.4 |

The main logic

The source and sink table DDL (source columns follow the sample data above):

```sql
-- Source table: raw trade records
CREATE TABLE src (
    ctime TIMESTAMP,      -- transaction time
    category_id VARCHAR,  -- category ID
    shop_id VARCHAR,      -- shop ID
    item_id VARCHAR,      -- item ID
    price DOUBLE          -- transaction amount
);

-- Sink table: daily GMV results
CREATE TABLE sink (
    cdate DATE,           -- date
    gmv_daily DOUBLE      -- total transaction amount since 0:00 of the day
);
```

Batch SQL writing

A typical batch query:

```sql
SELECT
    date_format(ctime, '%Y%m%d') AS cdate,  -- convert 2018-12-04 15:44:54 to 20181204
    SUM(price) AS gmv_daily
FROM src
GROUP BY date_format(ctime, '%Y%m%d');      -- aggregate by day
```

Results:

| cdate | gmv_daily |
| --- | --- |
| 20181204 | 33.5 |

Features:

  • Before each execution, all of the day's data is already stored in the database
  • Each execution of the SQL returns exactly one result, after which the SQL execution ends

The stream SQL in Flink:

```sql
SELECT
    date_format(ctime, '%Y%m%d') AS cdate,  -- convert 2018-12-04 15:44:54 to 20181204
    SUM(price) AS gmv_daily
FROM src
GROUP BY date_format(ctime, '%Y%m%d');      -- aggregate by day
```

Features:

  • A Flink SQL job is a resident process: one SQL file corresponds to one Flink job. Unless the user kills the job, the SQL keeps executing
  • The Flink job emits one result for each input record. Writing to MySQL with cdate as the primary key, each new result overwrites the previous row for the same key. The successive outputs are:

| cdate | gmv_daily |
| --- | --- |
| 20181204 | 10.0 |
| 20181204 | 21.1 |
| 20181204 | 33.5 |

Since later outputs overwrite earlier ones for the same primary key, the final result seen in MySQL is the same as the batch result:

| cdate | gmv_daily |
| --- | --- |
| 20181204 | 33.5 |
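The overwrite behavior can be sketched in a few lines of Python. This is a toy model of an upsert sink, not real Flink or MySQL API; `apply_upserts` is a hypothetical helper that applies each emitted result in order, with later values replacing earlier ones for the same key:

```python
def apply_upserts(results):
    """Apply (cdate, gmv) results in arrival order; for a given key,
    each new value replaces the previous one, like a MySQL upsert on
    a primary key."""
    table = {}
    for cdate, gmv in results:
        table[cdate] = gmv  # primary-key replacement
    return table

# The three intermediate results the streaming job emits:
stream_results = [("20181204", 10.0), ("20181204", 21.1), ("20181204", 33.5)]
print(apply_upserts(stream_results))  # {'20181204': 33.5} - same as batch
```

Because the key stays the same, the table converges to a single row holding the latest running total.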

How it works

In this example, the batch SQL and the stream SQL are identical, and the final results look the same. But batch engines (such as MySQL or Hive) execute in a completely different way from stream engines (such as Flink), which leads to many differences in how the same SQL processes data. To use Flink SQL in depth, guarantee correct results, and become a Flink SQL tuning expert, you need some understanding of Flink's underlying implementation. The example in each chapter is therefore followed by an introduction to the underlying principles it relies on. Implementation details are out of scope; readers who need them can study the Flink source code.

1. Start with the behavior of batch and stream SQL – continuous query

Intuitively, batch and stream SQL behave very differently:

  • Each time batch SQL is executed, the result is returned only once. The result is calculated based on the snapshot of the data in the database at the calculation time
  • Stream SQL, once started, keeps executing, producing results whenever data arrives. A very important and fundamental new concept in stream computing is the continuous query. Put simply, under a continuous query the Flink job keeps executing its SQL logic: every new record triggers downstream computation, and results are emitted continuously.

There are two key operations in this SQL:

  • Group By: A Group operation. In batch SQL and stream SQL, the behavior of Group By is the same. Both groups data By certain fields.
  • SUM: an aggregation that sums a field within each group. In batch SQL and stream SQL, SUM is semantically the same, but the implementations differ:

    • Batch: all data is known up front, so the engine simply adds up the field within each group.
    • Stream: records enter the system one by one; without knowing the full data set, the engine produces a new result for each incoming record.
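The contrast can be made concrete with a small Python sketch (illustrative only, not Flink API): `batch_sum` sees all data at once and produces one result per group, while `stream_sum` keeps a running sum as state and emits a result for every record:

```python
from collections import defaultdict

def batch_sum(rows):
    """Batch semantics: all data is known up front; one result per group."""
    totals = defaultdict(float)
    for cdate, price in rows:
        totals[cdate] += price
    return {cdate: round(total, 1) for cdate, total in totals.items()}

def stream_sum(rows):
    """Stream semantics: records arrive one by one; the running sum is
    kept as state and a new result is emitted after every record."""
    state = defaultdict(float)
    emitted = []
    for cdate, price in rows:
        state[cdate] += price
        emitted.append((cdate, round(state[cdate], 1)))
    return emitted

rows = [("20181204", 10.0), ("20181204", 11.1), ("20181204", 12.4)]
print(batch_sum(rows))   # {'20181204': 33.5}
print(stream_sum(rows))  # [('20181204', 10.0), ('20181204', 21.1), ('20181204', 33.5)]
```

The last streamed result equals the batch result, which is why the MySQL table ends up holding the same value either way.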

2. GROUP BY + SUM – state

During Flink SQL's continuous computation, data flows in continuously. In this article's example, three records enter Flink one after another. Flink performs a global GROUP BY on cdate and then sums the price of all records within each cdate group:

  1. item_01: sum1 = 0 + 10 = 10.0
  2. item_02: sum2 = sum1 + 11.1 = 21.1
  3. item_03: sum3 = sum2 + 12.4 = 33.5

This raises a question: each record's SUM depends on the result computed for the previous record. Does Flink keep these intermediate results while computing? The answer is yes. These intermediate results are part of the job's state.

A few key questions about state:

  • Are all jobs stateful? Yes, but only aggregation-like operations such as GROUP BY and JOIN store intermediate results in state. Simple operations, such as filters, do not need to store intermediate results in state. For details, see the Apache Flink documentation and the Flink committers' article series on state.
  • How long is state kept? Is it kept forever? No. State in stream computing has an expiration time. In the real-time computing product, the default is 36 hours.
  • What does state expiration mean? It means state from more than 36 hours ago is deleted, to save system storage space. Computations such as large-window JOINs need to keep a lot of data; if everything were kept, the cluster disks would fill up.
  • What are the state expiration rules?

    • The states of different GROUP BY groups are independent of each other
    • A group's state is cleared once the expiration time has passed since that group's state was last updated
  • Can the state expiration time be adjusted? Yes. In the real-time computing product, each job can set this parameter (in milliseconds): state.backend.rocksdb.ttl.ms=129600000
  • Taking this example to the extreme, suppose the state expiration parameter is set to 5 minutes, and the three raw records enter Flink more than 5 minutes apart. Define the time a record enters Flink as ptime, as shown below:
| ctime | category_id | shop_id | item_id | price | ptime |
| --- | --- | --- | --- | --- | --- |
| 2018-12-04 15:44:54 | cat_01 | shop_01 | item_01 | 10 | 2018-12-04 15:45:00 |
| 2018-12-04 15:45:46 | cat_02 | shop_02 | item_02 | 11.1 | 2018-12-04 15:45:10 |
| 2018-12-04 15:46:11 | cat_01 | shop_03 | item_03 | 12.4 | 2018-12-04 15:52:00 |

At this time:

  1. item_01 (ptime=2018-12-04 15:45:00): sum1 = 0 + 10 = 10.0
  2. item_02 (ptime=2018-12-04 15:45:10): sum2 = sum1 + 11.1 = 21.1
  3. item_03 (ptime=2018-12-04 15:52:00): sum3 = 0 + 12.4 = 12.4. More than 5 minutes pass between item_02 and item_03, so sum2 is cleared from state. As a result, when item_03 arrives, sum3 is computed incorrectly.
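This failure mode can be simulated in Python. The sketch below is not Flink's actual state backend; the 5-minute TTL and per-key last-update tracking are assumptions matching the example above, with item_03's price taken as 12.4 from the raw data table:

```python
from datetime import datetime, timedelta

TTL = timedelta(minutes=5)  # hypothetical state expiration for this example

def stream_sum_with_ttl(records):
    """Per-key running sum whose state is cleared when more than TTL has
    passed since that key's state was last updated."""
    state = {}  # cdate -> (running_sum, last_update_ptime)
    emitted = []
    for cdate, price, ptime in records:
        running_sum, last_update = state.get(cdate, (0.0, None))
        if last_update is not None and ptime - last_update > TTL:
            running_sum = 0.0  # state expired: the sum restarts from zero
        running_sum = round(running_sum + price, 1)
        state[cdate] = (running_sum, ptime)
        emitted.append((cdate, running_sum))
    return emitted

records = [
    ("20181204", 10.0, datetime(2018, 12, 4, 15, 45, 0)),
    ("20181204", 11.1, datetime(2018, 12, 4, 15, 45, 10)),
    ("20181204", 12.4, datetime(2018, 12, 4, 15, 52, 0)),  # arrives >5 min later
]
print(stream_sum_with_ttl(records))
# [('20181204', 10.0), ('20181204', 21.1), ('20181204', 12.4)] - last sum is wrong
```

The final sum is 12.4 instead of 33.5: the earlier partial sum was dropped with the expired state, exactly the wrong-result scenario described above.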

In this example:

  • ctime is the time at which the data was generated; in stream computing the term is event time.
  • ptime is the time at which the data enters Flink; in stream computing the term is processing time. These two time domains are basic concepts of stream computing. To use stream computing correctly, you need to understand both. Related reading:

Streaming Systems, Chapter 1: Streaming 101, and Chapter 2: The What, Where, When, and How of Data Processing
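A tiny Python example (illustrative, not Flink API) shows why the distinction matters for a "from 0:00 of the day" metric: a trade that happens just before midnight may reach the system just after it, and grouping by the two timestamps credits it to different days:

```python
from datetime import datetime

def day_of(ts):
    """Format a timestamp as the yyyyMMdd grouping key."""
    return ts.strftime("%Y%m%d")

ctime = datetime(2018, 12, 4, 23, 59, 58)  # event time: when the trade happened
ptime = datetime(2018, 12, 5, 0, 0, 2)     # processing time: when Flink saw it

# Grouping by event time credits the trade to the day it occurred;
# grouping by processing time would credit it to the next day.
print(day_of(ctime), day_of(ptime))  # 20181204 20181205
```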

SQL optimization

In the SQL above, every record triggers a computation. With a large input volume, this easily becomes a performance bottleneck: for each record, the backend reads and writes state, serializes and deserializes, and may even do disk I/O. In complex scenarios such as JOIN or TopN, state operations often become the performance bottleneck of the whole job. How to avoid this? Use the microbatch policy. Microbatch, as the name implies, means processing a small batch at a time rather than one record at a time. The related configuration is:

```
# Required when using the microbatch policy; it is recommended to keep it
# consistent with blink.miniBatch.allowLatencyMs
blink.microBatch.allowLatencyMs=5000
# Using microbatch also requires the following two minibatch settings
blink.miniBatch.allowLatencyMs=5000
# To prevent OOM: how many records each batch caches at most
blink.miniBatch.size=20000
```
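The effect of microbatching can be sketched in Python (a simplified model, not Flink's implementation; `batch_size` stands in for blink.miniBatch.size, and the time-based flush of allowLatencyMs is omitted). Records are buffered and pre-aggregated, so state is touched once per batch instead of once per record:

```python
from collections import defaultdict

def stream_sum_microbatch(rows, batch_size=3):
    """Buffer rows, pre-aggregate each full batch per key, then apply a
    single state update (and emit a single result) per key per batch."""
    state = defaultdict(float)
    emitted, buffer = [], []
    for row in rows:
        buffer.append(row)
        if len(buffer) == batch_size:  # flush a full batch
            local = defaultdict(float)
            for cdate, price in buffer:
                local[cdate] += price  # pre-aggregate inside the batch
            for cdate, partial in local.items():
                state[cdate] = round(state[cdate] + partial, 1)
                emitted.append((cdate, state[cdate]))
            buffer.clear()
    return emitted

rows = [("20181204", 10.0), ("20181204", 11.1), ("20181204", 12.4)]
print(stream_sum_microbatch(rows))  # [('20181204', 33.5)] - one output, not three
```

Per-record processing would update state three times for this input; here it is updated once, at the cost of up to allowLatencyMs of extra latency before results appear.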

Related knowledge points

  • Continuous queries

    • Apache Flink Continuous Queries
  • State (state) :

    • Apache Flink Doc
    • The Flink committers' article series – State
  • Time domain concepts in stream computing – Process Time vs. Event Time

    • Streaming Systems, Chapter 1: Streaming 101
    • Streaming Systems, Chapter 2: The What, Where, When, and How of Data Processing
