Brief introduction: the “Real-time Data Warehouse Entry Training Camp” is jointly built by Alibaba Cloud researcher Wang Feng, Alibaba Cloud senior technical expert Jin Xiaojun, Alibaba Cloud senior product expert Liu Yiming, and other front-line technology and product experts on Realtime Compute for Apache Flink and Hologres. The carefully polished course content speaks directly to the pain points learners actually encounter, and analyzes real-time data warehouse architecture, scenarios, and practical applications from the basics up. Seven quality courses help you grow from a beginner into an expert in 5 days!

This article is compiled from the livestream “SQL Practice with Realtime Compute for Apache Flink” by Li Lin (Seal). Video link: c.tb.cn/F3.0dBssY

Contents:
1. Introduction to SQL in Realtime Compute for Apache Flink
2. Hands-on SQL examples with Realtime Compute for Apache Flink
3. Common development problems and solutions

Introduction to SQL in Realtime Compute for Apache Flink

(1) About SQL in Realtime Compute for Apache Flink

Realtime Compute for Apache Flink chose SQL, a declarative language, as its top-level API, which makes the API stable and easy to use. Flink SQL unifies stream and batch processing, giving users a consistent development experience and consistent semantics. In addition, Flink SQL can be optimized automatically: it shields users from the complexity of State in stream processing, provides an automatically optimized execution Plan, and integrates the AutoPilot auto-tuning capability. Flink SQL is also used in a wide range of scenarios, including data integration, real-time reporting, real-time risk control, and online machine learning.

(2) Basic operations

In terms of basic operations, the syntax is very close to standard SQL. The example covers basic SELECT and FILTER operations. You can use built-in functions, such as date formatting, or custom functions; the exchange-rate conversion in the example is a user-defined function that can be used directly after it is registered on the platform.
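As a minimal sketch of what such a query might look like (the orders table, its fields, and the ExchangeRate UDF name are hypothetical, for illustration only):

    SELECT
      order_id,
      DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS order_date,  -- built-in date formatting function
      ExchangeRate('USD', amount) AS usd_amount              -- user-defined function registered on the platform
    FROM orders
    WHERE amount > 0;                                        -- basic filter condition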

(3) Dimension table Lookup Join

In actual data processing, the Lookup Join against a dimension table is also a common operation.

Shown here is an example of a dimension table INNER JOIN.

The SOURCE table shown in the example is an order table that changes in real time, and the dimension table information is associated through an INNER JOIN. The dimension table JOIN syntax is highlighted in yellow; it differs from the traditional batch syntax in that the FOR SYSTEM_TIME AS OF clause is added to indicate a dimension table JOIN. Each time the SOURCE table receives an order message, the dimension table operator is triggered to query the dimension table, which is why this is called a Lookup Join.
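A minimal sketch of the syntax, assuming an orders source table with a processing-time attribute proc_time and a hypothetical currency dimension table:

    SELECT o.order_id, o.amount, d.rate
    FROM orders AS o
    INNER JOIN currency_dim FOR SYSTEM_TIME AS OF o.proc_time AS d  -- dimension table Lookup Join
    ON o.currency_id = d.currency_id;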

(4) Window Aggregation

Window Aggregation is also a common operation. Flink SQL has built-in support for several commonly used window types, such as the Tumble Window, Session Window, Hop Window, and the newly introduced Cumulate Window.

Tumble

A Tumble Window can be understood as a fixed-size time window, also called a tumbling window: windows are opened at a fixed interval such as 5 minutes, 10 minutes, or 1 hour, and do not overlap.

Session

The Session Window defines a range of continuous activity. One of the parameters in the window definition is the Session Gap: if the interval between two records exceeds the defined gap, the previous window ends and a new window is created.

Hop

A Hop Window, also known as a sliding window, differs from a tumbling window in that its windows can overlap. A sliding window takes two parameters: size and slide. Size is the window size and slide is the step of each slide. If slide < size, windows overlap and the same record may be assigned to multiple windows; if slide = size, it is equivalent to a Tumble Window; if slide > size, there is no overlap and there are gaps between windows.

Cumulate

The Cumulate Window is newly introduced in Flink 1.13. It can be understood as a Hop Window in which the windows all accumulate from the same starting point: in the example, windows 1, 2, and 3 keep growing, up to a maximum window length. For example, if we define the Window Size as one day and the Step as one hour, it generates, for every hour of the day, the aggregate result accumulated from the start of the day up to that hour.

Take a look at a concrete example of Window aggregation handling.

As shown above, for example, you need to count clicks per user per 5 minutes.

The source data is a user click log. We want to compute each user's total clicks every 5 minutes, using the community's latest Window TVF syntax: first apply the window TVF to the source table, then GROUP BY the window attributes window_start and window_end; COUNT(*) is the number of clicks.
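A sketch of this query using the Window TVF syntax (the user_clicks table and the click_time field are illustrative names for the click log and its event-time attribute):

    SELECT
      window_start,
      window_end,
      user_id,
      COUNT(*) AS click_cnt                  -- number of clicks in the 5-minute window
    FROM TABLE(
      TUMBLE(TABLE user_clicks, DESCRIPTOR(click_time), INTERVAL '5' MINUTE))
    GROUP BY window_start, window_end, user_id;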

It can be seen that when the data from 12:00 to 12:04 has been processed, there are 2 users with 4 clicks in total: Mary has 3 clicks and Bob has 1. In the next batch, 3 more records arrive and fall into the next window, with counts of 1 and 2 respectively.

(5) Group Aggregation

Compared with Window Aggregation, Group Aggregation directly triggers calculation and does not need to wait until the end of the Window. One applicable scenario is the calculation of cumulative value.

The example above computes the cumulative click count per user up to the current moment: the query simply groups by the user and applies COUNT(*), with no window at all.
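A sketch of such a query, reusing the hypothetical user_clicks table from above:

    SELECT
      user_id,
      COUNT(*) AS click_cnt   -- cumulative clicks per user, continuously updated
    FROM user_clicks
    GROUP BY user_id;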

The results differ from the window case. With the same first four input records, the Group Aggregation output shows Mary's click count updated to 3; the intermediate results may go from 1 to 2 and then to 3. As the next three records arrive, Bob's count is updated again, to 2. The result is continuously updated, which is different from the window scenario.

The data output in the previous Window does not change after the Window ends, whereas in Group Aggregation, the results of the same Group Key are continuously updated.

(6) Window Aggregation Vs Group Aggregation

Here is a more comprehensive comparison of the differences between Window Aggregation and Group Aggregation.

Window Aggregation outputs on window time: it produces no output until the defined window ends. For example, with a 5-minute window covering 00:00 to 00:05, the result is output only once, after all the data in the window is complete.

Group Aggregation is triggered by data: the first record produces a result, and a second record with the same Key updates that result. The nature of the output stream also differs: Window Aggregation typically outputs an append stream, while Group Aggregation outputs an update stream.

The handling of State is also very different. Window Aggregation automatically cleans up expired data, so users do not have to worry about State bloat. Group Aggregation accumulates on unbounded State, so users need to define a State TTL appropriate to their computation, that is, how long the State is retained.

For example, to count the cumulative PV and UV within one day, the State TTL should be at least one day (leaving aside data delay) to guarantee correct results. If the State TTL is set to half a day, the statistics may be inaccurate.

The requirement on the output storage is also determined by the nature of the output stream. Window Aggregation produces an append stream, so it can write to any kind of target storage. Group Aggregation outputs an update stream, so the target storage must support updates; Hologres, MySQL, or HBase can be used.

Hands-on SQL examples with Realtime Compute for Apache Flink

Let's look at how these SQL operations are used in a real business scenario, covering the basic SQL syntax operations as well as the common aggregations.

(1) Example scenario: e-commerce transaction data in a real-time data warehouse

The example here is an e-commerce transaction data scenario, which simulates layered data processing in a real-time data warehouse.

In the data access layer, we simulated the transaction order data of e-commerce, including the order ID, commodity ID, user ID, transaction amount, commodity leaf category, transaction time and other basic information, which is a simplified table.

Example 1 goes from the access layer to the detail layer: it performs data cleaning and also associates category information. Then we will demonstrate how the aggregation layer completes minute-level transaction statistics and hour-level real-time transaction statistics, and finally how to do quasi-real-time day-level cumulative transaction statistics.

– Sample environment: private beta

The demo environment is the currently in-beta version of the Realtime Compute for Apache Flink product, in which you can do one-stop job development, debugging, and online operation.

– Access layer data

Generate simulated e-commerce transaction data using SQL DataGen Connector.

Access layer data: the link is simplified for demonstration purposes, and the built-in SQL DataGen connector is used to simulate the generation of e-commerce data.

order_id is designed as an incrementing sequence; the connector parameters are not all shown here. The DataGen connector supports several generation modes: for example, a Sequence generator produces self-incrementing sequences, and the Random mode produces random values. Different generation strategies are chosen according to the business meaning of each field.

For example, order_id is a self-incrementing sequence, the commodity ID and user ID are generated randomly within fixed ranges, the transaction amount is in units of fen (cents), and cate_id is the leaf category ID: 100 leaf categories are simulated here, derived directly from the commodity ID by a mod in a computed column. The order creation time is simulated with the current time. This allows debugging on the development platform without having to create a Kafka or DataHub source to simulate the access layer.
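A rough sketch of what such a DataGen source table might look like; the table name, field names, and value ranges below are illustrative assumptions, not the exact parameters used in the demo:

    CREATE TEMPORARY TABLE order_source (
      order_id BIGINT,
      commodity_id BIGINT,
      user_id BIGINT,
      amount BIGINT,                                     -- transaction amount in fen (cents)
      gmt_create TIMESTAMP(3),                           -- order creation time, close to the current time
      cate_id AS MOD(commodity_id, 100),                 -- computed column: leaf category derived from commodity ID
      proc_time AS PROCTIME(),                           -- processing-time attribute for dimension table lookups
      WATERMARK FOR gmt_create AS gmt_create - INTERVAL '5' SECOND  -- 5-second tolerance for out-of-order data
    ) WITH (
      'connector' = 'datagen',
      'fields.order_id.kind' = 'sequence',
      'fields.order_id.start' = '1',
      'fields.order_id.end' = '100000000',
      'fields.commodity_id.min' = '1',
      'fields.commodity_id.max' = '100000',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '1000000',
      'fields.amount.min' = '1',
      'fields.amount.max' = '2000000'
    );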

(2) Example 1-1 Data cleaning

– E-commerce transaction data – order filtering

This is a data cleaning scenario. For order filtering, for example, the business side may want to filter out abnormal maximum or minimum transaction amounts, e.g. keeping only orders whose amount is greater than 1 yuan and less than 10,000 yuan.

Transactions should also be created after a certain point in time. This logic can be implemented by combining conditions in the WHERE clause.
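A sketch of this filtering step, using the illustrative order_source table defined above (the cutoff timestamp is a placeholder):

    SELECT *
    FROM order_source
    WHERE amount > 100                                   -- amount in fen: greater than 1 yuan
      AND amount < 1000000                               -- less than 10,000 yuan
      AND gmt_create > TIMESTAMP '2021-06-01 00:00:00';  -- created after a certain point in time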

Real business scenarios can be much more complex, so let’s look at how SQL works.

This is the debug mode. Click the Run button on the platform for local debugging. You can see that the amount column is filtered and the order creation time is greater than the required time value.

As you can see from this simple cleaning scenario, there is not much difference from traditional batch processing in how the query is written or in its output. The main difference of a streaming job is that it keeps running for a long time once started, whereas a traditional batch job ends after the data has been processed.

(3) Example 1-2 Category information association

Now let’s see how to do dimension table association.

Continuing with the order data from the access layer just now: the raw data contains only the leaf category, but the business needs the first-level category, so a category dimension table must be associated. The dimension table records the mapping from leaf categories to first-level categories, with their IDs and Names. The goal of this cleaning step is to use the leaf category ID in the original table to look up the dimension table and fill in the first-level category ID and Name. Here the INNER JOIN dimension table syntax is used, and the corresponding dimension table fields are selected after the association.

Compared with a batch join, the only difference is the special dimension table syntax FOR SYSTEM_TIME AS OF.
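A sketch of this association step, reusing the illustrative order_source table; the category_dim dimension table and its fields are assumptions for illustration:

    SELECT
      o.order_id,
      o.amount,
      o.cate_id,                  -- leaf category ID from the original table
      dim.parent_cate_id,         -- first-level category ID filled in from the dimension table
      dim.parent_cate_name,       -- first-level category name
      o.gmt_create
    FROM order_source AS o
    INNER JOIN category_dim FOR SYSTEM_TIME AS OF o.proc_time AS dim
    ON o.cate_id = dim.cate_id;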

As shown above, you can upload your own data on the platform for debugging. Here a CSV test file is used, mapping 100 leaf categories to 10 first-level categories.

The units digit of a leaf category ID is the ID of its first-level category; the join associates each record with the corresponding first-level category and returns its name. Local debugging is fast and results can be seen in real time. In local debugging mode, output automatically pauses after 1,000 records are received, to prevent an excessive amount of results from affecting usability.

(4) Example 2-1 minute transaction statistics

Next, let’s look at window-based statistics.

The first scenario is minute-level transaction statistics, which is more commonly used at the summary level.

For minute-level statistics, the natural choice is a Tumble Window: each minute is computed separately. Several metrics are calculated, including the total number of orders, the total amount, the number of commodities traded, the number of users with transactions, and so on. The number of commodities and the number of users need to be deduplicated, so DISTINCT is used in the query. The window is the Tumble Window just introduced: a one-minute window partitioned by the order creation time, and the per-minute transactions are then tallied per first-level category.
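A sketch of this minute-level statistics query; dwd_orders stands for the cleaned and category-enriched detail table from Example 1, assumed to carry the gmt_create event-time attribute and watermark from the source:

    SELECT
      window_start,
      window_end,
      parent_cate_name,
      COUNT(*) AS order_cnt,                          -- total number of orders
      SUM(amount) AS trade_amount,                    -- total transaction amount
      COUNT(DISTINCT commodity_id) AS commodity_cnt,  -- deduplicated number of commodities
      COUNT(DISTINCT user_id) AS user_cnt             -- deduplicated number of users
    FROM TABLE(
      TUMBLE(TABLE dwd_orders, DESCRIPTOR(gmt_create), INTERVAL '1' MINUTE))
    GROUP BY window_start, window_end, parent_cate_name;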

– Operating mode

The picture above is a little different from debug mode: after launching, the job is actually submitted to the cluster to run. For the demo, its output uses a debug sink and prints directly to the log. If you expand the job topology, you can see that the two-phase local-global aggregation optimization is automatically enabled.

– Run log – View the debugging results

After running the Task for some time, you can see the final output from the Task log.

A Print Sink is used here, so the result goes straight into the log. For real output targets, such as writing to Hologres or MySQL, you would check the corresponding database instead.

As you can see, the output data has a lag with respect to the original time of the data.

At 19:46:05, the window of 19:45:00 is output, i.e. the aggregation result of the previous minute is emitted with a delay of about 5 seconds.

This 5-second delay is related to the WATERMARK setting on the source table, which was declared with a 5-second offset relative to the gmt_create field. The effect is that when the earliest record with timestamp 19:46:00 arrives, the watermark is considered to be at 19:45:55, i.e. a delay of 5 seconds, which provides tolerance for out-of-order data.

(5) Example 2-2: hour-level real-time transaction statistics

The second example is to do hourly real-time transaction statistics.

As shown above, when real-time statistics are required, can we simply use a Tumble Window with a size of 1 hour? Based on the output just shown, window output has a delay: if the window is one hour long, the result for that hour can only be emitted at the beginning of the next hour, after all the data of the hour has been received. An hour-level delay cannot meet the real-time requirement. Recalling the Group Aggregation introduced earlier, it can meet the real-time requirement.

The query here is written a little differently: in addition to dropping the window, it uses GROUPING SETS, which is often used in traditional batch processing and is also supported by Flink.

The GROUP BY uses GROUPING SETS with two grouping sets: the first is the hour plus the first-level category, and the second is the hour alone, i.e. the full caliber. The computed metrics are the number of orders, the total amount, the number of commodities, and the number of users.

To make the data easier to view, the query also does a null-value conversion on the result: for the full-caliber hour statistics, the output first-level category is null, so it needs to be converted.
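A sketch of this hour-level Group Aggregation with GROUPING SETS, again on the illustrative dwd_orders table; the 'ALL' label for the null conversion follows the output described below:

    SELECT
      stat_hour,
      COALESCE(parent_cate_name, 'ALL') AS parent_cate_name,  -- null from the full-caliber grouping set shown as 'ALL'
      COUNT(*) AS order_cnt,
      SUM(amount) AS trade_amount,
      COUNT(DISTINCT commodity_id) AS commodity_cnt,
      COUNT(DISTINCT user_id) AS user_cnt
    FROM (
      SELECT o.*, DATE_FORMAT(o.gmt_create, 'yyyy-MM-dd HH:00') AS stat_hour
      FROM dwd_orders AS o
    ) t
    GROUP BY GROUPING SETS ((stat_hour, parent_cate_name), (stat_hour));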

Above is the query running in debug mode, where you can see the data generated by DataGen being aggregated in real time per first-level category and its corresponding hour.

Here we can see the results of the two different grouping sets output together; the 'ALL' value in the middle column is the converted null value, i.e. the full-caliber statistics. Local debugging is intuitive and convenient. If you are interested, you can also apply for a trial or purchase the product on the Alibaba Cloud official website.

(6) Example 2-3: quasi-real-time day-level cumulative transaction statistics

The third example is day-level cumulative transaction statistics, where the business requirements are quasi-real-time, such as being able to accept minute-level update delays.

Following the hour-level real-time statistics with Group Aggregation just now, it is easy to think of simply changing the query to the day dimension. That would also give high real-time performance: results are updated at second-level latency as data arrives.

But recall the difference in built-in State handling between Window Aggregation and Group Aggregation: Window Aggregation cleans up State automatically, while Group Aggregation requires users to tune the State TTL themselves. Since the requirement here is only quasi-real-time, a Cumulate Window can be used for the cumulative computation instead: day-level accumulation with a minute-level step meets the quasi-real-time requirement of one update per minute.

Reviewing the Cumulate Window as shown above: for day-level accumulation, the maximum window Size is one day and the window Step is one minute, which expresses the day-level cumulative statistics.

We use the new TVF syntax, wrapping the window definition with the TABLE keyword: the CUMULATE function references the input table and defines its time attribute, step, and size parameters. The GROUP BY is the usual one; since intermediate results are emitted before the day ends, we also output the window start and end times.
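A sketch of this day-level cumulative query with the CUMULATE TVF, again on the illustrative dwd_orders table:

    SELECT
      window_start,
      window_end,
      COUNT(*) AS order_cnt,
      SUM(amount) AS trade_amount,
      COUNT(DISTINCT commodity_id) AS commodity_cnt,
      COUNT(DISTINCT user_id) AS user_cnt
    FROM TABLE(
      CUMULATE(TABLE dwd_orders, DESCRIPTOR(gmt_create), INTERVAL '1' MINUTE, INTERVAL '1' DAY))
    GROUP BY window_start, window_end;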

This example also looks at the Log output by running it online.

– Operating mode

As you can see, it operates in a similar structure to previous Tumble Windows, with pre-aggregation plus global aggregation, but differs from Tumble Window in that it doesn’t have to wait until all data is available to output results.

– Run logs – View the debugging result

As can be seen from the example above, at 20:47:00 the results accumulated from 00:00:00 to 20:47:00 are output, together with the corresponding 4 statistics columns. The next output belongs to the next cumulative window; 20:47:00 to 20:48:00 is one accumulation step. This meets both the day-level cumulative statistics requirement and the quasi-real-time requirement.

(7) Example summary: E-commerce transaction data – real-time data warehouse scenario

Then let’s summarize the above examples as a whole.

The cleaning from the access layer to the detail layer is relatively simple and clear: the business logic is fixed, such as fixed filter conditions and dimension expansion, which are very clear and direct.

From the detail layer to the summary layer, the minute-level statistics example uses a Tumble Window, while the hour-level statistics switches to Group Aggregation because of the real-time requirement. At the day level, both Group Aggregation and the newly introduced Cumulate Window were demonstrated.

Regarding the computing characteristics of the summary layer, we need to pay attention to the real-time and data-accuracy requirements of the business, and then choose Window Aggregation or Group Aggregation according to the actual situation.

Why is accuracy mentioned here?

When Window Aggregation and Group Aggregation were compared at the beginning, it was mentioned that Group Aggregation has very good real-time performance, but its data accuracy depends on the State TTL: when the statistical period is longer than the TTL, the results may be distorted.

Window Aggregation, on the other hand, has a limit on its tolerance for out-of-order data, such as accepting a wait of at most one minute. In real business data, perhaps 99% of the data arrives within such a limit, while 1% may take an hour to arrive. With WATERMARK-based processing, the default strategy is to discard: data that exceeds the maximum offset is dropped and not included in the statistics, so accuracy is also affected. Accuracy is therefore a relative indicator, and the choice needs to be made according to the specific business scenario.

Common development problems and solutions

(1) Common problems in development

Above are the high-frequency problems encountered with real-time computing in real business use.

The first is not knowing how to get started with real-time computing: for example, some students have a batch-processing background, are just starting to learn Flink SQL, and do not know where to begin.

Another type of problem: the SQL is written and the scale of the input data is known, but it is unclear how many resources the real-time job needs once it is running.

Another kind is complex SQL: when debugging it, for example to check why the computed data does not meet expectations, many students do not know where to start.

How to tune a job after it runs is also a very common problem.

(2) Solutions to common development problems

1. How to get started with real-time computing?

The community provides a lot of official documentation as well as examples. You can start with simple examples and gradually understand the different SQL operators and how they behave in streaming computation.

In addition, you can follow the content shared by the Realtime Compute for Apache Flink developer community, the ververica.cn website, and the Apache Flink official account on Bilibili.

Once you are familiar with SQL and want to apply it in a production environment to solve real business problems, Alibaba Cloud's industry solutions also provide some typical architecture designs that can be used as references.

2. How to debug complex jobs?

For complex SQL of perhaps a thousand lines, even Flink developers cannot locate a problem at a glance; they still need to go from simple to complex, possibly using debugging tools such as the platform debugging demonstrated earlier, and validate section by section: debug a small piece of SQL until its local result is correct, and then, step by step, make the whole complex job correct.

In addition, you can take advantage of SQL syntax features to organize the code a little more cleanly. The Realtime Compute for Apache Flink product has a code structure feature that makes it easier to locate specific statements in long SQL; these are some of the available aids.

3. How to tune the initial resource setting of a job?

A rule of thumb is to start with a small-concurrency test based on the input data, observe its performance, and then estimate. For large-concurrency stress testing, gradually approaching the configuration that meets the required throughput is straightforward but reliable.

Tuning is mainly based on how the job actually runs. We pay attention to some key indicators, such as whether there is data skew, and whether the dimension table Lookup Join, which needs to access external storage, has an I/O bottleneck; these are common bottlenecks that affect job performance and need attention.

The Realtime Compute for Apache Flink product has a built-in feature called AutoPilot, which can be understood as something like an autopilot, so that the initial resource setting is no longer a big problem.

On the product, after setting the maximum resource limit for the job, the engine can automatically help us adjust to the optimal state according to the actual data processing capacity, and scale according to the load situation.

The original link

This article is the original content of Aliyun and shall not be reproduced without permission.