Thank you for your support (follow + like + share). Your encouragement motivates the blogger to keep producing quality hands-on content!!

1. Preface – Structure of this article

Big data sheep said

Use data to increase the probability of good things happening

34 original content

The public,

Flink SQL Tumble Window is a fantastic way to resolve it.

Most of you are familiar with the DataStream API. In DataStream, the logic of the code you write is exactly how it will ultimately be executed.

However, we are less familiar with the execution process of Flink SQL. In the previous section, we used simple ETL and group aggregation (sum, count, etc.) queries to take you into the world of Flink SQL query logic, so that you at least know what Flink SQL is doing when it runs.

This section is the first in a series of window aggregation chapters. It introduces the simplest and most commonly used example, a minute-level Tumble Window aggregation, and how it works.

Flink 1.13 introduced Window TVF, so 1.13 is implemented differently from 1.12 and earlier versions. This section starts with the Tumble Window implementation in Flink 1.12 and earlier, which is also the one people most often use when introducing Flink SQL capabilities.

As before, this section walks through Flink SQL in detail in the following parts.

  1. Objective – What can this article help you understand about Flink SQL?
  • Review the conclusions of the Flink SQL application scenarios from the previous section
  2. Concepts – Let’s talk about common window aggregations
  • Windows actually slow down data output?
  • Common windows
  3. Hands-on – A simple Tumble Window example and how it works
  • Take a look at a DataStream window example
  • Semantics of Flink SQL Tumble Window
  • Tumble Window practical case
  • GeneratedWatermarkGenerator – flink 1.12.1
  • BinaryRowDataKeySelector – flink 1.12.1
  • AggregateWindowOperator – flink 1.12.1
  4. Summary and outlook

Let’s start with the conclusions. They were already covered in the previous section (see the previous article in this series):

  1. Scenario issues: Flink SQL is great for simple ETL, as well as aggregation metrics in almost all scenarios (the Tumble Window in this section falls into the aggregation metric category).

  2. Syntax issues: Flink SQL syntax is basically the same as other SQL dialects, so syntax rarely gets in the way of using Flink SQL. The Tumble Window syntax in this section is one of the exceptions. Details below.

  3. Runtime issues: Some tips for inspecting Flink SQL tasks, and some of the pitfalls you might encounter:

  • Go to the Flink web UI to see what the task is currently doing. The operator names show directly which operator handles which logic.

  • The SQL watermark type should be set to TIMESTAMP(3).

  • In event time logic, the SQL API and the DataStream API store the record timestamp differently. DataStream API: the rowtime of each record is placed in the timestamp field of the StreamRecord. SQL API: the timestamp is read from the data each time; the operator maintains the index of the rowtime field and uses it to get the timestamp from the row.

2. Objective – What does this article help you understand about Flink SQL Tumble Window?

There are some common questions about Flink SQL Tumble Window. The goal of this article is to answer these questions:

  1. Scenario issues: Needless to say, tumble windows are widely used in DataStream; a common scenario is minute-level aggregation.

  2. Syntax issues: Flink SQL writes Tumble Window tasks with a syntax not available in Hive SQL. Details below.

  3. Runtime issues: Use a simple Tumble Window SQL to help you understand the overall mechanism of Tumble Window from Transformation to runtime.

  4. Misconception: Because SQL must follow SQL semantics, a SQL Tumble Window aggregation takes multiple input rows and produces one output row. This differs from DataStream, where a window UDF can be many-to-many.

Before we start talking about Tumble Window in earnest, let’s review the conclusions from the previous section about where Flink SQL applies, so that we first have a general impression of Flink SQL.

2.1. Review the conclusions of the Flink SQL application scenarios in the previous section

To be honest, Flink SQL is really good for DWD cleaning and DWS aggregation.

This is mainly about the real-time data warehouse scenario. Flink SQL can do DWD cleaning and DWS aggregation, which covers most real-time data warehouse scenarios.

Flink SQL is awesome!!

But!!

My experience with Flink SQL shows that not all DWD cleaning and DWS aggregation scenarios are suitable for Flink SQL (as of the time of writing)!!

In fact, the scenarios that are not currently suitable for Flink SQL can be summed up as those where it loses processing performance compared with DataStream.

First, summarize the usage scenarios:

1. DWD: simple cleaning, complex cleaning, dimension expansion, and use of various UDFs

2. DWS: Various types of aggregation

These are then divided into suitable and unsuitable scenarios. Since this article cannot cover everything, it gives only a general conclusion here; the specific scenarios are described in detail later.

  • Suitable scenarios:
  1. Simple DWD cleaning scenarios

  2. DWS aggregation scenarios of all kinds

  • Scenarios that are not currently suitable:
  1. Complex DWD cleaning scenarios: for example, cleaning that uses a lot of UDFs, especially a lot of JSON parsing

  2. Dimension lookup scenarios: for example, DataStream jobs often buffer a batch of records and access the external interface in batches. Although Flink SQL has local cache and asynchronous access capabilities for this scenario, it still accesses the external store one record at a time, resulting in a performance gap compared with batch access.

3. Concepts – Let’s talk about common window aggregations

Window aggregation is familiar from the DataStream API, and window computation is arguably the most important and commonly used method for real-time data processing.

But before introducing the window concept, the blogger has a few small thoughts about windows.

3.1. Windows actually slow down data output?

A little thought.

Conclusion: Windows slow down the output of real-time data and are a compromise with the limited capacity of downstream analysis engines.

In the world of data development and requirements, of course, you want all data to arrive in real time, be processed in real time, be produced in real time, and be presented in real time.

Take, for example, a requirement for PV, UV, or other aggregates over a one-minute window.

An OLAP data service engine can satisfy the real-time arrival, processing, output, and display described above. Flink consumes the detailed data and writes it to Kafka, and the data is imported directly into the OLAP engine, which is then queried directly for the aggregates. There is no concept of a window. However, the whole pipeline must guarantee end-to-end exactly-once semantics, and the OLAP engine must answer queries within seconds on large data volumes, not to mention deduplication metrics and so on. Putting all of this on the OLAP engine creates enormous pressure.

Thus the concept of windows was born in the Flink computing engine. We can perform window aggregation directly in the computing engine and output the result data only when the window ends. This is what the blogger means by windows slowing down the output of real-time data. Windows can also cause data loss if they are not handled properly.

Which of the two approaches to choose, given their pros and cons, is up to you. The above are just some of the blogger’s thoughts.

3.2. Commonly used windows

Currently known windows are divided into the following four types.

1. Tumble Windows
2. Hop Windows
3. Cumulate Windows
4. Session Windows

Detailed descriptions of these windows can be found on the official website. I will not repeat them here.

Nightlies.apache.org/flink/flink…

Here are two easily confused concepts commonly involved in Flink: window and key. Here’s a visual illustration.

  • Window: division along the time axis. The infinite stream is cut vertically into windows one by one, each of which is the data within one period of time in the infinite stream.

  • Key: division by data category. The infinite stream is cut horizontally so that data with the same key falls into one group; the data of a single key is itself an infinite stream.

As shown in the figure below.

[Figure: window and key division of an infinite stream]
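To make the two cuts concrete, here is a minimal sketch in plain Java (no Flink dependency). The `Event` class, the `assign` helper, and the `key@windowStart` cell naming are made up for illustration; the point is only that every record ends up in exactly one (key, window) cell.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowAndKeySketch {

    /** Hypothetical event: a category key plus an event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /** Assigns every event to one cell, named "key@windowStart". */
    static Map<String, List<Long>> assign(List<Event> events, long windowSizeMs) {
        Map<String, List<Long>> cells = new TreeMap<>();
        for (Event e : events) {
            // Window: cut along the time axis (non-negative timestamps assumed).
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            // Key: cut by data category.
            String cell = e.key + "@" + windowStart;
            cells.computeIfAbsent(cell, c -> new ArrayList<>()).add(e.timestampMs);
        }
        return cells;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000L), new Event("a", 59_999L),
                new Event("a", 61_000L), new Event("b", 2_000L));
        System.out.println(assign(events, 60_000L));
    }
}
```

With a one-minute window, the two early "a" events share a cell, while the later "a" event and the "b" event each get their own.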

4. Hands-on – A simple Tumble Window case and how it works

4.1. Take a look at a DataStream window example

Before introducing the SQL Tumble Window operator execution example, take a look at a window operator example in DataStream. The logic is the same, which will help us understand the SQL Tumble Window operator.

Let’s look at the datastream processing logic first.

Take this one for example.

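import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class _04_TumbleWindowTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);
        // Use event time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.addSource(new UserDefinedSource())
                // Take the event timestamp from field f3; allow 0 seconds of out-of-orderness
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
                            @Override
                            public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
                                return element.f3;
                            }
                        })
                // Key by field f0
                .keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
                    @Override
                    public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
                        return row.f0;
                    }
                })
                // 10-second tumbling event-time window, summing field f2
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2)
                .print();

        env.execute("1.12.1 DataStream TUMBLE WINDOW Case");
    }

    private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {
            // Emit one record every 10 ms until the source is cancelled
            while (!this.isCancel) {
                sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));
                Thread.sleep(10L);
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }
}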

The specific DataStream transformations are shown below:

[Figure: DataStream transformations]

We will focus only on the most important one, WindowOperator.

[Figure: WindowOperator in the transformations]

The important properties of WindowOperator are shown in the following figure.

[Figure: important properties of WindowOperator]

Take a look at the WindowOperator execution logic. For the detailed process of window execution, refer to: wuchong.me/blog/2016/0…

[Figure: WindowOperator execution logic]

4.2. Semantics of Flink SQL Tumble Window

When it comes to Tumble Window semantics, there should always be a reference point for comparison. The reference point here is the DataStream API.

In the DataStream API, Tumble Window is generally used in one of two scenarios.

  1. Business scenario: use Tumble Window to calculate aggregate data within a window. Generally, there are multiple inputs and one output at the end of the window.

  2. Optimization scenario: the window collects a batch of data, and then external storage is accessed in batches to expand dimensions, or some custom processing logic runs. Generally, there are multiple inputs and multiple outputs at the end of the window.

In the SQL API, however, Tumble Window has group by semantics. In the SQL standard, aggregation takes multiple input rows and outputs one row when the window is triggered. The optimization scenario above, which is common in DataStream, is many-to-many and therefore does not fit SQL semantics. So Flink SQL Tumble Window is generally used for computing aggregates.
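The difference between the two shapes can be sketched in plain Java. This is only an illustration, not Flink code: `sumWindow`, `enrichWindow`, and the `dimTable` map (standing in for one batched call to external storage) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WindowSemanticsSketch {

    /** Many-to-one: all rows of a window collapse into a single aggregate value. */
    static long sumWindow(List<Long> pricesInWindow) {
        long sum = 0L;
        for (long price : pricesInWindow) {
            sum += price;
        }
        return sum;
    }

    /**
     * Many-to-many: every row of the window is enriched and emitted again.
     * dimTable is a stand-in for the result of one batched external lookup.
     */
    static List<String> enrichWindow(List<Long> userIdsInWindow, Map<Long, String> dimTable) {
        List<String> out = new ArrayList<>();
        for (Long userId : userIdsInWindow) {
            out.add(userId + ":" + dimTable.getOrDefault(userId, "unknown"));
        }
        return out;
    }
}
```

The first method is the shape a SQL group by window can express; the second is the shape that only a DataStream window function gives you.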

4.3. Tumble Window practical case

A tumbling window divides an infinite stream vertically into windows of the same size that do not overlap.

[Figure: tumbling windows]
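Because the windows have equal size and never overlap, the window a record belongs to can be computed from its timestamp alone. Below is a minimal sketch of that arithmetic in plain Java. The class and method names are made up; as far as I know, Flink's `TimeWindow.getWindowStartWithOffset` uses the same formula, but treat that as an assumption and check your version.

```java
final class TumbleWindowBounds {

    /**
     * Start of the tumbling window that contains timestampMs.
     * Assumes timestampMs - offsetMs + windowSizeMs is not negative.
     */
    static long windowStart(long timestampMs, long offsetMs, long windowSizeMs) {
        return timestampMs - (timestampMs - offsetMs + windowSizeMs) % windowSizeMs;
    }

    /** Exclusive end of the same window. */
    static long windowEnd(long timestampMs, long offsetMs, long windowSizeMs) {
        return windowStart(timestampMs, offsetMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = 1_631_026_445_123L;
        System.out.println(windowStart(ts, 0L, oneMinute) + " .. " + windowEnd(ts, 0L, oneMinute));
    }
}
```

Every timestamp in [start, end) maps to the same start, and the first timestamp of the next minute maps to the next window, which is exactly the "same size, no overlap" property.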

This article focuses on the implementation of Flink 1.12 and previous versions. The next article introduces the implementation of Flink 1.13.

Before introducing the principle, we should first know how to use it, so let’s start with the following example.

1. (Flink 1.12.1) Scenario: a simple and common one, online users and total sales per minute

Data source table:

CREATE TABLE source_table (
    -- dimension
    dim STRING,
    -- user ID
    user_id BIGINT,
    -- price
    price BIGINT,
    -- event timestamp
    row_time AS cast(CURRENT_TIMESTAMP AS timestamp(3)),
    -- watermark setting
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
)

Note: the SQL watermark type must be set to TIMESTAMP(3).

Sink table:

CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
)

Data processing logic:

The part that differs from writing Hive SQL, MySQL, and so on is tumble(row_time, interval '1' minute).

insert into sink_table
select dim,
       sum(bucket_pv) as pv,
       sum(bucket_sum_price) as sum_price,
       max(bucket_max_price) as max_price,
       min(bucket_min_price) as min_price,
       sum(bucket_uv) as uv,
       max(window_start) as window_start
from (
     select dim,
            count(*) as bucket_pv,
            sum(price) as bucket_sum_price,
            max(price) as bucket_max_price,
            min(price) as bucket_min_price,
            count(distinct user_id) as bucket_uv,
            cast(tumble_start(row_time, interval '1' minute) as bigint) * 1000 as window_start
     from source_table
     group by
            -- bucket by user_id to prevent data skew
            mod(user_id, 1024),
            dim,
            tumble(row_time, interval '1' minute)
)
group by dim,
         window_start
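The query is a two-stage aggregation: the inner query aggregates per (dim, user_id mod 1024) bucket inside each window, and the outer query merges the buckets per dim. A minimal plain-Java sketch of the same idea for the rows of a single window follows. The `Row` and `Agg` classes and the `aggregate` method are hypothetical; this is not how Flink implements it, only the arithmetic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BucketedAggSketch {

    /** One input row of a single window: dim, user_id, price. */
    static final class Row {
        final String dim;
        final long userId;
        final long price;

        Row(String dim, long userId, long price) {
            this.dim = dim;
            this.userId = userId;
            this.price = price;
        }
    }

    /** Aggregate of one group; used both per bucket and for the merged result. */
    static final class Agg {
        long pv;
        long sumPrice;
        long maxPrice = Long.MIN_VALUE;
        long minPrice = Long.MAX_VALUE;
        long uv;
    }

    static Map<String, Agg> aggregate(List<Row> rows, int buckets) {
        // Stage 1 (inner query): partial aggregates per (dim, user_id mod buckets).
        Map<String, Agg> partial = new HashMap<>();
        Map<String, Set<Long>> seenUsers = new HashMap<>();
        for (Row r : rows) {
            String bucketKey = r.dim + "#" + (r.userId % buckets);
            Agg a = partial.computeIfAbsent(bucketKey, k -> new Agg());
            a.pv++;
            a.sumPrice += r.price;
            a.maxPrice = Math.max(a.maxPrice, r.price);
            a.minPrice = Math.min(a.minPrice, r.price);
            if (seenUsers.computeIfAbsent(bucketKey, k -> new HashSet<>()).add(r.userId)) {
                a.uv++; // count(distinct user_id) within the bucket
            }
        }
        // Stage 2 (outer query): merge the buckets of each dim.
        Map<String, Agg> merged = new HashMap<>();
        for (Map.Entry<String, Agg> e : partial.entrySet()) {
            String dim = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            Agg b = e.getValue();
            Agg m = merged.computeIfAbsent(dim, k -> new Agg());
            m.pv += b.pv;
            m.sumPrice += b.sumPrice;
            m.maxPrice = Math.max(m.maxPrice, b.maxPrice);
            m.minPrice = Math.min(m.minPrice, b.minPrice);
            m.uv += b.uv; // safe: a user_id always lands in exactly one bucket
        }
        return merged;
    }
}
```

Summing the per-bucket uv is only correct because the bucket is derived from user_id itself, so the same user can never be counted in two buckets of the same dim.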

2. Run: As you can see, a Flink SQL task writes the corresponding processing logic into the operator name.

Tip 1: This is our first tip for inspecting Flink SQL tasks. If you want to know what your Flink task is doing, your first move should be to go to the Flink web UI and see what the task is currently doing. The operator names show directly which operator handles which logic.

Let’s take a look at the whole operator graph, as shown below. There are three operators from left to right.

  1. The first operator is the data source operator

  2. The second operator is the bucketed window aggregation operator. Data is hash-partitioned between the first and second operators by the group key

  3. The third operator is the outer operator that merges the buckets. It is also fed by hash partitioning, which brings the bucket data for one key together in one operator

[Figure: operator graph of the SQL task]

Let’s see what each operator does.

The first operator:

  1. Table scan reads the data source

  2. Gets the corresponding fields from the data source (including the rowtime defined by the source table)

  3. Assigns watermarks (according to the watermark defined in the source table)

  4. Extracts the necessary fields, such as the fields in group by, which are used for hashing

[Figure: first operator]

The second operator:

  1. Window aggregation: computes the aggregate data per window

  2. Computes and formats the data according to the first (inner) SELECT

[Figure: second operator]

The third operator:

  1. Group aggregate: merges the buckets

  2. Computes and formats the data according to the second (outer) SELECT

  3. Writes the data to the sink

[Figure: third operator]

3. (Flink 1.12.1) Result:

+I(9,1,32682,32682,32682,1,1631026440000)
-U(9,1,32682,32682,32682,1,1631026440000)
+U(9,2,115351,82669,32682,2,1631026440000)
+I(2,1,76148,76148,76148,1,1631026440000)
+I(8,1,79321,79321,79321,1,1631026440000)
+I(a,1,92857,92857,92857,1,1631026440000)
+I(0,1,12858,12858,12858,1,1631026440000)
+I(5,1,36753,36753,36753,1,1631026440000)
+I(3,1,19218,19218,19218,1,1631026440000)
...
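The output is a changelog: +I is an insert, and a -U / +U pair retracts the previous result for a key and replaces it with the updated one. This happens because the outer query is a regular (non-window) group by, which emits a new result every time another bucket row arrives. Here is a minimal plain-Java sketch of that behaviour. The class name and the `onBucketResult` method are hypothetical, and the second bucket row in the usage note is inferred from the sums in the output, not taken from the job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetractSketch {

    /** (dim, window_start) -> {pv, sum_price, max_price, min_price, uv}. */
    private final Map<String, long[]> state = new HashMap<>();

    /** Emitted changelog rows, in order. */
    final List<String> out = new ArrayList<>();

    /** Called once per bucket row produced by the inner window aggregation. */
    void onBucketResult(String dim, long windowStart,
                        long pv, long sum, long max, long min, long uv) {
        String key = dim + "," + windowStart;
        long[] acc = state.get(key);
        if (acc == null) {
            // First bucket for this key: plain insert.
            acc = new long[] {pv, sum, max, min, uv};
            state.put(key, acc);
            out.add(format("+I", dim, acc, windowStart));
        } else {
            // Later bucket: retract the old result, then emit the merged one.
            out.add(format("-U", dim, acc, windowStart));
            acc[0] += pv;
            acc[1] += sum;
            acc[2] = Math.max(acc[2], max);
            acc[3] = Math.min(acc[3], min);
            acc[4] += uv;
            out.add(format("+U", dim, acc, windowStart));
        }
    }

    private static String format(String kind, String dim, long[] a, long windowStart) {
        return kind + "(" + dim + "," + a[0] + "," + a[1] + "," + a[2] + ","
                + a[3] + "," + a[4] + "," + windowStart + ")";
    }
}
```

Feeding it two bucket rows for dim 9, (pv 1, price 32682) and then (pv 1, price 82669), yields one +I row followed by a -U / +U pair with pv 2 and sum_price 115351.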

4. (Flink 1.12.1) Principle:

The mechanism for starting SQL is detailed in the previous section.

This section introduces only what is new compared with the previous section. You can see the transformations in the following figure.

[Figure: transformations of the SQL task]

4.4. GeneratedWatermarkGenerator – flink 1.12.1

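Going in order, look at the watermark operator first. It corresponds to a custom watermark assignment strategy in DataStream.

[Figure: watermark operator]

The generated code is WatermarkGenerator$6. Its currentWatermark method holds the logic for computing the watermark, as shown below.

[Figure: WatermarkGenerator$6 currentWatermark method]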
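Since the generated code only exists as a screenshot, here is a rough plain-Java sketch of what such a watermark expression boils down to for `row_time - INTERVAL '5' SECOND`. The class and method bodies are my own simplification, not the generated WatermarkGenerator$6; the firing rule (a window fires once the watermark reaches its end minus one millisecond) is how event-time windows behave in Flink as far as I know.

```java
final class WatermarkSketch {

    /** The 5-second delay from the WATERMARK clause, in milliseconds. */
    static final long DELAY_MS = 5_000L;

    /** Watermark derived from one row's rowtime; a null rowtime yields no watermark. */
    static Long currentWatermark(Long rowTimeMs) {
        return rowTimeMs == null ? null : rowTimeMs - DELAY_MS;
    }

    /** A window [start, end) can fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }
}
```

In other words, with a 5-second delay a one-minute window ending at 12:01:00.000 only fires after a row with rowtime 12:01:04.999 or later has been seen.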

4.5. BinaryRowDataKeySelector – flink 1.12.1

Next comes group by (the same as keyBy in DataStream).

[Figure: BinaryRowDataKeySelector]

The code generated for the group by key is KeyProjection$19, and the main logic is in its apply method.

[Figure: KeyProjection$19 apply method]
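Conceptually, a key projection just copies the group by fields out of the row, and rows with equal keys are routed to the same parallel subtask. The sketch below shows that idea in plain Java. It is not the generated KeyProjection$19; `apply` here works on an `Object[]` instead of Flink's RowData, and `subtaskFor` is a simplified stand-in for Flink's real routing, which goes through key groups.

```java
import java.util.Arrays;
import java.util.List;

final class KeyProjectionSketch {

    /** Copies the key fields out of a row by position, in the given order. */
    static List<Object> apply(Object[] row, int[] keyIndexes) {
        Object[] key = new Object[keyIndexes.length];
        for (int i = 0; i < keyIndexes.length; i++) {
            key[i] = row[keyIndexes[i]];
        }
        return Arrays.asList(key);
    }

    /** Simplified routing: equal keys always map to the same subtask. */
    static int subtaskFor(List<Object> key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

For a row laid out as (dim, bucket, price), projecting indexes {1, 0} gives the key (bucket, dim), and any two rows sharing that key land on the same subtask.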

The next one is the window aggregation operator.

4.6. AggregateWindowOperator – flink 1.12.1

Guys!! Guys!! Guys!!

Now comes the highlight of this section: analyzing the SQL window aggregation operator.

There is not much more to say about WatermarkGenerator and KeyProjection; each simply takes data in and puts data out, and the logic is simple.

The calculation logic of the window aggregation operator is much more complicated than that of the two operators above. The window operator also carries the main logic of window aggregation, so this article focuses on its calculation logic.

Let’s take a look at the whole process of the SQL window. It is basically the same as in DataStream, except that there is no Evictor. As shown in the figure below.

[Figure: SQL window processing flow]

Then look at the window aggregation operator generated for the SQL above, AggregateWindowOperator. Its properties are clear in the screenshots.

[Figure: AggregateWindowOperator properties]

[Figure: AggregateWindowOperator properties, continued]

The generated window aggregation code is GroupingWindowAggsHandler$59.

[Figure: GroupingWindowAggsHandler$59]

The calculation logic is in GroupingWindowAggsHandler$59#accumulate.

[Figure: GroupingWindowAggsHandler$59#accumulate]

[Figure: GroupingWindowAggsHandler$59#accumulate, continued]

Everything above is initialized in the Flink client, including the window operator.

The processing logic below is executed at runtime in the Flink TaskManager (TM), including the initialization of window operator resources and the running logic. It’s time for the actual data processing.

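The window operator Task runs.

[Figure: window operator Task running]

Initialization of the window operator Task.

[Figure: window operator Task initialization]

The overall processing flow of StreamTask.

[Figure: StreamTask processing flow]

Initialization in the window operator’s open method.

[Figure: window operator open]

[Figure: window operator open, continued]

The result of the window operator’s open initialization. The following figure shows the corresponding components.

[Figure: components after window operator open]

Once initialization is complete, the actual data is processed.

The task loops, processing data over and over.

[Figure: processing loop]

The specific type of each record is determined, and then the corresponding logic is executed.

[Figure: dispatch by record type]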

Take a look at the processElement method, which processes one record and accumulates it (acc). The windowAggregator in the code is the generated GroupingWindowAggsHandler$59 described earlier.

In event time logic, the SQL API and the DataStream API store the record timestamp differently. DataStream API: the rowtime of each record is placed in the timestamp field of the StreamRecord. SQL API: the timestamp is read from the data each time; the operator maintains the index of the rowtime field and uses it to get the timestamp from the row.

[Figure: processElement logic]
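The two ways of carrying the timestamp can be sketched side by side in plain Java. `StampedRecord` is a made-up stand-in for a record wrapper with a timestamp field, and the `Object[]` row with a `rowtimeIndex` is a simplification of a SQL row; neither is the actual Flink class.

```java
final class TimestampAccessSketch {

    /** DataStream style: the timestamp travels next to the value, in the wrapper. */
    static final class StampedRecord<T> {
        final T value;
        final long timestampMs;

        StampedRecord(T value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** DataStream style: read the timestamp field of the wrapper. */
    static long dataStreamStyle(StampedRecord<?> record) {
        return record.timestampMs;
    }

    /** SQL style: the operator keeps the rowtime field index and reads the row itself. */
    static long sqlStyle(Object[] row, int rowtimeIndex) {
        return (Long) row[rowtimeIndex];
    }
}
```

The practical consequence is that in the SQL case the rowtime column has to stay in the row all the way to the window operator, which is why it shows up among the fields selected by the first operator.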

Take a look at the onEventTime logic, which runs when a watermark arrives and triggers a window calculation.

[Figure: onEventTime logic]

When the window calculation is triggered, onEventTime -> emitWindowResult produces the actual output data.

[Figure: emitWindowResult]
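Putting the pieces together, the processElement -> onEventTime -> emitWindowResult flow can be sketched as a tiny in-memory operator in plain Java. This is a simplification under several assumptions: a single sum aggregate, string keys, `Object[]` rows, an in-memory TreeMap instead of keyed state and timers, and no handling of late data. Only the method names mirror the ones in the screenshots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumbleWindowOperatorSketch {

    private final long windowSizeMs;
    private final int keyIndex;
    private final int valueIndex;
    private final int rowtimeIndex;

    /** Window end -> (key -> running sum); stands in for keyed window state. */
    private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();

    /** Collected output rows, formatted as "key,sum,windowStart". */
    final List<String> emitted = new ArrayList<>();

    TumbleWindowOperatorSketch(long windowSizeMs, int keyIndex, int valueIndex, int rowtimeIndex) {
        this.windowSizeMs = windowSizeMs;
        this.keyIndex = keyIndex;
        this.valueIndex = valueIndex;
        this.rowtimeIndex = rowtimeIndex;
    }

    /** Accumulates one row into the window it belongs to. */
    void processElement(Object[] row) {
        long ts = (Long) row[rowtimeIndex]; // timestamp read from the row by index
        long windowEnd = ts - (ts % windowSizeMs) + windowSizeMs;
        state.computeIfAbsent(windowEnd, w -> new TreeMap<>())
                .merge((String) row[keyIndex], (Long) row[valueIndex], Long::sum);
    }

    /** Fires every window whose last timestamp (end - 1) the watermark has reached. */
    void onEventTime(long watermarkMs) {
        while (!state.isEmpty() && state.firstKey() - 1 <= watermarkMs) {
            Map.Entry<Long, Map<String, Long>> window = state.pollFirstEntry();
            emitWindowResult(window.getKey() - windowSizeMs, window.getValue());
        }
    }

    /** Emits one result row per key of the fired window. */
    private void emitWindowResult(long windowStart, Map<String, Long> sums) {
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            emitted.add(e.getKey() + "," + e.getValue() + "," + windowStart);
        }
    }
}
```

Nothing is emitted while rows are only being accumulated; output appears once a watermark passes the end of a window, which is the "windows slow down output" trade-off discussed earlier.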

The entire SQL Tumble Window processing logic is now clear. It is basically the same as in DataStream.

5. Summary and Outlook

This article focuses on common scenarios for Tumble Window aggregation metrics and how they operate underneath.

It also introduces some tips for inspecting Flink SQL tasks:

  1. Go to the Flink web UI to see what the task is currently doing. The operator names show directly which operator handles which logic.

  2. The SQL watermark type should be set to TIMESTAMP(3).

  3. In event time logic, the SQL API and the DataStream API store the record timestamp differently. DataStream API: the rowtime of each record is placed in the timestamp field of the StreamRecord. SQL API: the timestamp is read from the data each time; the operator maintains the index of the rowtime field and uses it to get the timestamp from the row.

Subsequent articles will introduce the Flink 1.13 version of SQL Tumble Window (based on the latest Window TVF), with common cases and the underlying principles.

I hope you will keep following along and supporting the blogger. If you like it, please follow + like + share.

Recommended reading

[Flink SQL knows why: You haven’t even seen ETL and Group AGG scenarios that are best suited for Flink SQL?](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know why (6) | flink SQL date calcite (see this article will be enough)](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know why (5) | custom protobuf format](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know why (4) | SQL API type system](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know why (3) | custom redis data pool table (with source)](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know why (2) | custom redis data dimension tables (with source)](mp.weixin.qq.com/s?__biz=Mzk…)

[Flink SQL know how (a) | source/sink principle](mp.weixin.qq.com/s?__biz=Mzk…)
