Big data technology AI

Use the optimal algorithm

When the TopN input is a non-update (append-only) stream, such as a Source, TopN has only one algorithm: AppendRank.

When the TopN input is an update stream (for example, after an aggregation or a join), there are two TopN algorithms, in descending order of performance: UpdateFastRank and RetractRank. The algorithm name is shown in the node name in the job topology.

Note: as of Flink 1.12, the Apache community version does not have UnaryUpdateRank; Alibaba Cloud Realtime Compute for Apache Flink does.

  • UpdateFastRank: the optimal algorithm. It requires two conditions:

      • The input stream carries primary key (PK) information, for example because it comes from a GROUP BY aggregation.

      • Updates to the sort field are monotonic, and the monotonic direction is opposite to the sort direction. For example, ORDER BY COUNT/COUNT_DISTINCT/SUM (of positive numbers) DESC. To get the optimized plan with ORDER BY SUM DESC, add a filter condition that guarantees the summed values are positive.

  • AppendFast: the result is append-only and is never updated.

  • RetractRank: the general-purpose algorithm, with poor performance. It is not recommended in production. Check whether the input stream carries PK information; if it does, restructure the query so that UpdateFastRank can be used.
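The selection rules in the list above can be summarized as a small decision function. This is only a sketch of the rules as this article states them, not the Flink planner's actual code; the class name, enum constants, and boolean parameters are all hypothetical.

```java
// Hypothetical sketch of the selection rules described in this article.
// It does not reproduce the Flink planner's real implementation.
final class TopNStrategyChooser {

    enum Strategy { APPEND_ONLY, UPDATE_FAST_RANK, RETRACT_RANK }

    /**
     * @param inputHasUpdates         true if the TopN input is an update stream (e.g. after AGG/JOIN)
     * @param hasPrimaryKey           true if the input stream carries primary key information
     * @param monotonicOppositeToSort true if sort-field updates are monotonic in the direction
     *                                opposite to the sort direction
     */
    static Strategy choose(boolean inputHasUpdates,
                           boolean hasPrimaryKey,
                           boolean monotonicOppositeToSort) {
        if (!inputHasUpdates) {
            // Append-only input: only one algorithm applies.
            return Strategy.APPEND_ONLY;
        }
        if (hasPrimaryKey && monotonicOppositeToSort) {
            // Both conditions hold: the optimal algorithm can be used.
            return Strategy.UPDATE_FAST_RANK;
        }
        // Fallback: general-purpose but slow.
        return Strategy.RETRACT_RANK;
    }

    public static void main(String[] args) {
        // SUM over positive values, grouped by key, ordered DESC.
        System.out.println(choose(true, true, true));
        // Update stream without the monotonicity guarantee.
        System.out.println(choose(true, true, false));
    }
}
```

Reading the rules this way makes the practical advice explicit: when a job falls back to the retract algorithm, the two flags to investigate are the primary key and the monotonicity of the sort field.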

No-ranking-number optimization (solves data inflation)

  • TopN syntax

    SELECT [column_list]
    FROM (
      SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
          ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
      FROM table_name)
    WHERE rownum <= N [AND conditions]
  • Data inflation problem

Problem description: with the TopN syntax above, the rownum field is written to the result table as part of the unique key, which can cause a large number of records to be written to the result table. For example, when the record ranked 9 (say, product-1001) is updated and its rank rises to 1, every record ranked 1 to 9 is sent to the result table as an update message. If the result table receives too much data, it becomes the bottleneck of the SQL job.

Optimization: omit the rownum field from the outer SELECT clause of the TopN query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, in the example above only the changed record (product-1001) has to be sent downstream, which saves a lot of IO to the result table.
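The difference in output volume can be illustrated with a small simulation of the product-1001 example. This is a simplified model, not Flink's operator: it counts one message per affected row, whereas a real changelog may emit separate before and after messages. The class, the method names, and the sample sales figures are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical simulation; it does not use or reproduce Flink's operators.
final class RownumInflationDemo {

    /** Returns the ids of the top n products, ordered by sales descending. */
    static List<String> topN(Map<String, Long> sales, int n) {
        List<String> ids = new ArrayList<>(sales.keySet());
        ids.sort((a, b) -> Long.compare(sales.get(b), sales.get(a)));
        return new ArrayList<>(ids.subList(0, Math.min(n, ids.size())));
    }

    /** With rownum in the output: one message per rank position whose occupant changed. */
    static int messagesWithRownum(List<String> before, List<String> after) {
        int count = 0;
        int positions = Math.max(before.size(), after.size());
        for (int i = 0; i < positions; i++) {
            String oldId = i < before.size() ? before.get(i) : null;
            String newId = i < after.size() ? after.get(i) : null;
            if (!Objects.equals(oldId, newId)) {
                count++;
            }
        }
        return count;
    }

    /** Without rownum: one message per row whose values changed or that entered or left the top N. */
    static int messagesWithoutRownum(Map<String, Long> salesBefore, Map<String, Long> salesAfter,
                                     List<String> before, List<String> after) {
        int count = 0;
        for (String id : after) {
            boolean entered = !before.contains(id);
            if (entered || !salesBefore.get(id).equals(salesAfter.get(id))) {
                count++;
            }
        }
        for (String id : before) {
            if (!after.contains(id)) {
                count++; // row dropped out of the top N
            }
        }
        return count;
    }

    /** Moves product-1001 from rank 9 to rank 1 and returns {withRownum, withoutRownum}. */
    static int[] simulate() {
        Map<String, Long> salesBefore = new HashMap<>();
        for (int i = 1; i <= 8; i++) {
            salesBefore.put("product-200" + i, 110L - 10L * i); // 100, 90, ..., 30
        }
        salesBefore.put("product-1001", 20L); // rank 9
        salesBefore.put("product-2009", 10L); // rank 10

        Map<String, Long> salesAfter = new HashMap<>(salesBefore);
        salesAfter.put("product-1001", 110L); // now rank 1

        List<String> before = topN(salesBefore, 10);
        List<String> after = topN(salesAfter, 10);
        return new int[] {
            messagesWithRownum(before, after),
            messagesWithoutRownum(salesBefore, salesAfter, before, after)
        };
    }

    public static void main(String[] args) {
        int[] counts = simulate();
        System.out.println("with rownum: " + counts[0] + ", without rownum: " + counts[1]);
    }
}
```

In this model, the update touches nine rank positions when rownum is part of the output, but only one row when it is omitted.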

  • Usage

    CREATE TABLE ShopSales (
      product_id STRING,
      category STRING,
      product_name STRING,
      sales BIGINT
    ) WITH (...);

    -- omit row_num field from the output
    SELECT product_id, category, product_name, sales
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
      FROM ShopSales)
    WHERE row_num <= 5

To write the result of the query above to an external store and get correct results, the external store must have the same unique key as the TopN query. In the example query, if product_id is the unique key of the query, the external table should also use product_id as its unique key.

When rownum is omitted, be careful with the primary key definition of the result table. A wrong definition directly produces incorrect TopN results. In this scenario, the primary key should be the key list of the GROUP BY node upstream of the TopN.

Increase the TopN cache size

TopN uses a State cache layer to improve performance by making State access more efficient. The TopN cache hit ratio is calculated as follows:

cache_hit = cache_size * parallelism / top_n / partition_key_num

For example, for a Top100 query with a cache size of 10000 entries and a parallelism of 50, when the PARTITION BY key has a large cardinality such as 100000, the cache hit ratio is only 10000 * 50 / 100 / 100000 = 5%. With such a low hit ratio, most requests go to State and performance degrades sharply. Therefore, when the partition key cardinality is particularly large, increase the TopN cache size and increase the heap memory of the TopN node accordingly.
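The formula is easy to evaluate for a planned configuration. The following is a minimal sketch that only applies the formula above; the class and method names are hypothetical, and capping the ratio at 1.0 is an assumption added here because a hit ratio cannot exceed 100%.

```java
// Hypothetical helper that evaluates the cache hit formula from this article.
final class TopNCacheEstimate {

    /**
     * cache_hit = cache_size * parallelism / top_n / partition_key_num,
     * capped at 1.0 (the cap is an assumption added for this sketch).
     */
    static double hitRatio(long cacheSize, int parallelism, int topN, long partitionKeyNum) {
        double raw = (double) cacheSize * parallelism / topN / partitionKeyNum;
        return Math.min(1.0, raw);
    }

    /** Smallest cache size that reaches the target ratio under the same formula. */
    static long cacheSizeFor(double targetRatio, int parallelism, int topN, long partitionKeyNum) {
        return (long) Math.ceil(targetRatio * topN * partitionKeyNum / parallelism);
    }

    public static void main(String[] args) {
        // Top100, parallelism 50, 100000 partition keys.
        System.out.println(hitRatio(10_000, 50, 100, 100_000));   // cache of 10000 entries
        System.out.println(hitRatio(200_000, 50, 100, 100_000));  // cache of 200000 entries
        System.out.println(cacheSizeFor(1.0, 50, 100, 100_000));  // size needed for a full hit ratio
    }
}
```

Under this formula, the 10000-entry cache from the example gives 0.05, while a cache of 200000 entries would bring the estimate to 1.0 for the same workload.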

  • Usage

    // Initialize the table environment
    TableEnvironment tEnv = ...
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // Set the TopN cache size to 200000
    configuration.setString("table.exec.topn.cache-size", "200000");

Note: this parameter is not listed in the official documentation and is marked as experimental in the source code.

The PARTITION BY fields must include a time field

For example, include a day field in a daily ranking. Otherwise, the TopN result will eventually become incorrect once State TTL expires entries.

An example of optimized SQL

    SELECT student_id, subject_id, stat_date, score
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (
          PARTITION BY student_id, stat_date
          ORDER BY score DESC
        ) AS rownum
      FROM (
        SELECT student_id, subject_id, stat_date,
          -- Key point: declare that the arguments of SUM are all non-negative,
          -- so the SUM result is monotonically increasing and TopN can use
          -- the optimized algorithm.
          SUM(score) FILTER (WHERE score >= 0) AS score
        FROM score_test
        WHERE score >= 0
        GROUP BY student_id, subject_id, stat_date
      ) a
    )
    WHERE rownum <= 100;

Explanation:

  1. The outer SELECT does not output the rownum field, which reduces the volume written to the result table (no-ranking-number optimization).

  2. The PARTITION BY includes a time field (stat_date); otherwise State expiration would corrupt the results (partition field optimization).

  3. The ranking sorts by the upstream SUM result. Updates to the sort field are monotonic, and the monotonic direction is opposite to the sort direction (selects the optimal algorithm).

  4. Because the arguments of SUM are declared non-negative, the SUM result is monotonically increasing, so TopN can use the optimized algorithm and keep only the top 100 records (uses the optimal algorithm).
