Abstract: This article is compiled from a presentation at Flink Forward Asia 2021 by Zhang Jing and Zhang Mang, technical experts on the Kuaishou real-time computing team. The main contents include:
- Flink SQL at Kuaishou
- Function extensions
- Performance optimization
- Stability improvements
- Future prospects
1. Flink SQL at Kuaishou
After more than a year of promotion, internal users at Kuaishou have gradually come to recognize the value of Flink SQL. Among the new Flink jobs this year, SQL jobs account for 60%, double last year's share, and their peak throughput has reached 600 million records per second.
2. Function extensions
To support internal business requirements, Kuaishou has made many extensions to Flink SQL. This section focuses on two window-related extensions: one to Group Window Aggregate, and one to the Window Table-valued Function proposed in FLIP-145.
The differences and connections between the two:
- Group Window Aggregate is the mechanism for window aggregation in Flink 1.12 and earlier versions. It has two limitations: its syntax does not conform to the SQL standard, requiring special window functions together with window auxiliary functions to express an aggregation; and it restricts window functions to the GROUP BY clause, so they can only be used for aggregation.
- Flink therefore proposed Window TVF in FLIP-145, based on the polymorphic table function syntax introduced in the SQL:2016 standard. Besides window aggregation, it can also express window join, window TopN, and window deduplication.
2.1 Group Window Aggregate extensions
If Window TVF already exists, why extend Group Window Aggregate? Because Kuaishou only began upgrading from version 1.10 to 1.13 in the second half of this year, so most of its business is still running on version 1.10.
Kuaishou has made two extensions to Group Window Aggregate: one supports multi-dimensional aggregation, the other introduces higher-order window functions.
2.1.1 Supporting multi-dimensional analysis
Flink SQL has long supported multi-dimensional aggregation over unbounded streams. We extended the standard GROUPING SETS, ROLLUP, and CUBE clauses to Group Window Aggregate, so multi-dimensional analysis can now be applied to tumbling, sliding, and session windows, among others.
For example, suppose we need the cumulative UV of both the per-topic dimension and the total dimension. The GROUP BY clause of the query contains the CUMULATE window function together with a GROUPING SETS clause, and the parentheses of GROUPING SETS hold two elements: an empty set for the total dimension, and the topic column for the per-topic dimension.
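A minimal sketch of such a query is shown below. Note that CUMULATE as a group window function is the Kuaishou-internal extension described in section 2.1.2, and the table and column names here are hypothetical.

```sql
-- Cumulative UV per topic and in total, one result per window step.
SELECT
  topic,
  COUNT(DISTINCT device_id) AS uv
FROM page_views
GROUP BY
  CUMULATE(event_time, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
  GROUPING SETS ((), (topic))  -- (): total dimension, (topic): per-topic dimension
```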
2.1.2 Introducing higher-order window functions
Data analysts often need to draw a curve in which each point represents the cumulative value of a metric from midnight (0:00) up to that point in time: the x-axis represents time, and the y-axis represents the cumulative metric. There are two conventional solutions to this requirement:
- The first solution is unbounded stream aggregation, truncating the timestamp to one-minute granularity and using it as one of the group keys. However, the business requires that points already drawn on the curve never change, while the output of unbounded stream aggregation is an update stream, so this does not meet the requirement.
- The second solution is a one-day tumbling window. To emit results before the window ends, an early-firing trigger must be configured; the timestamp of each emitted point is the current machine time or the largest timestamp seen so far in the input. This scheme has drawbacks: the y-value of each point is not the cumulative value at exactly that point in time, so when the job fails over and restarts, or when the business deliberately backfills historical data, the historical curve cannot be faithfully restored, and the per-dimension cumulative values at a point do not sum to the total-dimension value. Another drawback: to avoid distinct-key skew in UV statistics we often use two-phase aggregation, but with this scheme the curve itself can show dips.
Here are some of the abnormal curves caused by the second solution:
- The first is a historical-backfill curve. Once the consumer lag is eliminated, the curve becomes normal; but while lag remains, the curve is jagged and the historical portion cannot be restored.
- The second shows dips in the cumulative curve. Because the output of the first-level aggregation is an update stream, and Flink's retract/update mechanism sends the retraction and the update as two separate messages rather than atomically, the second-level aggregation may first receive the retraction messages issued by multiple upstream parallel subtasks. This causes the total to dip before rising again, which produces the dips.
We introduced the CUMULATE window to address these issues.
Semantically, it is the same as the CUMULATE window in FLIP-145, but syntactically it applies to Group Window Aggregate. It takes three mandatory parameters, the time attribute column, the window step, and the max size, plus an optional parameter that specifies the offset at which window division starts.
For the window-division logic of CUMULATE, suppose the step is 1 minute and the max size is 3 minutes: window1 covers [0, 1 min), window2 covers [0, 2 min), window3 covers [0, 3 min); then window4 covers [3 min, 4 min), window5 covers [3 min, 5 min), and so on. A record with timestamp 0 min 30 s is therefore assigned to window1, window2, and window3.
For example, suppose we need a curve with one point per minute, where each point represents the day's cumulative UV of each sub-page. We use the CUMULATE window function with a step of one minute and a max size of one day; the business group key is the sub-page ID, and the timestamp of each point is the window's end time.
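A sketch of this query follows, assuming an auxiliary CUMULATE_END function analogous to TUMBLE_END for extracting the window end time; table and column names are hypothetical.

```sql
-- One point per minute: the day's cumulative UV of each sub-page.
SELECT
  CUMULATE_END(event_time, INTERVAL '1' MINUTE, INTERVAL '1' DAY) AS curve_time,
  page_id,
  COUNT(DISTINCT device_id) AS uv
FROM page_views
GROUP BY
  CUMULATE(event_time, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
  page_id
```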
As the chart above shows, the CUMULATE curve is smooth both during normal consumption and during historical backfill.
Advantages of the CUMULATE window:
- The first advantage is that the window's end time is used as each point's x-coordinate, and the y-coordinate of each point is the cumulative value at exactly that x-coordinate. So whether the job backfills history or fails over, the curve can be fully restored, and at every point in time the per-dimension values always sum to the total-dimension value.
- The second advantage is that two-phase aggregation can be used to prevent distinct-key skew. Since data is emitted only at the end of a window, there is no retraction and the output is an append stream, so the cumulative curve has no dips.
Dynamic cumulate window
Dynamic cumulate window is also designed for curve-drawing requirements, such as computing the cumulative metrics of a live room since its broadcast started. Unlike the previous requirement, how long each broadcast lasts is uncertain, but the metrics are still cumulative. It takes two mandatory parameters, the time attribute column and the window step, and one optional parameter, the window gap, which defines how long the window must receive no input before it is considered finished. Note that the end of a window triggers the output of its result and clears its state; if another record with the same key arrives afterward, late records are discarded, while records that are not late are assigned to a new window whose cumulative value starts again from zero.
In the case above, we need to draw a curve in which each point represents a live room's cumulative UV since the start of its broadcast; if a room receives no data for a continuous hour, the broadcast is considered closed. The window function is Dynamic Cumulate Window with a step of 1 minute and a gap of 60 minutes; the group key is the live room ID, and the timestamp is the window's end time.
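A hypothetical sketch of this query is shown below; DYNAMIC_CUMULATE and DYNAMIC_CUMULATE_END stand in for Kuaishou's internal window function and its auxiliary function, whose exact names are not given in the talk, and the table and column names are made up.

```sql
-- Per live room: cumulative UV since broadcast start, one point per minute;
-- the third argument is the gap after which the window is considered finished.
SELECT
  DYNAMIC_CUMULATE_END(event_time, INTERVAL '1' MINUTE, INTERVAL '60' MINUTE) AS curve_time,
  live_room_id,
  COUNT(DISTINCT device_id) AS uv
FROM live_events
GROUP BY
  DYNAMIC_CUMULATE(event_time, INTERVAL '1' MINUTE, INTERVAL '60' MINUTE),
  live_room_id
```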
2.2 Window Table-valued Function extensions
2.2.1 Enriching Window TVF operators
The community proposed the Window Table-valued Function (Window TVF) syntax in FLIP-145 and implemented window aggregation on top of it. On this basis, we enriched the window operators with window TopN, window join, and window deduplication, and also supported standalone query statements over a window table-function. These features have since been contributed to various community releases. With these window operators, users can implement more complex business logic in Flink SQL.
As shown in the figure above, we need to compute, for the day's top 100 best-selling products, their sales volume and number of buyers, as well as the buyer counts of the streamers selling these popular products. First, one window aggregation computes each product's sales volume and buyer count since 0:00; another window aggregation computes the buyer count across all products of each streamer; the two results are then connected through a window join and a window TopN.
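A sketch in community Window TVF syntax (available since Flink 1.14) follows; the table and column names are hypothetical, and the exact query used at Kuaishou may differ.

```sql
WITH product_stats AS (
  -- per-product sales and buyers since 0:00, emitted once per minute
  SELECT window_start, window_end, product_id, anchor_id,
         SUM(amount) AS sales, COUNT(DISTINCT buyer_id) AS buyers
  FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(event_time),
                      INTERVAL '1' MINUTE, INTERVAL '1' DAY))
  GROUP BY window_start, window_end, product_id, anchor_id
),
anchor_stats AS (
  -- per-streamer buyers across all of that streamer's products
  SELECT window_start, window_end, anchor_id,
         COUNT(DISTINCT buyer_id) AS anchor_buyers
  FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(event_time),
                      INTERVAL '1' MINUTE, INTERVAL '1' DAY))
  GROUP BY window_start, window_end, anchor_id
),
top_products AS (
  -- window TopN: top 100 products by sales within each window
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end ORDER BY sales DESC) AS rn
    FROM product_stats)
  WHERE rn <= 100
)
-- window join: connect the two results on identical windows
SELECT t.window_end, t.product_id, t.sales, t.buyers, a.anchor_buyers
FROM top_products t
JOIN anchor_stats a
  ON t.window_start = a.window_start
 AND t.window_end   = a.window_end
 AND t.anchor_id    = a.anchor_id;
```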
2.2.2 Supporting window offset
Window offset is used to adjust the window-division logic. It is an optional parameter whose default value is 0, meaning window division starts from the zero point of Unix time. The offset affects only how windows are divided, not the watermark. Also, different offsets can produce the same window division: for a 10-minute tumbling window, shifting the start 4 minutes to the left or 6 minutes to the right yields exactly the same division.
In the example above, we need a curve with one point per minute, where each point represents each page's cumulative UV over the course of the week. We can use the CUMULATE window function with an event-time step of 1 minute and a max size of 7 days. Because Unix time zero falls on a Thursday, the default offset would divide windows from one Thursday to the next; setting the offset to 4 days shifts the start right by 4 days, so windows run from Monday to the next Monday.
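A sketch follows, assuming the offset is passed as a trailing optional interval parameter of the windowing TVF (as in later community releases); the table and column names are hypothetical.

```sql
SELECT window_start, window_end, page_id,
       COUNT(DISTINCT device_id) AS weekly_uv
FROM TABLE(
  CUMULATE(TABLE page_views, DESCRIPTOR(event_time),
           INTERVAL '1' MINUTE,  -- step
           INTERVAL '7' DAY,     -- max size
           INTERVAL '4' DAY))    -- offset: shift right 4 days, Monday-to-Monday
GROUP BY window_start, window_end, page_id;
```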
2.2.3 Supporting batch mode
We also added batch-mode support. The principle is to introduce a window-assignment operator that attaches the window attributes to each input row and sends it downstream; the downstream operators then reuse the existing batch operators, such as HashAgg or SortAgg for aggregation and HashJoin or SortMergeJoin for joins. Compared with their stream-mode counterparts, these batch operators require no state, so they also deliver better throughput.
3. Performance optimization
This section introduces two optimizations: state optimization for aggregation, and mini-batch optimization for dimension table joins.
3.1 State optimization for aggregation
Let's use an example to understand distinct-state reuse in an aggregation scenario: counting the UV of each sub-channel of the app. This use case has two characteristics: the channels are enumerable, and the visitors of different channels overlap heavily.
The original query, shown in the figure above, groups by channel and uses COUNT(DISTINCT ...) to compute each channel's UV. Suppose the channel has only three enumerable values: A, B, and Other. The group key is the channel ID; the key of the MapState is the device ID, and the value is a 64-bit long in which each bit indicates whether the device appeared under the corresponding distinct aggregate. In this simple scenario the value is always 1.
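The original query, as a sketch (table and column names hypothetical):

```sql
SELECT channel, COUNT(DISTINCT device_id) AS uv
FROM events
GROUP BY channel;
```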
In the figure above, channel A has two devices with IDs 1 and 2; device 1 also visits channel B, and device 2 also visits the Other channel. As you can see, the map states of different channels overlap heavily. To reuse these map keys, the SQL can be rewritten by hand using a method the community provides.
First, we perform a row-to-column transformation, moving the three channel values into the FILTER conditions of the COUNT(DISTINCT ...) aggregate functions; before output, a user-defined table function performs the column-to-row transformation.
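A sketch of the manual rewrite follows. The FILTER clauses are standard Flink SQL, while expand_channels is a hypothetical user-defined table function that converts the three columns back into rows.

```sql
SELECT T.channel, T.uv
FROM (
  SELECT
    COUNT(DISTINCT device_id) FILTER (WHERE channel = 'A')     AS uv_a,
    COUNT(DISTINCT device_id) FILTER (WHERE channel = 'B')     AS uv_b,
    COUNT(DISTINCT device_id) FILTER (WHERE channel = 'Other') AS uv_other
  FROM events
) agg,
LATERAL TABLE(expand_channels(agg.uv_a, agg.uv_b, agg.uv_other)) AS T(channel, uv);
```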
With the rewritten query, the state layout changes as shown in the figure above: the group key is empty, the MapState key is the device ID, and the MapState value is a 64-bit long in which each bit indicates whether the device appeared in the corresponding channel. For example, a value of 110 for device 1 means the device visited channels A and B, while device 3 visited channel B and Other.
This scheme greatly reduces state size, but it has two drawbacks: the SQL must be rewritten by hand, which can get very long when a dimension has many values or there are multiple enumerable dimensions; and a user-defined table function is needed for the column-to-row conversion.
We propose a simplified SQL expression that achieves the state savings while reducing the burden on data developers. The user only needs to tell the optimizer the group key's enumerable values in the query statement, and the optimizer automatically rewrites the query, performing the row-to-column and column-to-row transformations so that the rewritten query reuses the distinct MapState. In the equivalent query, the enumerable values only need to appear in a filter condition, using IN or OR expressions.
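A sketch of the simplified form, over the same hypothetical table as above; the optimizer performs the rewrite automatically:

```sql
SELECT channel, COUNT(DISTINCT device_id) AS uv
FROM events
WHERE channel IN ('A', 'B', 'Other')  -- or: channel = 'A' OR channel = 'B' OR channel = 'Other'
GROUP BY channel;
```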
This optimization applies to both unbounded stream aggregation and window aggregation, supports one or more enumerable dimensions, and works for simple aggregations as well as multi-dimensional aggregation.
There are some restrictions. At least one key in the group key must be enumerable, and the enumerable values must be static and explicitly written into the filter condition. In addition, the distinct keys of the different dimension values must overlap for the state savings to materialize: for per-province UV statistics, visitors from different provinces can be assumed to be essentially disjoint, so reusing distinct keys brings no benefit there. Furthermore, for window aggregation, the window function must have row semantics rather than set semantics. With a row-semantics window, which windows a record belongs to depends only on the record itself; with a set-semantics window, it also depends on the historical data the window has already received. This optimization changes the group key of the aggregation operator and therefore changes the data set each window receives, so it is not applicable to set-semantics windows.
Finally, some users may ask why we do not use Calcite's PIVOT/UNPIVOT syntax to express the row-to-column and column-to-row transformations explicitly. First, the conditions are not in place: Calcite introduced PIVOT only in version 1.26 and UNPIVOT in version 1.27, while Flink has depended on Calcite 1.26 only since Flink 1.12. Second, with PIVOT/UNPIVOT syntax the SQL would be much longer than with our approach.
3.2 Mini-batch optimization for dimension table joins
The mini-batch optimization for dimension table joins aims to reduce the number of RPC calls: after buffering a batch of data, the batch-query interface of the dimension table is invoked once. Syntactically, we introduced a general mini-batch hint with two parameters: how long to buffer a batch, and how many records to buffer per batch. A valid mini-batch hint contains at least one of the two. The hint is designed to be general, so it can be used not only for dimension table joins but also for mini-batch optimization of aggregations.
For example, we need to widen the order table by joining in the customer information of each order. In the query, a hint attached to the Customers dimension table indicates buffering for 5 seconds or up to 10,000 records per batch. The SQL expression looks simple, but the underlying operator design and implementation are far more complex.
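A hypothetical sketch of such a query follows. The temporal-join syntax is standard Flink SQL, but the MINI_BATCH hint name and its option keys are Kuaishou-internal and assumed here:

```sql
SELECT o.order_id, o.amount, c.customer_name
FROM Orders AS o
JOIN Customers /*+ MINI_BATCH('interval'='5s', 'size'='10000') */
  FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```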
4. Stability improvements
In terms of stability, this section covers two improvements: solving data skew in Group Window Aggregate, and state compatibility for Flink SQL aggregations when indicators change.
4.1 Data skew in Group Window Aggregate
Window computation is widely used at Kuaishou, and data skew is easy to encounter in its business scenarios, for example with top streamers' live broadcasts and major events. Data skew in real-time computing causes metric delays at best and job failures at worst. We therefore support the mini-batch, local-global, and split-distinct optimizations on the tumbling window, as well as similar optimizations on other commonly used windows. Once these optimizations went live, they not only helped businesses resolve data skew but also brought good performance gains.
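For reference, the community versions of these optimizations for unbounded aggregation are enabled through the following configuration options (real Flink configuration keys); Kuaishou's work applies the same ideas to window aggregation:

```sql
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';     -- local-global
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';  -- split distinct
```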
4.2 Aggregate State Compatibility
First, let's look at the business scenarios that require aggregate state compatibility in Flink SQL. As the business evolves, day-level jobs may add new indicators or remove unnecessary ones; during major events, the business may add day-level cumulative indicators or indicators accumulated continuously over the event period.
For a day-level indicator change, the developer can only discard the state, upgrade the job after midnight, and then make the job consume data starting from midnight so that the indicators stay continuous. For indicators accumulated over an event period, the only option is to launch a new job that computes the new indicators separately, to avoid affecting the original ones; but this leads to redundant resource usage.
Such complicated operations are needed because Flink SQL decides state compatibility with a simplistic rule: the state is compatible only if the data types of the state required by the engine exactly match those saved in the savepoint. This rule has a loophole: if the state types happen to match but the aggregate functions in the SQL have changed, Flink still considers the state compatible.
Against this background, we built aggregate state compatibility. The goals: the learning and usage cost of the scheme is very low (or zero); users can upgrade jobs at any time without waiting for midnight; and both adding and deleting aggregate functions are supported.
In the supported scenarios, new aggregate functions can only be appended at the end of the existing list, while aggregate functions at any position can be deleted; the order of the remaining aggregate functions must not change. Additions and deletions can also be combined in a single upgrade, as illustrated below.
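An illustrative sketch of a compatible upgrade (the query and names are hypothetical):

```sql
-- Before the upgrade:
SELECT item_id, COUNT(*) AS pv, AVG(price) AS avg_price, COUNT(DISTINCT device_id) AS uv
FROM orders GROUP BY item_id;

-- After the upgrade: AVG(price), in a middle position, is deleted; SUM(amount)
-- is appended at the end; the order of the remaining functions is unchanged.
SELECT item_id, COUNT(*) AS pv, COUNT(DISTINCT device_id) AS uv, SUM(amount) AS gmv
FROM orders GROUP BY item_id;
```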
The table on the right of the figure above shows the mapping between indicator identifiers and state types. To make the compatibility check easier, we save this mapping into the state's metadata. This matters because some aggregate functions have more than one state; AVG, for example, needs both a sum state and a count state to compute its result, so the mapping is essential.
When an aggregate function is added, the newly added state must be filled with an initial value, and different functions have different initial values: the initial value for COUNT is 0, while the initial value for SUM must be null.
The early-fire and late-fire scenarios of windows introduce retract messages and add a state that records the data already sent downstream. That state has an extra time field compared with the window's original state, which must be handled specially during the compatibility check and state migration.
As mentioned earlier, we save the mapping between indicator identifiers and state types in the state's metadata, which introduces a compatibility problem of its own: without further care, a new version of the program could not correctly read savepoints written by the previous version. To solve this, we added version information to the metadata and use it to distinguish old-format from new-format state, so that old savepoints remain readable.
In aggregation scenarios, users may configure a state TTL to clear expired state. During migration, the TTL timestamp of each entry must be carried over unchanged from the original data.
The advantage of this aggregate state compatibility scheme is that the learning and usage cost is very low, it is almost transparent to users, and it does not depend on any external service. The disadvantages are that it modifies Flink's source code, which raises the cost of future Flink version upgrades, and that it currently supports only aggregation scenarios.
Finally, a preview of the ultimate state compatibility solution Kuaishou is working on: users will be able to add, delete, or even modify state at any position in a savepoint, and even generate a complete set of state from scratch, for example to initialize state for a Flink on Hudi job.
The advantages of the ultimate solution: it does not touch Flink's source code, which eases version upgrades; users operate through the platform UI without writing code; and state compatibility is supported for all scenarios rather than specific ones. The disadvantage is a higher learning cost: users need more specialized knowledge, such as the difference between operator state and keyed state, and which operators contain state.
5. Future prospects
In the future, Kuaishou will continue extending stream SQL functionality and improving performance to cut costs and raise efficiency, and will explore state compatibility in more scenarios. On the batch side, Kuaishou will strengthen Flink batch SQL, adding optimizations such as speculative execution and adaptive query execution, improving the stability and performance of batch SQL, and expanding its business application scenarios. For data lakes and real-time data warehouses, we will continue driving adoption in more business scenarios.