This article was first published on the vivo Internet Technology WeChat public account: mp.weixin.qq.com/s/YPN85WBNc…

Author: Li Yong

Contents:

1. SparkSql

2. Join queries and join conditions

3. Predicate pushdown

4. Predicate pushdown rules in inner join queries

4.1. Post-join conditions connected by AND

4.2. Post-join conditions connected by OR

4.3. Partitioned tables with filter conditions connected by OR

1. SparkSql

SparkSql is a distributed SQL engine built on the Spark computing framework. It uses DataFrame and DataSet to carry structured and semi-structured data and to support complex query processing. The DSL it provides lets you write SQL-style queries directly in Scala. In addition, ThriftServer is used to offer SQL querying as a service.

SparkSql provides a DataSource API. Users can develop a connector through this API to query various data sources directly, including NoSQL stores, RDBMSs, search engines, and files on distributed file systems such as HDFS. Systems similar to SparkSql include Hive, PrestoDB, and Impala, all of which are so-called “SQL on Hadoop” systems. Each of these systems is very popular. After all, SQL is so widely known these days that a system without SQL support would find it hard to attract users.

2. Join queries and join conditions

Join queries in SQL are divided into inner join queries, outer join queries, and semi join queries. For the differences between them, please refer to the Wiki.

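A join condition is the condition that must be met for a row from one table and a row from the other to be “joined” together and returned. For example, consider a query whose ON clause is “LT.id = RT.id AND LT.id > 1” and whose WHERE clause is “RT.id > 2”.

The condition “LT.id = RT.id AND LT.id > 1” is called the join condition. It decides directly whether two rows, one from each table, can be joined. When the condition is not met, the two rows are not simply discarded. Instead, they are handled differently depending on the type of join query, so this is neither a single-table filtering process nor a “joint filtering” process over the two tables. The condition “RT.id > 2” after WHERE is called the post-join condition. Despite the name, the data does not have to be filtered only after the join; the name just indicates that filtering after the join is guaranteed to give the correct result. This is also the benchmark for getting the right answer when we analyze the problem later.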
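To make “handled differently depending on the type of join query” concrete, here is a minimal pure-Python sketch, not SparkSql code. The table contents and the helper name join are assumptions made for illustration; only the join condition “LT.id = RT.id AND LT.id > 1” comes from the text.

```python
# Hypothetical two-row tables, assumed for illustration only.
left_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def join_condition(l, r):
    # The join condition from the text: LT.id = RT.id AND LT.id > 1
    return l["id"] == r["id"] and l["id"] > 1


def join(left, right, on, how="inner"):
    """Nested-loop join supporting 'inner' and 'left' (left outer)."""
    out = []
    for l in left:
        matches = [r for r in right if on(l, r)]
        if matches:
            out.extend((l, r) for r in matches)
        elif how == "left":
            # A left outer join keeps the unmatched left row, with NULL on the right.
            out.append((l, None))
    return out


inner_result = join(left_table, right_table, join_condition, how="inner")
left_outer_result = join(left_table, right_table, join_condition, how="left")
```

With the same join condition, the inner join drops the left row with id 1, while the left outer join keeps it paired with None. This is why a join condition cannot be treated as a plain single-table filter.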

3. Predicate pushdown

A predicate is a function that returns bool (or something that can be implicitly converted to bool), that is, a function that returns either true or false. Those of you who have used Scala or Spark know the filter method, a higher-order function whose parameter is a function that returns true or false.
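The same idea can be sketched in a few lines of Python, used here only for illustration (the article itself refers to the Scala and Spark filter method). The row data and the function name id_greater_than_one are hypothetical.

```python
def id_greater_than_one(row):
    """A predicate: takes a row and returns True or False."""
    return row["id"] > 1


rows = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]

# filter() is a higher-order function: the predicate is passed in as an argument.
kept = list(filter(id_greater_than_one, rows))
```

Here kept contains only the row whose id is 2, because that is the only row for which the predicate returns True.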

SQL, however, has no methods, only expressions. The expression after WHERE acts as a filter; the SQL layer parses this part of the statement and represents it internally as a predicate.

So the question is, why push predicates down? Predicate pushdown in SparkSql has two meanings: the first concerns who does the data filtering, and the second concerns when the data filtering is done. To answer these two questions, we need to understand how SparkSql processes an SQL statement. Roughly, the query processing flow in SparkSql can be divided as follows:

SparkSql first runs a series of analyses on the incoming SQL statement: lexical analysis (comparable to word segmentation in a search engine), syntactic analysis, and semantic analysis (for example, checking whether the database or table exists, or that group by is combined with aggregate functions). Next comes the generation of the execution plan, both logical and physical. Many optimizations happen in the logical planning stage, and predicate handling is done there; physical planning is the process of generating the DAG of RDDs. Once these two steps are complete, the actual execution follows, that is, the heavyweight computation such as join, group by, filter, and distinct, carried out by a variety of physical operators (RDD transformations).

Two parties can perform data filtering: the first is the distributed SQL layer (in the execute phase), and the second is the data source. So the first meaning of predicate pushdown is whether the filtering is done by the Filter operator at the SQL layer or by the Scan operator during the scan phase.

As mentioned above, SparkSql's DataSource API can be used to query various data sources. If the underlying data source cannot filter data efficiently, a full scan is performed and every candidate row is sent to SparkSql's Filter operator. SparkSql uses code generation to greatly improve filtering efficiency, but this approach still cannot avoid reading a large amount of data from disk, and in some cases it even involves network IO (for example, when the data is not stored locally). If the underlying data source can filter data quickly while scanning, the filtering is left to the data source. Which data sources can filter efficiently, and how SparkSql achieves this, is not the focus of this article and is covered in other articles of this series.
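The difference between filtering in the SQL layer and filtering in the scan can be sketched with a toy data source. This is a simplified illustration, not the SparkSql DataSource API: the class ToySource, its scan method, and the rows_emitted counter are all hypothetical names, and a real source would rely on indexes or statistics rather than testing every row.

```python
from typing import Callable, Iterator, Optional

Row = dict
Predicate = Callable[[Row], bool]


class ToySource:
    """Hypothetical data source that counts how many rows leave the scan."""

    def __init__(self, rows):
        self.rows = rows
        self.rows_emitted = 0

    def scan(self, pushed: Optional[Predicate] = None) -> Iterator[Row]:
        for row in self.rows:
            # With a pushed-down predicate, non-matching rows never leave the source.
            if pushed is None or pushed(row):
                self.rows_emitted += 1
                yield row


def filter_in_sql_layer(source, predicate):
    # Full scan: every row is shipped to the Filter operator, then tested.
    return [row for row in source.scan() if predicate(row)]


def filter_in_scan(source, predicate):
    # Pushdown: the scan itself applies the predicate.
    return list(source.scan(pushed=predicate))


def id_above_998(row):
    return row["id"] > 998


data = [{"id": i} for i in range(1, 1001)]

source_a = ToySource(data)
result_a = filter_in_sql_layer(source_a, id_above_998)

source_b = ToySource(data)
result_b = filter_in_scan(source_b, id_above_998)
```

Both approaches return the same two rows, but source_a emits all 1000 rows to the upper layer while source_b emits only 2. The rows that are never emitted stand in for the disk reads and network IO that pushdown avoids.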

The second meaning of predicate pushdown, namely when the filtering is done, generally applies to join queries: should each table be filtered on its own first and then joined with the other tables, or should the tables be joined first and the resulting temporary table be filtered afterwards? This is the question this series of articles analyzes and discusses.

4. Predicate pushdown rules in inner join queries

Suppose we have two tables, lefttable and righttable. Both have a very simple structure and only two rows each, but that is enough to illustrate the pushdown rules:

4.1. Post-join conditions connected by AND

Let’s start with a query statement:

This is an inner join query, and the post-join condition consists of filter conditions on the two tables connected by AND. If we do not push the predicate down but evaluate the inner join first, we are guaranteed to get the correct result. The steps are as follows:

  1. The row with id 1 in the left table has a match in the right table, so the two rows can be “joined” together

  2. The row with id 2 in the left table also has a match in the right table, so these two rows can be “joined” together as well

At this point, the join’s temporary result table (temporary because it has not been filtered) looks like this:

The WHERE condition is then applied to the temporary table to filter it, which gives the final result:

Now let’s push the predicates down first. Each table is filtered with its own condition, and the filtering results are as follows:

Then inner join processing is performed on the two filtered tables, and the results are as follows:

It can be seen that this is consistent with the result obtained by join first and then filtering.
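The two evaluation orders can be compared in a small pure-Python sketch. The table contents and the concrete WHERE condition (LT.id > 1 AND RT.id > 1) are assumptions, since the original tables and query are not reproduced in the text; inner_join is a hypothetical helper, not a SparkSql function.

```python
# Hypothetical two-row tables, assumed for illustration only.
left_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep every pair of rows that satisfies `on`."""
    return [(l, r) for l in left for r in right if on(l, r)]


def same_id(l, r):
    return l["id"] == r["id"]


# Order 1: join first, then apply the assumed WHERE condition
# LT.id > 1 AND RT.id > 1 to the temporary result.
joined = inner_join(left_table, right_table, same_id)
join_then_filter = [(l, r) for l, r in joined if l["id"] > 1 and r["id"] > 1]

# Order 2: push each side of the AND down to its own table, then join.
left_filtered = [l for l in left_table if l["id"] > 1]
right_filtered = [r for r in right_table if r["id"] > 1]
filter_then_join = inner_join(left_filtered, right_filtered, same_id)
```

Both orders yield the single joined row with id 2. With AND, a joined row must pass both single-table conditions anyway, so removing the failing rows before the join loses nothing.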

4.2. Post-join conditions connected by OR

Let’s look at another query statement:

Let’s start with the join; the temporary table looks like this:

Then use the WHERE condition to filter, and the final query result is as follows:

If we instead push the WHERE condition down and filter each table with its own condition first, the two tables are filtered as follows:

Then an inner join is performed on the two filtered tables, and the result is as follows:

Is something wrong with this table? It has only field names and no field values. Yes, you read that correctly: there are no rows. After filtering, the left table only has the row with id 1 and the right table only has the row with id 2. These two rows cannot be joined, so the result is empty.

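Compare this with the result obtained without pushdown. The WHERE condition connects a filter on LT.value and a filter on RT.value with OR, so a joined row is kept as long as either side matches. If each table is filtered with its own condition first, a row that fails its own condition is discarded even though its join partner would have satisfied the other side of the OR. So in this case the predicate cannot be pushed down.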
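The mismatch can be reproduced in a short pure-Python sketch. The table contents and the concrete WHERE condition (LT.value = 'one' OR RT.value = 'two') are assumptions chosen to match the description above, where the left filter keeps only id 1 and the right filter keeps only id 2; inner_join is a hypothetical helper.

```python
# Hypothetical two-row tables, assumed for illustration only.
left_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep every pair of rows that satisfies `on`."""
    return [(l, r) for l in left for r in right if on(l, r)]


def same_id(l, r):
    return l["id"] == r["id"]


# Correct baseline: join first, then apply the assumed WHERE condition
# LT.value = 'one' OR RT.value = 'two'.
joined = inner_join(left_table, right_table, same_id)
join_then_filter = [
    (l, r) for l, r in joined if l["value"] == "one" or r["value"] == "two"
]

# Naive pushdown: filter each table with "its" side of the OR, then join.
left_filtered = [l for l in left_table if l["value"] == "one"]    # only id 1
right_filtered = [r for r in right_table if r["value"] == "two"]  # only id 2
filter_then_join = inner_join(left_filtered, right_filtered, same_id)
```

Joining first keeps both joined rows, because each satisfies one side of the OR. The naive pushdown returns nothing, because the surviving rows have different ids.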

However, there are two exceptions when the conditions on the two tables are connected by OR. The first exception, analyzed here, is when the filter field happens to be the join field, as in the following query:

In this query, the post-join conditions on the two tables are still connected by OR. The difference is that the join condition is no longer equality of id but equality of the value field; in other words, the filter field is exactly the join field. You can use the step-by-step method above to work out the query result with and without predicate pushdown, and the results are the same.

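Let’s see whether the problem that prevented pushdown above can occur in this query. For the left table, suppose “LT.value = 'two'” is used to filter out the rows that do not match. Because the join field is also value, a left-table row whose value is not 'two' cannot be joined with a right-table row whose value is 'two'; otherwise “LT.value = RT.value” would not be satisfied. In effect, the condition is passed from one table to the other: through the join condition, the two tables have already been logically merged into one table in advance.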
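A small pure-Python check of this exception, under stated assumptions: the table contents are hypothetical, the join condition is LT.value = RT.value as described, and the WHERE condition is assumed to be LT.value = 'two' OR RT.value = 'two'. The helper inner_join is illustrative, not a SparkSql function.

```python
# Hypothetical two-row tables, assumed for illustration only.
left_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep every pair of rows that satisfies `on`."""
    return [(l, r) for l in left for r in right if on(l, r)]


def same_value(l, r):
    # Join condition on the same field that the filter uses.
    return l["value"] == r["value"]


# Join first, then apply LT.value = 'two' OR RT.value = 'two'.
joined = inner_join(left_table, right_table, same_value)
join_then_filter = [
    (l, r) for l, r in joined if l["value"] == "two" or r["value"] == "two"
]

# Push each side of the OR down to its own table, then join.
left_filtered = [l for l in left_table if l["value"] == "two"]
right_filtered = [r for r in right_table if r["value"] == "two"]
filter_then_join = inner_join(left_filtered, right_filtered, same_value)
```

Every joined row has LT.value equal to RT.value, so the two sides of the OR are either both true or both false for that row. The OR therefore behaves like an AND on joined rows, and pushing it down is safe in this sketch.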

The second exception, which involves an optimization in SparkSql, needs to be covered separately.

4.3. Partitioned tables with filter conditions connected by OR

What happens if both tables are partitioned tables? Let’s start with the following query:

The left and right tables are no longer ordinary tables but partitioned tables. The partition field is pt, and the data is partitioned by date. The query conditions on the two tables are still connected by OR. Just imagine: if the two tables cannot be filtered in advance, a huge amount of data has to be joined first, which is very expensive. But according to our analysis in 4.2, when the filter conditions of two tables are connected by OR, the predicate cannot be pushed down arbitrarily. What can be done? SparkSql uses an optimization called “partition pruning”, which does not treat the partition field as an ordinary filter condition. Instead, it takes a “one-size-fits-all” approach and removes the directories that do not match the queried partitions from the set of directories to be scanned.

A partitioned table stores the data of each partition in its own directory on HDFS. Therefore, during partition pruning, the HDFS directories to be scanned are passed directly to Spark's Scan operator, so that Spark can skip the data in all other partitions when scanning. For this optimization to work, however, SparkSql's semantic analysis must correctly work out what the SQL statement intends to express. For that reason, partition fields are kept separate from ordinary fields in SparkSql's metadata and are marked specially, which makes it easier for the semantic analysis logic to handle this special case of WHERE conditions.
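The directory-based idea can be sketched for a single table in pure Python. This is a simplified illustration and not how SparkSql is implemented: the directory paths, the date values, and the helpers prune and scan are all hypothetical; only the one-directory-per-partition layout and the pt field come from the text.

```python
# Hypothetical layout: one directory per value of the partition field `pt`.
partitions = {
    "/warehouse/lefttable/pt=20200101": [{"id": 1, "value": "one"}],
    "/warehouse/lefttable/pt=20200102": [{"id": 2, "value": "two"}],
    "/warehouse/lefttable/pt=20200103": [{"id": 3, "value": "three"}],
}


def prune(paths, wanted_pt):
    """Keep only the directories whose pt value matches the query."""
    return [p for p in paths if p.rsplit("pt=", 1)[1] == wanted_pt]


def scan(paths):
    """Read rows only from the directories handed to the scan."""
    return [row for p in paths for row in partitions[p]]


# Pruning happens on directory names, before any row is read.
to_scan = prune(partitions.keys(), "20200102")
rows = scan(to_scan)
```

Only one of the three directories is handed to scan, so the rows in the other two partitions are never read. Pruning works on metadata (the directory names) rather than on row values, which is what distinguishes it from an ordinary filter.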

Note: To reprint this article, please contact our WeChat account: LABs2020.