This article is published by netease Cloud


Previous articles have briefly introduced the background of Join in the field of big data and several commonly used algorithms, such as Broadcast Hash Join, Shuffle Hash Join and sort Merge Join, as well as the core application scenarios of each algorithm. Broadcast Hash Join is used when large tables and small tables are joined. Shuffle Hash Join is used when small tables are too large for broadcast distribution. Finally, sort Merge Join is used when two large tables are joined.


The SQL execution engine must know the size of the two tables to select the optimal algorithm. So how do you know the size of two tables? Is the size of the two tables measured by physical size or the number of records or both? In fact, this is another knowledge — Cost Based Optimization (CBO). It can not only explain the selection problem of Join algorithm, but more importantly, it can also determine the Join order problem in multi-table joint Join scenario.


Are you looking forward to CBO? Okay, let’s dig a hole here, and we’ll talk about the next topic. So what are we talking about today? Join algorithm selection and Join order selection do have a great impact on Join performance. However, there is another very important factor that is crucial to Join performance, that is Join algorithm optimization! Broadcast Hash Join, Shuffle Hash Join, or sort Merge Join are all basic join algorithms. Is there any optimization solution? Indeed, this is the subject of today’s talk – Runtime Filter (RF).


RF preparation knowledge: Bloom filter

RF explicitly uses BloomFilter to filter the tables participating in the join and reduce the amount of data actually participating in the join. In order to explain the process in detail below, it is necessary to first explain the data structure BloomFilter (those familiar with it can take a detour). Bloom Filter uses bit array to realize filtering. In the initial state, each bit of the bit array is 0, as shown in the figure below:

Suppose there is a set S = {x1,x2… ,xn}, Bloom Filter uses k independent hash functions to map each element in the set to {1,… The range of m}. For any element, the number mapped to acts as the index of the corresponding bit array, and that bit is set to 1. For example, if the element x1 is hash mapped to the number 8, the eighth bit of the array will be set to 1. In the following figure, set S has only two elements x and y, which are mapped by three hash functions to positions (0,3,6) and (4,7,10) respectively, and the corresponding bit will be set to 1:

Now if you want to determine whether another element is in the set, you just need to map the three hash functions to see if there is a 0 in the corresponding position. If there is, the table shows that the element definitely does not exist in the set, otherwise it might exist. Z is definitely not in the set {x,y} as shown below:

RF algorithm theory

To better illustrate the process, here is a complete explanation of the RF algorithm using an SQL example: Select item.name,order.* from order,item where order.item_id = item.id and item.category = ‘book’; The two tables are joined according to the item ID field, which means to fetch all order details for the item category book. Assuming that there are not many goods whose type is book, the join algorithm is determined as broadcast Hash Join. The whole process is shown in the figure below:

Step 1: The join field (item.id) of item table is mapped into a BloomFilter by multiple hash functions (If you do not know bloomFilter, Google it by yourself).


Step 2: Broadcast the mapped BloomFilter to all partitions of the ORDER table for filtering.


Take Partition2 as an example. Storage processes (such as DataNode processes) read join column (order.item_id) data in order table one by one and use BloomFilter to filter the data. Eliminate the order data is not book related goods order, this data directly skipped; Otherwise, the order data may be the order to be retrieved, and all the data in the line will be scanned out;


Step 4: Send all order data not filtered by BloomFilter to impALAD through local socket communication;


Step 5: Then broadcast all the book commodity data to all Partition nodes and perform a real HashJoin operation with the order data obtained in Step 4 to obtain the final selection result.


RF algorithm analysis

An SQL example is used to briefly demonstrate the operation flow of the RF algorithm in broadcast Hash Join. According to the flow, the theoretical hierarchy analysis of the algorithm is as follows:


RF essence: The Join can be optimized in three ways by pushing down the predicate (BloomFilter) and filtering the data at the storage layer through bloomFilter. First, if you can skip many records, you can reduce the number of DATA I/O scans. This point needs to be explained, many friends will have such a question: since the data needs to be scanned out and filtered by BloomFilter, why will I/O scanning times be reduced? Focus on the fact that most table storage behavior is column storage, and columns are stored independently. Scan filtering only requires scanning join column data (not all columns). If a column is filtered out, other columns in the same row are not scanned, which reduces the NUMBER of I/O scans. Second, it reduces the overhead of sending data from the storage layer to the computing layer through sockets (or even TPC), and third, it reduces the overhead of the final Hash Join execution.


RF cost: Compared with Broadcast Hash Join that does not use RF, the former costs bloomFilter generation, Broadcast, and filtering of large tables based on BloomFilter. In general, these steps are not costly in the case of small tables and can be ignored.


RF optimization effect: It basically depends on the filtering effect of BloomFilter. If a large amount of data is filtered out, the performance of Join will be greatly improved. Otherwise, performance gains will be limited.


RF: Like common predicates that push down (‘ = ‘, ‘>’, ‘<‘, etc.), RF implementations need to implement the relevant logic at both the computing layer, which constructs and propagates bloomFilter to the storage layer, and the storage layer, which uses the BloomFilter to filter the specified data.


RF effect verification

In fact, the optimization effect of RF was found in the analysis of the impala on Parquet and Impala on Kudu benchmark comparison tests conducted by colleague He Dashen in the group. In practice, the Impala on Parquet had a significant performance advantage over the Impala on Kudu, with at least a 10-fold improvement visually. The same SQL parsing engine, different storage engine performance is dramatically different! To find out why, impala’s execution plan analysis tool was used to analyze the execution plans of the impala and the Execution plans of the Impala respectively. It was only by looking at the clues that RF was used in the impala and not in the Impala (there may be other factors, but RF is certainly one of them).


The benchmark test uses TPCDS test with data size of 1T. This article uses a typical SQL (Q40) in the test process as an example to play back the magic effect of RF. The following is the comparison of Q40 performance. It can be seen intuitively that RF can directly improve the performance of 40X by 40 times.

Let’s take a quick look at the Q40 SQL statement, as shown below. Catalog_sales join date_DIM, catalog_SALES join warehouse, and catalog_SALES join item

select

w_state, i_item_id,

sum(case when (cast(d_date as date) <

cast (‘1998-04-08’ as date))

Then cs_sales_price –

coalesce(cr_refunded_cash,0)

else 0 end) as sales_before,

sum(case when (cast(d_date as date) >=

cast (‘1998-04-08’ as date))

Then cs_sales_price –

coalesce(cr_refunded_cash,0)

else 0 end) as sales_after

from

catalog_sales left outer join catalog_returns

on

(catalog_sales.cs_order_number = catalog_returns.cr_order_number

and catalog_sales.cs_item_sk =

catalog_returns.cr_item_sk),

warehouse, item, date_dim where

I_current_price between 0.99 and 1.49

and item.i_item_sk = catalog_sales.cs_item_sk

and catalog_sales.cs_warehouse_sk =

warehouse.w_warehouse_sk

and catalog_sales.cs_sold_date_sk =

date_dim.d_date_sk

and date_dim.d_date between

‘1998-03-09’ and ‘1998-05-08’ group by w_state, i_item_id order by w_state, i_item_id limit 100;


Typical star structure, where catalog_SALES is the fact table and the other tables are latitude tables. The join at the latitude catalog_sales join item was selected for this analysis. Because impala was used for both SQL parsing engines in the comparison test, the SQL execution plan was basically the same. Catalog_sales join item: Shuffling Hash Join; shuffling Hash Join; shuffling Hash Join; shuffling Hash Join Slightly different from the broadcast Hash Join example above, but not affecting the conclusion) :



Through the analysis of the execution plans of the two scenarios, the basic theoretical results above can be basically verified:


1. Confirm that after RF, the amount of data in the large table is largely filtered out and only a small amount of data is left to participate in the final HashJoin. Refer to the second large table scan results. 70 million rows + records were returned without RF, while only 3W + records met the conditions after RF filtering. 30 million compared with 70 million, the performance optimization effect is self-evident.


2. After RF filtering, the network time of loading a small amount of data from the storage process to the computing process memory through the network is greatly reduced. See the third line, “Data loaded to process memory,” which takes 15s and only 11ms. The main time is divided into two parts, in which the time of data serialization takes about 2/3 to 10s, and the time of data transmission through RPC takes another 1/3 to 5s.


3. Finally, after RF filtering, the number of participants in the final Hash Join is greatly reduced. The Hash Join takes 19s in the former and 21ms in the latter, mainly due to the large Probe Time.


Predicate push down as agreed?


To be honest, when I first got in touch with RF, I thought it was a real artifact, and my admiration was overwhelming. However, after a period of exploration and digestion, until the completion of this article, that is, at this moment, I suddenly feel that it is not inscrutable. To put it simply, it is a predicate push down. The difference is that the predicate here is a little strange, it is just a Bloomfilter.


When we talk about predicates pushing down, let me extend this a little bit. Before often full street hear predicate push down, however to predicate push down always feel muddled, do not understand very true. After the baptism of RF, I am now sure to have a further understanding. Take it out here and communicate with you. I think there are two levels of understanding of predicate pushdown:


The first is logical execution plan optimization level, such as SQL statements: Select * from order,item where item.id = order.item_id and item.category = ‘book’; select * from order,item where item.id = order.item_id and item.category = ‘book’; Predicate push-down allows the Filter operation to be pushed down before the Join operation. Push down where item.category = ‘book’ before item.id = order.item_id


Predicate pushdown is a predicate that pushes filtering conditions down from the calculating process to the stored process. Note that there are two types of processes: the calculating process and the stored process. The separation of computing and storage is quite common in the field of big data. For example, the most common computing processes include SparkSQL, Hive, and Impala, which are responsible for SQL parsing and optimization and data calculation aggregation. Storage processes include HDFS (DataNode), Kudu, and HBase, which are responsible for data storage. It is normal to load all data from the storage process to the computing process, and then perform filtering calculations. Predicate pushdown means that you push some filter criteria to the storage process, and the storage process directly filters out the data. The benefits are obvious: the earlier you filter, the less data, the less serialization overhead, the less network overhead, the less computation overhead, and the better performance.


When I write here, I suddenly realize that the author has made a very serious cognitive error above: RF mechanism is not just a simple predicate push down, its essence lies in proposing an important predicate-Bloomfilter. There are not many systems that support RF, and I only know of Impala on Parquet that currently supports it. Impala on Kudu Although Impala supports Kudu, Kudu does not. SparkSQL on Parqeut has storage system support, but the computing engine -SparkSQL is not supported at present.


This paper mainly introduces an optimization method similar to semi-join, deeply discusses the optimization details, and discusses its own solution to predicate push-down technology combined with the analysis process. The following will bring you the cost based optimization (CBO) related issues, stay tuned!


Netease has

Enterprise big data visualization analysis platform. The self-service and agile analysis platform for business personnel adopts PPT mode to make reports, which is easier to learn and use. It has powerful exploration and analysis functions, and truly helps users to gain insight into data and discover value.

Click here – free trial.


Understand netease Cloud:

The official website of netease Cloud is www.163yun.com/

New user package: www.163yun.com/gift

Netease Cloud community: sq.163yun.com/