Spark SQL is another outstanding module in the Spark stack. By introducing SQL support, it greatly lowers the cost of entry for developers and learners, allowing them to do big data development directly in SQL. Today the higher-level Spark modules, such as Spark SQL, SparkML, GraphX and Structured Streaming, are all built on top of the Catalyst optimizer and the Tungsten execution engine. The overall architecture of Spark is shown in the figure below.
1. Architecture design and implementation of sparkSQL
A Spark SQL query is first parsed by the SQL Parser, then processed by the Catalyst optimizer, and finally executed by Spark. The Catalyst pipeline consists of several phases, including:
- Analysis: uses the Catalog information to resolve the Unresolved Logical Plan into an Analyzed Logical Plan;
- Logical Optimization: applies rule-based optimizations to the Analyzed Logical Plan to produce an Optimized Logical Plan;
- Physical Planning: the logical plan from the previous step still cannot be executed by Spark, so it is converted into one or more physical plans, and a cost model is used to select the best one;
- Code Generation: generates Java bytecode from the SQL query.
For example, execute the following SQL statement:
```sql
SELECT temp1.class, SUM(temp1.degree), AVG(temp1.degree)
FROM (
  SELECT students.sno AS ssno, students.sname, students.ssex, students.sbirthday, students.class,
         scores.sno, scores.degree, scores.cno
  FROM students
  LEFT JOIN scores ON students.sno = scores.sno
) temp1
GROUP BY temp1.class
```
The code implementation process is as follows:
```scala
package com.kkb.sparksql

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo: read MySQL table data through Spark SQL and inspect the execution plan
object DataFromMysqlPlan {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")
    // sparkConf.set("spark.sql.codegen.wholeStage", "true")

    // 2. Create a SparkSession object
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // 3. Read the MySQL table data
    // 3.1 MySQL connection url
    val url = "jdbc:mysql://localhost:3306/mydb"
    // 3.2 Table names
    val student = "students"
    val score = "scores"
    // 3.3 Connection properties
    val properties = new Properties()
    properties.setProperty("user", "root")       // username
    properties.setProperty("password", "123456") // password

    val studentFrame: DataFrame = spark.read.jdbc(url, student, properties)
    val scoreFrame: DataFrame = spark.read.jdbc(url, score, properties)

    // Register the DataFrames as temporary views
    studentFrame.createTempView("students")
    scoreFrame.createOrReplaceTempView("scores")

    // spark.sql("SELECT temp1.class, SUM(temp1.degree), AVG(temp1.degree) FROM (SELECT students.sno AS ssno, students.sname, students.ssex, students.sbirthday, students.class, scores.sno, scores.degree, scores.cno FROM students LEFT JOIN scores ON students.sno = scores.sno) temp1 GROUP BY temp1.class").show()

    val resultFrame: DataFrame = spark.sql(
      "SELECT temp1.class, SUM(temp1.degree), AVG(temp1.degree) " +
        "FROM (SELECT students.sno AS ssno, students.sname, students.ssex, students.sbirthday, students.class, " +
        "scores.sno, scores.degree, scores.cno " +
        "FROM students LEFT JOIN scores ON students.sno = scores.sno " +
        "WHERE degree > 60 AND sbirthday > '1973-01-01 00:00:00') temp1 " +
        "GROUP BY temp1.class")
    resultFrame.explain(true)
    resultFrame.show()
    spark.stop()
  }
}
```
Calling explain(true) to view the SQL execution plan produces the following output:
```
== Parsed Logical Plan ==
'Aggregate ['temp1.class], ['temp1.class, unresolvedalias('SUM('temp1.degree), None), unresolvedalias('AVG('temp1.degree), None)]
+- 'SubqueryAlias temp1
   +- 'Project ['students.sno AS ssno#16, 'students.sname, 'students.ssex, 'students.sbirthday, 'students.class, 'scores.sno, 'scores.degree, 'scores.cno]
      +- 'Filter (('degree > 60) && ('sbirthday > 1973-01-01 00:00:00))
         +- 'Join LeftOuter, ('students.sno = 'scores.sno)
            :- 'UnresolvedRelation `students`
            +- 'UnresolvedRelation `scores`

== Analyzed Logical Plan ==
class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)
Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]
+- SubqueryAlias temp1
   +- Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11]
      +- Filter ((cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00))
         +- Join LeftOuter, (sno#0 = sno#10)
            :- SubqueryAlias students
            :  +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
            +- SubqueryAlias scores
               +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, cast(avg(UnscaledValue(degree#12)) as decimal(14,5)) AS avg(degree)#28]
+- Project [class#4, degree#12]
   +- Join Inner, (sno#0 = sno#10)
      :- Project [sno#0, class#4]
      :  +- Filter ((isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)) && isnotnull(sno#0))
      :     +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
      +- Project [sno#10, degree#12]
         +- Filter ((isnotnull(degree#12) && (degree#12 > 60.0)) && isnotnull(sno#10))
            +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]

== Physical Plan ==
*(6) HashAggregate(keys=[class#4], functions=[sum(degree#12), avg(UnscaledValue(degree#12))], output=[class#4, sum(degree)#27, avg(degree)#28])
+- Exchange hashpartitioning(class#4, 200)
   +- *(5) HashAggregate(keys=[class#4], functions=[partial_sum(degree#12), partial_avg(UnscaledValue(degree#12))], output=[class#4, sum#32, sum#33, count#34L])
      +- *(5) Project [class#4, degree#12]
         +- *(5) SortMergeJoin [sno#0], [sno#10], Inner
            :- *(2) Sort [sno#0 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(sno#0, 200)
            :     +- *(1) Project [sno#0, class#4]
            :        +- *(1) Filter (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)
            :           +- *(1) Scan JDBCRelation(students) [numPartitions=1] [sno#0,class#4,sbirthday#3] PushedFilters: [*IsNotNull(sbirthday), *IsNotNull(sno)], ReadSchema: struct<sno:string,class:string,sbirthday:timestamp>
            +- *(4) Sort [sno#10 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(sno#10, 200)
                  +- *(3) Scan JDBCRelation(scores) [numPartitions=1] [sno#10,degree#12] PushedFilters: [*IsNotNull(degree), *GreaterThan(degree,60.0), *IsNotNull(sno)], ReadSchema: struct<sno:string,degree:decimal(10,1)>
```
2. Catalyst execution process
As the query plan above shows, the SQL statement we wrote is transformed several times and finally compiled into bytecode for execution. The whole process goes through several important steps:
- SQL parsing phase: Parser
- Generating (binding) the logical plan: Analyzer
- Optimizing the logical plan: Optimizer
- Generating the physical query plan: Planner
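Each of these intermediate plans can be inspected directly from a DataFrame through its QueryExecution object; a minimal sketch, reusing the resultFrame from the example program above:

```scala
// QueryExecution exposes every stage of the Catalyst pipeline for a query.
val qe = resultFrame.queryExecution
qe.logical       // parsed (unresolved) logical plan
qe.analyzed      // analyzed logical plan
qe.optimizedPlan // optimized logical plan
qe.sparkPlan     // selected physical plan
qe.executedPlan  // physical plan after the preparation rules (e.g. whole-stage codegen)

// explain(true) prints all of the above at once, as shown earlier.
resultFrame.explain(true)
```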
1. The SQL parsing phase: Parser
In Spark 2.x, ==ANTLR== was introduced to parse Spark SQL statements. ANTLR is a powerful parser generator that can read, process, execute and translate structured text or binary files. It is the most widely used parser generator in the Java world and is used for SQL parsing by many big data projects, including Hive, Cassandra, Phoenix, Pig and Presto. Recent versions of Spark use ==ANTLR4== to parse SQL and build the syntax tree.
You can view the Source code of Spark on Github as follows:
Github.com/apache/spar…
This link points at the grammar file that defines all of the SQL syntax Spark SQL supports. If we need to extend the Spark SQL grammar, we only have to add the relevant rules there and then use ANTLR4 to process SqlBase.g4 and regenerate the corresponding Java classes, including the important lexer SqlBaseLexer.java and parser SqlBaseParser.java. When we run the example above, the first step is to tokenize the keywords and the various identifiers with SqlBaseLexer, after which SqlBaseParser builds the syntax tree.
After lexing and parsing, the syntax tree is generated. AstBuilder then converts the syntax tree into a LogicalPlan, which at this point is called the Unresolved LogicalPlan. The logical plan after parsing is as follows:
```
== Parsed Logical Plan ==
'Aggregate ['temp1.class], ['temp1.class, unresolvedalias('SUM('temp1.degree), None), unresolvedalias('AVG('temp1.degree), None)]
+- 'SubqueryAlias temp1
   +- 'Project ['students.sno AS ssno#16, 'students.sname, 'students.ssex, 'students.sbirthday, 'students.class, 'scores.sno, 'scores.degree, 'scores.cno]
      +- 'Filter (('degree > 60) && ('sbirthday > 1973-01-01 00:00:00))
         +- 'Join LeftOuter, ('students.sno = 'scores.sno)
            :- 'UnresolvedRelation `students`
            +- 'UnresolvedRelation `scores`
```
As can be seen from the plan above, the two joined tables appear as UnresolvedRelation nodes, while the selected columns and the aggregated fields are already in place. The first stage of SQL parsing is complete, and we are ready to enter the second stage.
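You can also call this parser directly from a SparkSession and look at the Unresolved LogicalPlan it produces; a minimal sketch for spark-shell (sessionState is marked unstable, so treat this as exploration code only):

```scala
// sessionState.sqlParser is the ANTLR-based SQL parser.
// parsePlan only parses -- nothing is resolved against the catalog yet.
val parsed = spark.sessionState.sqlParser.parsePlan(
  "SELECT class, SUM(degree) FROM scores GROUP BY class")
println(parsed)
// prints something like:
// 'Aggregate ['class], ['class, unresolvedalias('SUM('degree), None)]
// +- 'UnresolvedRelation `scores`
```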
2. Binding the logical plan: Analyzer
The parse stage produces many unresolved nodes such as UnresolvedAlias and UnresolvedRelation, which together make up the Unresolved LogicalPlan. That plan is only a data structure and carries no data information, such as the data source, the data types, or which table each column comes from. The Analyzer stage transforms the Unresolved LogicalPlan using a set of predefined Rules together with the SessionCatalog. The SessionCatalog provides unified management of ==function resource information and metadata information== (databases, tables, views, partitions, functions, and so on). The Rules are defined in the Analyzer; the full class path is:
org.apache.spark.sql.catalyst.analysis.Analyzer

The specific rules are defined as follows:

```scala
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf)),
  Batch("Resolution", fixedPoint,
    ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
  Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  Batch("View", Once,
    AliasViewChild(conf)),
  Batch("Nondeterministic", Once,
    PullOutNondeterministic),
  Batch("UDF", Once,
    HandleNullInputsForUDF),
  Batch("FixNullability", Once,
    FixNullability),
  Batch("Subquery", Once,
    UpdateOuterReferences),
  Batch("Cleanup", fixedPoint,
    CleanupAliases)
)
```
As the code above shows, several rules of a similar nature are grouped into a Batch; for example, the Batch named Hints is composed of the hint-related rules. The Batches in turn form a batches sequence, which is executed by the RuleExecutor: Batch by Batch, and Rule by Rule within each Batch. Each Batch runs either Once or repeatedly until a fixed point is reached (FixedPoint, bounded by the spark.sql.optimizer.maxIterations parameter). The execution process is illustrated in the figure below.
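To make the Rule abstraction concrete, here is a minimal sketch of a custom Rule[LogicalPlan] written against the Catalyst API; the rule name and the simplification it performs are purely illustrative and not one of Spark's built-in rules:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative rule: remove Filter nodes whose condition is the literal `true`.
object RemoveAlwaysTrueFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}
```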
The Analyzed Logical Plan generated from our SQL after this phase looks like this:
```
== Analyzed Logical Plan ==
class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)
Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]
+- SubqueryAlias temp1
   +- Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11]
      +- Filter ((cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00))
         +- Join LeftOuter, (sno#0 = sno#10)
            :- SubqueryAlias students
            :  +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
            +- SubqueryAlias scores
               +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]
```
From the plan above, the students and scores tables have been resolved into concrete attributes: sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11. The aggregation is also resolved, Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28], with the output schema class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5), and we now also know that the data sources are the JDBCRelation(students) and JDBCRelation(scores) tables. In summary, the Analyzed Logical Plan mainly determines the following:
1. Determine the final output field names and return types:
   - class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)
2. Determine the aggregation:
   - Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]
3. Determine which fields are queried from the tables:
   - Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11]
4. Determine the filter conditions:
   - Filter ((cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00))
5. Determine the join type:
   - Join LeftOuter, (sno#0 = sno#10)
6. Determine the data source and the number of partitions for each table:
   - JDBCRelation(students) [numPartitions=1]
   - JDBCRelation(scores) [numPartitions=1]
So far the Analyzed Logical Plan is complete. The comparison between the Unresolved Logical Plan and the Analyzed Logical Plan is shown in the figure below; at this point the Analyzed LogicalPlan is fully generated.
3. Logical plan optimization stage: Optimizer
In the previous stage, the Unresolved LogicalPlan was transformed into the Analyzed LogicalPlan. The Analyzed Logical Plan could in principle be converted directly into a Physical Plan and executed by Spark, but the result would often be far from optimal, because in practice many inefficient ways of writing SQL lead to poor execution efficiency. The Analyzed Logical Plan therefore needs further processing to obtain a better logical operator tree, and that is exactly what the Optimizer for the SQL logical operator tree was created for.
The optimizer at this stage is mostly rule-based, and most of the rules are heuristic, that is, based on intuition or experience. Examples include ==column pruning== (filtering out the columns the query does not need), ==predicate pushdown== (pushing filters as close to the data source as possible), constant folding (for example, 1 + 2 is computed in advance), and constant replacement (for example, SELECT * FROM table WHERE i = 5 AND j = i + 3 is rewritten as SELECT * FROM table WHERE i = 5 AND j = 8), and so on.
Similar to the logical plan binding phase described earlier, every rule in this phase also implements the Rule abstract class, multiple Rules form a Batch, and the Batches form a batches sequence; these rules are likewise executed by the RuleExecutor. The rules applied to our example are described in turn below.
Predicate pushdown

Predicate pushdown in Spark SQL is implemented by PushDownPredicate. This rule pushes filter conditions as far down the plan as possible, ideally all the way to the data source. For the SQL described above, the logical plan after predicate pushdown is as follows:
As you can see, predicate pushdown places the Filter operators directly below the Join: the students table is scanned with the sbirthday condition applied, and the scores table is scanned with the isnotnull(degree#12) && (degree#12 > 60.0) condition applied, so only qualifying rows are kept. This greatly reduces the amount of data the Join operator has to process and speeds up the computation.
Column pruning

Column pruning in Spark SQL is implemented by ColumnPruning. The tables we query may have many fields, but a given query usually does not need all of them, so column pruning filters out the fields the query does not use and reduces the amount of data scanned. For the SQL introduced above, column pruning produces the following logical plan:

As you can see, only the sno and class columns are needed from the students table, and only the sno and degree columns from the scores table. This reduces data transfer and, if the underlying file format is columnar (such as Parquet), can greatly speed up the scan.
Constant replacement

Constant replacement in Spark SQL is implemented by ConstantPropagation. It substitutes a variable that is known to equal a constant with that constant: SELECT * FROM table WHERE i = 5 AND j = i + 3 can be converted into SELECT * FROM table WHERE i = 5 AND j = 8. This may look trivial, but when the number of rows scanned is very large it saves real computation. After this optimization the resulting logical plan is as follows; a condition such as t1.cid = 1 AND t1.did = t1.cid + 1, for example, becomes t1.cid = 1 AND t1.did = 2.
Constant folding

Constant folding in Spark SQL is implemented by ConstantFolding. It is similar to constant replacement: constant expressions are evaluated in advance. This looks like a tiny change, but with very large data volumes it avoids a great deal of repeated computation and reduces CPU and other resource usage. After this optimization, the resulting logical plan is as follows:
Therefore, after the optimization of the above four steps, the optimized logical plan is as follows:
```
== Optimized Logical Plan ==
Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, cast(avg(UnscaledValue(degree#12)) as decimal(14,5)) AS avg(degree)#28]
+- Project [class#4, degree#12]
   +- Join Inner, (sno#0 = sno#10)
      :- Project [sno#0, class#4]
      :  +- Filter ((isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)) && isnotnull(sno#0))
      :     +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
      +- Project [sno#10, degree#12]
         +- Filter ((isnotnull(degree#12) && (degree#12 > 60.0)) && isnotnull(sno#10))
            +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]
```
Github.com/apache/spar…
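A quick way to watch these optimizer rules fire is to compare the plans exposed by queryExecution; a minimal sketch in spark-shell, reusing the scores temporary view registered by the example program above:

```scala
// ConstantFolding evaluates 30 + 30 during optimization, and the filter is then
// pushed to the JDBC scan as a pushed filter.
val q = spark.sql("SELECT sno, degree FROM scores WHERE degree > 30 + 30")

q.queryExecution.analyzed      // the filter still contains the unfolded expression (30 + 30)
q.queryExecution.optimizedPlan // the constant has been folded to 60
q.explain()                    // the physical plan shows PushedFilters on the JDBC scan
```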
4. Generate an executable Physical Plan
After the previous steps, that is, the Parser, Analyzer and Optimizer stages, we have an optimized logical plan, but it still cannot be executed. For the SQL to actually run, it must be translated into a physical plan; only then does Spark know how to execute it. Unlike logical plan binding and optimization, this stage applies Strategies rather than Rules. In the earlier stages, a Transformation never changed the type of the tree: an Expression stayed an Expression after a Transformation, and a Logical Plan stayed a Logical Plan. In this stage, however, a Transformation changes the tree type from Logical Plan to Physical Plan.
After a series of Strategies is applied, one Logical Plan can yield multiple Physical Plans; in Spark a Physical Plan is represented by SparkPlan. The Cost Model then picks the Selected Physical Plan. The whole process is shown below:
The Cost Model corresponds to cost-based optimization (CBO), contributed mainly by engineers from Huawei (see SPARK-16026). Its core idea is to compute the cost of every physical plan and pick the cheapest one. However, this selection is not implemented in Spark 2.4.3: the first plan in the list of physical plans is simply returned as the optimal one, as shown below.
```scala
lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}
```
The CBO optimization introduced by SPARK-16026 mainly takes effect in the Optimizer phase described above; the corresponding rule is CostBasedJoinReorder and it is disabled by default. To turn it on you need to enable the spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled parameters. At the end of this phase we obtain the following physical plan:
```
== Physical Plan ==
*(6) HashAggregate(keys=[class#4], functions=[sum(degree#12), avg(UnscaledValue(degree#12))], output=[class#4, sum(degree)#27, avg(degree)#28])
+- Exchange hashpartitioning(class#4, 200)
   +- *(5) HashAggregate(keys=[class#4], functions=[partial_sum(degree#12), partial_avg(UnscaledValue(degree#12))], output=[class#4, sum#32, sum#33, count#34L])
      +- *(5) Project [class#4, degree#12]
         +- *(5) SortMergeJoin [sno#0], [sno#10], Inner
            :- *(2) Sort [sno#0 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(sno#0, 200)
            :     +- *(1) Project [sno#0, class#4]
            :        +- *(1) Filter (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)
            :           +- *(1) Scan JDBCRelation(students) [numPartitions=1] [sno#0,class#4,sbirthday#3] PushedFilters: [*IsNotNull(sbirthday), *IsNotNull(sno)], ReadSchema: struct<sno:string,class:string,sbirthday:timestamp>
            +- *(4) Sort [sno#10 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(sno#10, 200)
                  +- *(3) Scan JDBCRelation(scores) [numPartitions=1] [sno#10,degree#12] PushedFilters: [*IsNotNull(degree), *GreaterThan(degree,60.0), *IsNotNull(sno)], ReadSchema: struct<sno:string,degree:decimal(10,1)>
```
As the result above shows, by the physical planning phase Spark already knows that the data source is read over JDBC, along with the table being scanned and the data types, and the filters are attached to the scan as PushedFilters so they are applied while the data is being read. At the same time, the join has become a SortMergeJoin (and, as the optimizer already determined, an inner join).
At this point, the Physical Plan is fully generated
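If you want the cost-based rules to take part in planning, the CBO flags must be enabled and statistics must be collected beforehand; a minimal sketch of the configuration, assuming students is a Hive metastore table rather than the temporary view used earlier:

```scala
// Both CBO flags are off by default.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// CBO relies on previously gathered statistics (Hive metastore tables only).
spark.sql("ANALYZE TABLE students COMPUTE STATISTICS")
```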
5. Code generation phase
After the Parser, Analyzer, Optimizer and physical planning stages, we finally have a physical execution plan that describes, at the code level, how the query will run, including where the corresponding data sources are. But having the physical plan is not enough: complete code must be generated before it can be executed, and at the bottom everything is processed as Spark RDDs. Before execution, Spark applies one more set of rules to the generated physical plan in a step called prepareForExecution. These rules are defined in a method of the org.apache.spark.sql.execution.QueryExecution class:
```scala
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,                              // extract Python UDFs
  PlanSubqueries(sparkSession),                          // plan subqueries physically
  EnsureRequirements(sparkSession.sessionState.conf),    // ensure the plan's partitioning and ordering requirements are met
  CollapseCodegenStages(sparkSession.sessionState.conf), // collapse stages for whole-stage code generation
  ReuseExchange(sparkSession.sessionState.conf),         // reuse exchange nodes
  ReuseSubquery(sparkSession.sessionState.conf)          // reuse subqueries
)
```
CollapseCodegenStages is the key rule here: it is the entry point for whole-stage code generation. Of course, for Spark to perform whole-stage code generation, spark.sql.codegen.wholeStage must be set to true (the default).
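To see the code that whole-stage codegen actually produces for a query, Spark ships a debug helper; a minimal sketch, reusing the resultFrame from the example program above:

```scala
import org.apache.spark.sql.execution.debug._

// Prints the generated Java code for each whole-stage codegen subtree of the plan.
resultFrame.debugCodegen()

// Roughly the same information is available through SQL:
spark.sql("EXPLAIN CODEGEN SELECT SUM(degree) FROM scores").show(false)
```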
Differences between generated code and SQL parsing engine
In Spark SQL the statement is ultimately turned into generated code, so what is executed at the bottom is code. Why go to the trouble of generating code to run our SQL statements instead of having a SQL interpretation engine execute them directly? Such engines certainly exist. Before Spark 2.0, Spark SQL was based on the Volcano Iterator Model (see the paper Volcano-An Extensible and Parallel Query Evaluation System), introduced by Goetz Graefe in 1993; most database systems today still process SQL on top of this model. Its implementation can be summarized as follows: the database engine first translates the SQL into a series of relational algebra operators or expressions, and these operators then process the input data row by row and produce the result. Each operator implements the same interface at the bottom, for example a next() method; the top-level operator's next() calls its child's next(), which calls its own child's next(), and so on down to the lowest-level next(). The process is shown in the figure below:
The advantage of the Volcano Iterator Model is that the abstraction is simple and easy to implement, and arbitrary combinations of operators can express complex queries. The drawbacks are just as obvious: the huge number of ==virtual function calls== disrupts the CPU's normal pipelined execution and ultimately hurts performance. Databricks' official blog compared the Volcano Iterator Model with handwritten code and found the handwritten code to be roughly ten times faster.
In short, compiling SQL down to code is faster than having a SQL engine interpret the statement directly, which is why Spark 2.0 ultimately adopted code generation for executing SQL.
Based on these findings, starting with Apache Spark 2.0 the community introduced Whole-stage Code Generation (see SPARK-12795) to emulate handwritten code and improve Spark SQL's execution efficiency. The idea comes from Thomas Neumann's 2011 paper Efficiently Compiling Efficient Query Plans for Modern Hardware, and the feature is part of the Tungsten project.
The Tungsten code generation work is divided into three parts:

- expression code generation (expression codegen)
- whole-stage code generation (whole-stage codegen)
- speeding up serialization and deserialization
Expression codeGen
Expression code generation was already available in Spark 1.x. Its base class is org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator, which has seven subclasses:
In the logical plan generated for our SQL, (isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)) is a basic expression. It is also a Predicate, so org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate is called to generate the code for this expression.
Whole-stage Code Generation
Whole-stage code generation fuses multiple pieces of processing logic into a single code module, and it also makes use of expression code generation. Unlike the expression code generation described above, which only targets individual expressions, this generates code for the whole SQL pipeline. The generated class extends org.apache.spark.sql.execution.BufferedRowIterator and implements its processNext() method, which is invoked from the doExecute method of org.apache.spark.sql.execution.WholeStageCodegenExec; the RDD in that method feeds the data into the generated code. In our SQL example the data source is JDBC: org.apache.spark.sql.execution.RowDataSourceScanExec reads the data and produces the inputRDD, this RDD invokes the generated code inside WholeStageCodegenExec.doExecute, and the generated code then applies our filters and other logic to produce the final result. Part of the doExecute method of WholeStageCodegenExec is shown below:
```scala
override def doExecute(): RDD[InternalRow] = {
  val (ctx, cleanedSource) = doCodeGen()
  // try to compile and fallback if it failed
  val (_, maxCodeSize) = try {
    CodeGenerator.compile(cleanedSource)
  } catch {
    case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
      // We should already saw the error message
      logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
      return child.execute()
  }

  // Check if compiled code has a too large function
  if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
    logInfo(s"Found too long generated codes and JIT optimization might not work: " +
      s"the bytecode size ($maxCodeSize) is above the limit " +
      s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
      s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
      s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
    child match {
      // The fallback solution of batch file source scan still uses WholeStageCodegenExec
      case f: FileSourceScanExec if f.supportsBatch => // do nothing
      case _ => return child.execute()
    }
  }

  val references = ctx.references.toArray
  val durationMs = longMetric("pipelineTime")

  val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
  assert(rdds.size <= 2, "Up to two input RDDs can be supported")
  if (rdds.length == 1) {
    rdds.head.mapPartitionsWithIndex { (index, iter) =>
      val (clazz, _) = CodeGenerator.compile(cleanedSource)
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
      buffer.init(index, Array(iter))
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          val v = buffer.hasNext
          if (!v) durationMs += buffer.durationMs()
          v
        }
        override def next: InternalRow = buffer.next()
      }
    }
  } else {
    // Right now, we support up to two input RDDs.
    rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
      Iterator((leftIter, rightIter))
      // a small hack to obtain the correct partition index
    }.mapPartitionsWithIndex { (index, zippedIter) =>
      val (leftIter, rightIter) = zippedIter.next()
      val (clazz, _) = CodeGenerator.compile(cleanedSource)
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
      buffer.init(index, Array(leftIter, rightIter))
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          val v = buffer.hasNext
          if (!v) durationMs += buffer.durationMs()
          v
        }
        override def next: InternalRow = buffer.next()
      }
    }
  }
}
```
The class-level comment of WholeStageCodegenExec also explains the call graph through which the code is finally generated:
```scala
/**
 * WholeStageCodegen compiles a subtree of plans that support codegen together into single Java
 * function.
 *
 * Here is the call graph of to generate Java source (plan A supports codegen, but plan B does not):
 *
 *   WholeStageCodegen       Plan A               FakeInput        Plan B
 * =========================================================================
 *
 * -> execute()
 *     |
 *  doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
 *     |
 *     +----------------->   produce()
 *                             |
 *                          doProduce()  -------> produce()
 *                                                   |
 *                                                doProduce()
 *                                                   |
 *                         doConsume() <--------- consume()
 *                             |
 *  doConsume()  <--------  consume()
 *
 * SparkPlan A should override `doProduce()` and `doConsume()`.
 *
 * `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input,
 * used to generated code for [[BoundReference]].
 */
```
Compared with the Volcano Iterator Model, the execution process of full-stage code generation is as follows:
By introducing whole-stage code generation, the number of virtual function calls is greatly reduced, which lowers CPU overhead and significantly improves SQL execution speed.
Compiling the generated code
Another problem that needs to be solved after code generation is how to compile and load the generated code into the same JVM. In earlier versions of Spark, code generation was implemented using Scala’s Reflection and Quasiquotes mechanism. Quasiquotes is a compact notation that makes it easy to manipulate the Scala syntax tree, as shown here. While Quasiquotes did a great job of solving code generation problems, the new problem is that it takes a long time to compile the code (around 50ms-500ms)! So the community had to turn expression code generation off by default.
To solve this problem, Spark introduced the Janino project (see SPARK-7956). Janino is a very small and very fast Java compiler. It can not only compile a set of source files into bytecode like javac, but also compile a single Java expression, a code block, the body of a class, or an in-memory source file, and load the compiled bytecode directly into the running JVM. Janino is not a development tool but an embedded compiler used at run time, for example as the translator of an expression evaluator or as a JSP-like server-side page engine. By compiling the generated code with Janino, the compile time for SQL expressions dropped to about 5ms. Spark uses Janino's ClassBodyEvaluator to compile the generated code; see org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.
One important point: code generation happens on the Driver side, while compilation of the generated code happens on the Executor side.
SQL execution
Finally, here’s where the SQL actually executes. At this point Spark will execute the code generated in the previous phase and get the final result. The DAG execution diagram is as follows:
3. SparkSQL execution process in-depth summary
As you can see from the above, Spark SQL execution mainly goes through these big steps:

1. Input SQL, a DataFrame, or a Dataset
2. The Catalyst process generates the optimal physical execution plan
   1. Parser phase
      ANTLR4 parses the SqlBase.g4 file, in which all of Spark's SQL grammar is defined, and generates the lexer SqlBaseLexer.java and the parser SqlBaseParser.java
      Parser phase ==> ANTLR4 parses SqlBase.g4 ==> produces the lexer SqlBaseLexer.java + the parser SqlBaseParser.java
   2. Analyzer phase
      Resolution is rule-based, using Rules together with the SessionCatalog, which manages function resource information and metadata information
      Analyzer phase ==> Rule + SessionCatalog ==> multiple Rules make up a Batch
      SessionCatalog ==> stores function resource information and metadata information
   3. Optimizer phase
      Rule-based optimization ==> predicate pushdown + column pruning + constant replacement + constant folding
   4. Planner phase
      Multiple physical plans are generated ==> the Cost Model performs the selection (cost-based optimization, CBO) ==> the optimal physical execution plan is chosen
   5. The final physical plan is selected and prepared for execution
      The selected optimal physical execution plan ==> ready for code generation and execution
3. Code is generated for the final physical execution plan and submitted to run as our actual tasks
SparkSQL tuning
1. Data caching
Performance tuning is mainly about keeping data in memory, which Spark SQL does by caching registered tables.

Cache a Spark table:

spark.catalog.cacheTable("tableName") caches the table tableName in memory

Release a cached table:

spark.catalog.uncacheTable("tableName") removes the table from the cache
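A minimal sketch, reusing the students temporary view registered earlier:

```scala
// Cache the "students" view in memory, run queries against it, then release it.
spark.catalog.cacheTable("students")
spark.sql("SELECT class, COUNT(*) FROM students GROUP BY class").show()
spark.catalog.uncacheTable("students")
```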
2. Parameters related to performance optimization
When caching, Spark SQL stores only the necessary columns and automatically tunes the compression to minimize memory usage and GC pressure. The relevant parameters are:

| Property | Default | Description |
| --- | --- | --- |
| spark.sql.inMemoryColumnarStorage.compressed | true | Spark SQL automatically selects a compression codec for each column based on statistics. |
| spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Batch size for the columnar cache. A larger batch improves memory utilization and compression, but also brings a risk of OOM (Out Of Memory) when caching data. |
| spark.sql.files.maxPartitionBytes | 128 MB | The maximum number of bytes packed into a single partition when reading files (manual tuning is not generally recommended, and later versions may adapt it automatically). |
| spark.sql.files.openCostInBytes | 4 MB | The estimated cost of opening a file, measured by the number of bytes that could be scanned in the same time. Used when packing multiple files into one partition. Overestimating is better: partitions with small files will then be scheduled first and processed faster than partitions with large files. |
3. Table data broadcast
When joining tables, broadcasting the small table can improve performance. The following parameters can be adjusted in Spark 2.x:

| Property | Default | Description |
| --- | --- | --- |
| spark.sql.broadcastTimeout | 300 | Broadcast wait timeout, in seconds. |
| spark.sql.autoBroadcastJoinThreshold | 10M | The maximum size, in bytes, of a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that statistics are currently only supported for Hive Metastore tables on which ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run. |
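A minimal sketch of both knobs, reusing the studentFrame and scoreFrame DataFrames from the example program; the explicit broadcast hint forces the small side to be broadcast regardless of the threshold:

```scala
import org.apache.spark.sql.functions.broadcast

// Raise the automatic broadcast threshold to 50 MB (set it to -1 to disable broadcasting).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

// Hint that scoreFrame should be broadcast in this join.
val joined = studentFrame.join(broadcast(scoreFrame), Seq("sno"), "left")
joined.explain() // the physical plan should now show a BroadcastHashJoin
```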
4. Control of partition number
Two parameters control Spark task parallelism:

| Property | Default | Description |
| --- | --- | --- |
| spark.sql.shuffle.partitions | 200 | The number of partitions used when shuffling data for joins or aggregations. |
| spark.default.parallelism | For distributed shuffle operations such as reduceByKey and join, the largest number of partitions among the parent RDDs; for operations with no parent RDD: in local mode the number of cores on the local machine, in Mesos fine-grained mode 8, otherwise the total number of cores on all executor nodes or 2, whichever is greater. | The default number of partitions for distributed shuffle operations on RDDs. |

They may look similar, but in practice:

- spark.default.parallelism only takes effect when processing RDDs; it has no effect on Spark SQL.
- spark.sql.shuffle.partitions is specific to Spark SQL, as the sketch below shows.
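A minimal sketch of setting both, assuming the same imports and SparkSession as the example program:

```scala
// spark.default.parallelism is a core Spark setting, so set it on the SparkConf
// before the SparkContext is created.
val conf = new SparkConf()
  .setAppName("DataFromMysql")
  .setMaster("local[2]")
  .set("spark.default.parallelism", "4")

// The Spark SQL shuffle partition count can be changed at runtime.
spark.conf.set("spark.sql.shuffle.partitions", "100")
```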
5. Files and partitions
There are two parameters that can be adjusted:
- how much data goes into a single partition when reading files;
- the cost of opening a file, commonly understood as the threshold for merging small files.

Opening a file has a cost, and Spark measures it in a clever way: by the number of bytes that could be scanned in the same amount of time.
The parameters are described as follows:
| Property | Default | Description |
| --- | --- | --- |
| spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
| spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost of opening a file, measured by the number of bytes that could be scanned in the same time. Used when packing multiple files into one partition. Overestimating is better: partitions with small files will then be processed faster (scheduled first) than partitions with large files. |

The value of spark.sql.files.maxPartitionBytes should be tuned according to the desired concurrency and the available memory.

Put bluntly, spark.sql.files.openCostInBytes is a small-file threshold: files smaller than this value are candidates for being merged into the same partition, as sketched below.
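A minimal sketch of tuning both at runtime; the values are illustrative, not recommendations:

```scala
// Pack at most 64 MB of input into each partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
// Estimated cost of opening a file, used when packing multiple small files into one partition.
spark.conf.set("spark.sql.files.openCostInBytes", 8L * 1024 * 1024)
```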
6. Data locality
The essence of distributed computing is to move the computation rather than the data, but in practice some data always has to move, unless a copy of the data is kept on every node of the cluster. Moving data from one node to another consumes network and disk I/O and reduces computing efficiency. To improve data locality, besides optimizing the algorithm (that is, tuning Spark memory, which is harder), you can maintain a suitable number of data replicas; the right replica count is an empirical value, obtained by adjusting the parameters and observing the behaviour over time.
For all the tasks in a stage, the Spark web UI reports the following time metrics:
- Scheduler Delay: the time Spark spends allocating the task
- Executor Computing Time: the time the executor spends executing the task
- Getting Result Time: the time spent fetching the task's result
- Result Serialization Time: the time spent serializing the task's result
- Task Deserialization Time: the time spent deserializing the task
- Shuffle Write Time: the time spent writing shuffle data
- Shuffle Read Time: the time spent reading shuffle data
The following is a diagram of the Spark webUI monitoring Stage:
- PROCESS_LOCAL: the data is read from the cache on the local node
- NODE_LOCAL: the data is read from disk on the local node
- ANY: the data is read from a non-local node
- PROCESS_LOCAL > NODE_LOCAL > ANY: try to have as much data as possible read at the PROCESS_LOCAL or NODE_LOCAL level. PROCESS_LOCAL is also related to caching; the scheduler knob that trades locality against waiting time is sketched below.
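One practical knob here is the locality wait time, which controls how long the scheduler waits for a more local slot before falling back to a less local one; a minimal sketch (spark.locality.wait is a core Spark setting, and the value here is illustrative):

```scala
import org.apache.spark.SparkConf

// Wait up to 6s for a PROCESS_LOCAL / NODE_LOCAL slot before degrading the
// locality level (the default is 3s).
val conf = new SparkConf()
  .setAppName("DataFromMysql")
  .setMaster("local[2]")
  .set("spark.locality.wait", "6s")
```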