Hive SQL compilation process

Hive is a data warehouse system built on Hadoop and is widely used across companies. Meituan’s data warehouse is also built on Hive: it runs nearly 10,000 Hive ETL computations every day and stores and analyzes hundreds of GB of data daily. Hive’s stability and performance are critical to our data analysis.

During several Hive upgrades we ran into problems large and small. By consulting the community and through our own efforts, we solved them and, along the way, gained a fairly deep understanding of how Hive compiles SQL into MapReduce. Understanding this process not only helped us fix some Hive bugs, it also helps us optimize Hive SQL, gives us better control over Hive, and lets us customize the functionality we need.

How MapReduce implements basic SQL operations

Before explaining how SQL is compiled into MapReduce, let’s look at how the MapReduce framework implements basic SQL operations.

Implementation principle of Join

select u.name, o.orderid from order o join user u on o.uid = u.uid;

In the Map output value, tag each row with the table it comes from, and in the Reduce phase use the tag to determine the data source. The MapReduce process is as follows (this is only the most basic Join implementation; there are others).
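The tagging idea can be sketched as a small in-memory simulation in Python. This is only an illustration under assumptions: the helper names (map_phase, shuffle, reduce_phase) and the sample rows are hypothetical, and a dictionary stands in for the framework's shuffle. It is not how Hive itself is implemented.

```python
from collections import defaultdict

def map_phase(orders, users):
    """Emit (join key, (tag, payload)); the tag records the source table."""
    for o in orders:
        yield o["uid"], ("order", o["orderid"])
    for u in users:
        yield u["uid"], ("user", u["name"])

def shuffle(pairs):
    """Group map output by key, standing in for the framework's shuffle."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Split each key's values by tag, then pair every user with every order."""
    for uid in sorted(groups):
        names = [v for tag, v in groups[uid] if tag == "user"]
        order_ids = [v for tag, v in groups[uid] if tag == "order"]
        for name in names:
            for orderid in order_ids:
                yield name, orderid

# Hypothetical sample data
orders = [{"uid": 1, "orderid": 101}, {"uid": 2, "orderid": 102}, {"uid": 1, "orderid": 103}]
users = [{"uid": 1, "name": "alice"}, {"uid": 2, "name": "bob"}, {"uid": 3, "name": "carol"}]
joined = list(reduce_phase(shuffle(map_phase(orders, users))))
```

A user with no orders (uid 3 here) produces no output row, which matches inner-join semantics.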

Implementation principle of Group By

select rank, isonline, count(*) from city group by rank, isonline;

The GroupBy fields are combined into the Map output key. MapReduce sorting then brings equal keys together, and the Reduce phase saves the LastKey to tell different keys apart. The MapReduce process is as follows (of course, this only describes the non-hash aggregation process on the Reduce side).
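A minimal Python sketch of the sort-based aggregation, assuming that Python's sorted() stands in for the MapReduce sort and that the sample rows are made up for illustration. The reducer keeps only the previous key (the LastKey) and a running count, never a hash table.

```python
def map_phase(rows):
    """Emit ((rank, isonline), 1): the GROUP BY fields form the map output key."""
    for row in rows:
        yield (row["rank"], row["isonline"]), 1

def reduce_phase(sorted_pairs):
    """Stream over key-sorted pairs, tracking only the previous key (LastKey)."""
    last_key, count = None, 0
    for key, value in sorted_pairs:
        if key != last_key:
            if last_key is not None:
                yield last_key + (count,)   # key changed: emit the finished group
            last_key, count = key, 0
        count += value
    if last_key is not None:
        yield last_key + (count,)           # emit the final group

# Hypothetical sample data
city = [
    {"rank": "A", "isonline": 1},
    {"rank": "B", "isonline": 0},
    {"rank": "A", "isonline": 1},
    {"rank": "A", "isonline": 0},
]
result = list(reduce_phase(sorted(map_phase(city))))
```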

Implementation principle of Distinct

select dealid, count(distinct uid) num from order group by dealid;

If there is only one Distinct field and Hash GroupBy in the Map phase is not considered, simply combine the GroupBy field and the Distinct field into the Map output key and rely on MapReduce sorting. The GroupBy field is used as the Reduce key, and the Reduce phase saves the LastKey to complete deduplication.
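As a rough Python sketch of the single-Distinct case: the composite key (dealid, uid) is sorted, and the reducer counts a uid only when the composite key changes. The function names and sample rows are assumptions made for illustration, and sorted() again stands in for the MapReduce sort.

```python
def map_phase(rows):
    """Emit the composite key (dealid, uid); no value is needed."""
    for row in rows:
        yield (row["dealid"], row["uid"])

def reduce_phase(sorted_keys):
    """Count a uid only when the composite key differs from the previous one."""
    last_key, last_deal, num = None, None, 0
    for key in sorted_keys:
        dealid = key[0]
        if dealid != last_deal:
            if last_deal is not None:
                yield last_deal, num        # group finished: emit its distinct count
            last_deal, num = dealid, 0
        if key != last_key:                 # new (dealid, uid) pair: a new distinct uid
            num += 1
        last_key = key
    if last_deal is not None:
        yield last_deal, num

# Hypothetical sample data
order_rows = [
    {"dealid": 1, "uid": 10},
    {"dealid": 1, "uid": 10},
    {"dealid": 1, "uid": 11},
    {"dealid": 2, "uid": 10},
]
result = list(reduce_phase(sorted(map_phase(order_rows))))
```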

What if there are multiple Distinct fields, as in the SQL below?

select dealid, count(distinct uid), count(distinct date) from order group by dealid;

There are two ways to do this:

  1. If you still follow the single-Distinct-field approach above (as shown in the following figure), the rows cannot be sorted by uid and by date separately, so LastKey cannot be used for deduplication and you still need hash-based deduplication in memory during the Reduce phase

  2. In the second implementation, all Distinct fields are numbered and each input row generates n rows of data, so that identical values of the same field are sorted together. The Reduce phase then only needs to record the LastKey to deduplicate

    This implementation makes good use of MapReduce sorting and saves memory during Reduce-side deduplication, but it increases the amount of shuffle data

    Note that when generating the Reduce value, only the row for the first Distinct field needs to keep its value; the value fields of the other Distinct rows can all be empty
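The second approach can be sketched in Python as follows. This is a simplified illustration under assumptions: the field numbering, helper names, and sample rows are hypothetical, and the per-deal counters are kept in a dictionary for brevity, whereas a streaming reducer would emit them whenever dealid changes.

```python
from collections import defaultdict

DISTINCT_FIELDS = ["uid", "date"]  # numbered 0 and 1

def map_phase(rows):
    """Emit one key per distinct field: (dealid, field number, field value)."""
    for row in rows:
        for number, field in enumerate(DISTINCT_FIELDS):
            yield (row["dealid"], number, row[field])

def reduce_phase(sorted_keys):
    """Count a value for its field whenever the key differs from LastKey."""
    counts = defaultdict(lambda: [0] * len(DISTINCT_FIELDS))
    last_key = None
    for key in sorted_keys:
        if key != last_key:
            dealid, number, _ = key
            counts[dealid][number] += 1
        last_key = key
    return {dealid: tuple(c) for dealid, c in counts.items()}

# Hypothetical sample data
order_rows = [
    {"dealid": 1, "uid": 10, "date": "2013-11-18"},
    {"dealid": 1, "uid": 10, "date": "2013-11-19"},
    {"dealid": 1, "uid": 11, "date": "2013-11-18"},
    {"dealid": 2, "uid": 10, "date": "2013-11-18"},
]
result = reduce_phase(sorted(map_phase(order_rows)))
```

Each input row becomes two map output rows here, which is the extra shuffle volume the text mentions.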

Process of converting SQL into MapReduce

Having seen how MapReduce implements basic SQL operations, let’s look at how Hive converts SQL into MapReduce jobs. The whole compilation process is divided into six phases:

  1. Antlr defines the SQL grammar rules, performs lexical and syntax parsing of the SQL, and converts the SQL into an abstract syntax tree (AST Tree)
  2. Traverse the AST Tree and abstract out the basic query unit, QueryBlock
  3. Traverse the QueryBlock and translate it into an execution operator tree, the OperatorTree
  4. The logical layer optimizer transforms the OperatorTree, merging unnecessary ReduceSinkOperators to reduce the amount of shuffle data
  5. Traverse the OperatorTree and translate it into MapReduce jobs
  6. The physical layer optimizer transforms the MapReduce jobs to generate the final execution plan

Phase1 SQL lexical and syntax parsing

Antlr

Hive uses Antlr to implement SQL lexical and syntax parsing. Antlr is a language recognition tool that can be used to construct domain-specific languages. Antlr is not covered in detail here; it is enough to know that constructing a specific language with Antlr only requires writing a grammar file that defines the lexical and syntactic replacement rules. Antlr then handles lexical analysis, syntax analysis, semantic analysis, and intermediate code generation.

Before version 0.10, Hive’s grammar rules were defined in a single file, Hive.g. As the grammar rules grew more complex, the Java parser class generated from them risked exceeding the maximum size of a Java class file, so Hive.g was split into the lexer rules HiveLexer.g and four parser rule files: SelectClauseParser.g, FromClauseParser.g, IdentifiersParser.g, and HiveParser.g.

Abstract syntax tree (AST Tree)

After lexical and syntax analysis, if expressions need further processing, Antlr’s abstract syntax tree grammar is used: the input statement is converted into an abstract syntax tree during syntax analysis, and the further processing is done later while traversing that tree.

The following excerpt is the grammar rule for SelectStatement in Hive SQL. SelectStatement contains clauses such as SELECT, FROM, WHERE, GROUP BY, HAVING, and ORDER BY. (In the grammar rule below, the arrow indicates a rewrite of the original statement; the rewritten tree includes special tokens that mark specific syntax. For example, TOK_QUERY marks a query block.)

selectStatement
   :
   selectClause
   fromClause
   whereClause?
   groupByClause?
   havingClause?
   orderByClause?
   clusterByClause?
   distributeByClause?
   sortByClause?
   limitClause?
   -> ^(TOK_QUERY fromClause ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
                     selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
                     distributeByClause? sortByClause? limitClause?))
   ;

Sample SQL

To illustrate in detail how SQL is translated into MapReduce, we use a simple SQL statement as an example. It contains a subquery and finally writes the data to a table

FROM
(
  SELECT
    p.datekey datekey,
    p.userid userid,
    c.clienttype
  FROM
    detail.usersequence_client c
    JOIN fact.orderpayment p ON p.orderid = c.orderid
    JOIN dim.user du ON du.userid = p.userid
  WHERE p.datekey = 20131118
) base
INSERT OVERWRITE TABLE `test`.`customer_kpi`
SELECT
  base.datekey,
  base.clienttype,
  count(distinct base.userid) buyer_count
GROUP BY base.datekey, base.clienttype

Generating the AST Tree from SQL

HiveLexerX and HiveParser are the lexer and parser classes that Antlr generates automatically when it compiles the grammar file Hive.g. The complex parsing is done in these two classes

HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command)); // lexical analysis, ignoring keyword case
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
  ctx.setTokenRewriteStream(tokens);
}
HiveParser parser = new HiveParser(tokens); // syntax analysis
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
  r = parser.statement(); // convert to AST Tree
} catch (RecognitionException e) {
  e.printStackTrace();
  throw new ParseException(parser.errors);
}

The resulting AST Tree is shown on the right side of the figure below (generated with Antlr Works, the editor Antlr provides for writing grammar files). Only a few skeleton nodes are expanded in the figure, not the full tree. Subqueries 1 and 2 correspond to parts 1 and 2 on the right, respectively

Note that the inner subquery also generates a TOK_DESTINATION node. Looking back at the grammar rule for SelectStatement above, this node is deliberately added by the grammar rewrite. The reason is that all query data in Hive is saved to temporary HDFS files, whether it comes from an intermediate subquery or is the final query result; the Insert statement eventually writes the data to the HDFS directory of the target table

In detail, expanding the FROM clause of the inner subquery yields the following AST Tree: each table generates a TOK_TABREF node, and each Join condition generates an “=” node. The other parts of the SQL are similar and are not covered in detail

Phase2 SQL basic unit QueryBlock

The AST Tree is still very complex and not structured enough to be translated directly into a MapReduce program. Converting the AST Tree into QueryBlocks abstracts and structures the SQL further

QueryBlock

A QueryBlock is the most basic unit of SQL and consists of three parts: input source, computation, and output. Put simply, a QueryBlock is a subquery

The following is a class diagram of QueryBlock objects in Hive.

  • QB#aliasToSubq (the aliasToSubq attribute of the QB class) holds the QB objects of subqueries; the key of aliasToSubq is the subquery’s alias
  • QB#qbp, a QBParseInfo, stores the AST Tree structure of each operation in a basic SQL unit. The QBParseInfo#nameToDest HashMap stores the outputs of the query unit: the key has the form inclause-i (multiple outputs are possible because Hive supports Multi Insert statements), and the value is the corresponding ASTNode, that is, the TOK_DESTINATION node. The remaining HashMap attributes of QBParseInfo store the mapping between each output and the ASTNode of each operation
  • QBParseInfo#joinExpr saves the TOK_JOIN node. QB#QBJoinTree is the structured form of the Join syntax tree
  • QB#qbm saves the meta information of each input table, such as the table’s path on HDFS and the file format of its data
  • The QBExpr object represents the Union operation

Generating QueryBlocks from the AST Tree

Generating QueryBlocks from the AST Tree is a recursive process: the AST Tree is traversed in preorder, and each kind of Token node encountered is saved to the corresponding attribute. The main steps are:

  • TOK_QUERY => create a QB object and recurse over the child nodes
  • TOK_FROM => save the table-name part of the syntax in attributes of the QB object such as aliasToTabs
  • TOK_INSERT => recurse over the child nodes
  • TOK_DESTINATION => save the syntax of the output target in the nameToDest attribute of the QBParseInfo object
  • TOK_SELECT => save the syntax of the query expressions in attributes of the QBParseInfo object such as destToAggregationExprs
  • TOK_WHERE => save the syntax of the Where part in the destToWhereExpr attribute of the QBParseInfo object

The sample SQL finally generates two QB objects, related as follows: QB1 is the outer query and QB2 is the subquery
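The recursive traversal can be sketched in Python on a heavily simplified AST. Everything in this sketch is an assumption made for illustration: nodes are plain tuples of (token, children...), the QB class is a stand-in that only borrows attribute names from the text, and TOK_SUBQUERY and the two-child TOK_TABREF layout are simplifications rather than Hive's actual node shapes.

```python
class QB:
    """Simplified stand-in for Hive's QB; attribute names follow the text."""
    def __init__(self):
        self.alias_to_tabs = {}   # alias -> table name
        self.alias_to_subq = {}   # alias -> nested QB
        self.name_to_dest = {}    # "inclause-i" -> destination node
        self.dest_to_where = {}   # "inclause-i" -> WHERE node

def build_qb(node, qb=None, dest=None):
    """Walk the AST in preorder and store each token in the matching attribute."""
    token, children = node[0], node[1:]
    if token == "TOK_QUERY":
        qb = QB()
        for child in children:
            build_qb(child, qb)
    elif token == "TOK_FROM":
        for child in children:
            build_qb(child, qb)
    elif token == "TOK_TABREF":
        table, alias = children
        qb.alias_to_tabs[alias] = table
    elif token == "TOK_SUBQUERY":
        subquery, alias = children
        qb.alias_to_subq[alias] = build_qb(subquery)   # recursion creates a new QB
    elif token == "TOK_INSERT":
        dest = "inclause-%d" % len(qb.name_to_dest)
        for child in children:
            build_qb(child, qb, dest)
    elif token == "TOK_DESTINATION":
        qb.name_to_dest[dest] = node
    elif token == "TOK_WHERE":
        qb.dest_to_where[dest] = node
    return qb

# Hypothetical, trimmed-down AST for the sample SQL
inner = ("TOK_QUERY",
         ("TOK_FROM", ("TOK_TABREF", "fact.orderpayment", "p")),
         ("TOK_INSERT",
          ("TOK_DESTINATION", "TOK_TMP_FILE"),
          ("TOK_WHERE", "p.datekey = 20131118")))
outer = ("TOK_QUERY",
         ("TOK_FROM", ("TOK_SUBQUERY", inner, "base")),
         ("TOK_INSERT", ("TOK_DESTINATION", "test.customer_kpi")))

qb1 = build_qb(outer)
qb2 = qb1.alias_to_subq["base"]
```

Here qb1 plays the role of the outer query and qb2 the subquery, mirroring the QB1/QB2 relationship described above.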

Phase3 Logical Operator

Operator

The MapReduce jobs that Hive finally generates consist of OperatorTrees in both the Map phase and the Reduce phase. A logical operator performs a single, specific operation in the Map phase or the Reduce phase.

The basic operators include TableScanOperator, SelectOperator, FilterOperator, JoinOperator, GroupByOperator, and ReduceSinkOperator

The function of each operator can largely be guessed from its name. TableScanOperator takes the raw table data from the Map interface of the MapReduce framework, controls the number of rows scanned, and marks the data as coming from the original table. JoinOperator performs the Join and FilterOperator performs filtering. ReduceSinkOperator serializes the combination of Map-side fields into the Reduce Key/Value and Partition Key. It can only appear in the Map phase, and it marks the end of the Map phase in the MapReduce program that Hive generates

Data transfer between Operators within the Map and Reduce phases is a streaming process: once an Operator has finished processing a row of data, it passes the row on to its child Operator for further computation

The main properties and methods of the Operator class are as follows:

  • RowSchema represents the output fields of the Operator
  • InputObjInspector and outputObjInspector parse the input and output fields
  • processOp receives the data passed in by the parent Operator, and forward passes the processed data on to the child Operators
  • After a row of Hive data has been processed by an Operator, its fields are renumbered. colExprMap records the mapping between field names before and after processing by the current Operator; it is used in the next phase, logical optimization, to trace field names back
  • Hive’s MapReduce program is dynamic: it is not fixed in advance which operation a MapReduce Job performs, which may be a Join or a GroupBy. Each Operator therefore stores all the parameters it needs at runtime in an OperatorDesc, which is serialized to HDFS before the job is submitted and read back from HDFS and deserialized before the MapReduce job executes. The Map-phase OperatorTree is located on HDFS at Job.getConf("hive.exec.plan") + "/map.xml"
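The row-at-a-time streaming between operators can be illustrated with a toy Python model. The class and method names below only echo the names in the text; they are assumptions for this sketch, not Hive's Java API, and CollectOperator is a hypothetical sink standing in for FileSinkOperator.

```python
class Operator:
    """Minimal stand-in for a Hive Operator: process one row, forward to children."""
    def __init__(self):
        self.children = []

    def add_child(self, op):
        self.children.append(op)
        return op               # return the child so calls can be chained

    def process_op(self, row):
        self.forward(row)       # default: pass the row through unchanged

    def forward(self, row):
        for child in self.children:
            child.process_op(row)

class FilterOperator(Operator):
    def __init__(self, predicate):
        super().__init__()
        self.predicate = predicate

    def process_op(self, row):
        if self.predicate(row):
            self.forward(row)

class SelectOperator(Operator):
    def __init__(self, columns):
        super().__init__()
        self.columns = columns

    def process_op(self, row):
        self.forward({c: row[c] for c in self.columns})

class CollectOperator(Operator):
    """Hypothetical sink that records rows, standing in for FileSinkOperator."""
    def __init__(self):
        super().__init__()
        self.rows = []

    def process_op(self, row):
        self.rows.append(row)

table_scan = Operator()  # plays the role of TableScanOperator
sink = (table_scan
        .add_child(FilterOperator(lambda r: r["datekey"] == 20131118))
        .add_child(SelectOperator(["userid"]))
        .add_child(CollectOperator()))

for row in [{"datekey": 20131118, "userid": 1}, {"datekey": 20131117, "userid": 2}]:
    table_scan.process_op(row)
```

Each operator does one thing and never buffers the whole input, which is the property the streaming description relies on.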

Generating the OperatorTree from QueryBlock

The OperatorTree is generated from the QueryBlock by traversing the syntax-holding attributes of the QB and QBParseInfo objects produced in the previous phase. The process includes the following steps:

  • QB#aliasToSubq => a subquery exists; recurse into it
  • QB#aliasToTabs => TableScanOperator
  • QBParseInfo#joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
  • QBParseInfo#destToWhereExpr => FilterOperator
  • QBParseInfo#destToGroupby => ReduceSinkOperator + GroupByOperator
  • QBParseInfo#destToOrderby => ReduceSinkOperator + ExtractOperator

Because Join, GroupBy, and OrderBy all have to be completed in the Reduce phase, a ReduceSinkOperator is generated before the operator for the corresponding operation; it combines the fields and serializes them into the Reduce Key/Value and Partition Key

Next, let’s walk through in detail how the sample SQL generates the OperatorTree

The QB objects generated in the previous phase are traversed in order:

  1. First, TableScanOperators are generated from the aliasToTabs of the child QueryBlock: QB2#aliasToTabs {du=dim.user, c=detail.usersequence_client, p=fact.orderpayment}

    TableScanOperator("dim.user") TS[0]
    TableScanOperator("detail.usersequence_client") TS[1]
    TableScanOperator("fact.orderpayment") TS[2]
  2. Traversing QBParseInfo#joinExpr generates the QBJoinTree. The QBJoinTree is also a tree structure; it stores the ASTNodes of the left and right tables and the alias of the query

  3. From the QBJoinTree, generate the Join operator tree for detail.usersequence_client and fact.orderpayment

    In the figure: TS = TableScanOperator, RS = ReduceSinkOperator, JOIN = JoinOperator

  4. From the QBJoinTree, generate the Join operator tree for the intermediate table and dim.user

  5. Generate a FilterOperator from QB2’s QBParseInfo#destToWhereExpr. At this point, the traversal of QB2 is complete. In the following figure, the SelectOperator decides, based on certain conditions, whether fields need to be parsed in some scenarios

    In the figure: FIL = FilterOperator, SEL = SelectOperator

  6. Generate ReduceSinkOperator + GroupByOperator from QB1’s QBParseInfo#destToGroupby

    In the figure: GBY = GroupByOperator. GBY[12] is a HASH aggregation, that is, the aggregation is performed with a hash table in memory

  7. After all parsing is complete, a FileSinkOperator is generated to write the data to HDFS

    In the figure: FS = FileSinkOperator

Phase4 logical layer optimizer

Most logical layer optimizers transform the OperatorTree and merge operators in order to reduce the number of MapReduce Jobs and the amount of shuffle data

Name                          Role
(2) SimpleFetchOptimizer      Optimizes aggregate queries that have no GroupBy expression
(2) MapJoinProcessor          MapJoin; requires a hint in the SQL; no longer used in version 0.11
(2) BucketMapJoinOptimizer    BucketMapJoin
(2) GroupByOptimizer          Map-side aggregation
(1) ReduceSinkDeDuplication   Merges reducers in a linear OperatorTree that share the same partition/sort key
(1) PredicatePushDown         Predicate pushdown
(1) CorrelationOptimizer      Exploits correlations in the query to merge related Jobs (HIVE-2206)
    ColumnPruner              Field pruning

The optimizers marked (1) in the table all make one Job do as much as possible or merge Jobs. Those marked (2) reduce the amount of shuffle data, or even avoid the Reduce phase altogether

The CorrelationOptimizer is very complex; it exploits correlations in a query to merge related Jobs. See Hive CorrelationOptimizer for details

Two optimizers apply to the sample SQL. Their roles are described below, followed by one additional optimizer, ReduceSinkDeDuplication

PredicatePushDown optimizer

The predicate pushdown optimizer moves the FilterOperator in the OperatorTree forward so that it comes directly after the TableScanOperator

NonBlockingOpDeDupProc optimizer

The NonBlockingOpDeDupProc optimizer merges SEL-SEL or FIL-FIL pairs into a single Operator

ReduceSinkDeDuplication optimizer

ReduceSinkDeDuplication can merge two RS operators that are linearly connected. In fact, CorrelationOptimizer is a superset of ReduceSinkDeDuplication and can merge RS operators in both linear and nonlinear operator trees, but Hive implemented ReduceSinkDeDuplication first

For example, the following SQL statement

from (select key, value from src group by key, value) s select s.key group by s.key;

After the preceding phases, the following OperatorTree is generated. The two trees are connected; they are just not drawn together here

Traversing the OperatorTree shows the Key and PartitionKey output by the first RS (the parent, pRS) and the second RS (the child, cRS) to be as follows

The ReduceSinkDeDuplication optimizer detects that: 1. the pRS Key completely contains the cRS Key, in the same order; 2. the pRS PartitionKey completely contains the cRS PartitionKey. Since the optimization conditions are met, the execution plan is optimized

ReduceSinkDeDuplication deletes childRS and the operators between parentRS and childRS. The Key of the retained RS consists of the key and value fields, and its PartitionKey is the key field. The merged OperatorTree is as follows:
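The merge condition can be sketched in Python. This is an interpretation rather than Hive's code: "contains in the same order" is read here as "the child Key is a leading prefix of the parent Key", the dictionary layout is hypothetical, and the pRS/cRS values are assumed from the example SQL (group by key, value followed by group by s.key) since the figure is not reproduced here.

```python
def is_prefix(child_cols, parent_cols):
    """True if child_cols equals the leading columns of parent_cols, in order."""
    return list(parent_cols[:len(child_cols)]) == list(child_cols)

def can_merge(parent_rs, child_rs):
    """Check both conditions from the text: sort Key and PartitionKey containment."""
    keys_ok = is_prefix(child_rs["key"], parent_rs["key"])
    partition_ok = set(child_rs["partition"]) <= set(parent_rs["partition"])
    return keys_ok and partition_ok

def merge(parent_rs, child_rs):
    """Keep the parent's sort Key and the child's PartitionKey."""
    return {"key": list(parent_rs["key"]), "partition": list(child_rs["partition"])}

# Assumed values for the example SQL
p_rs = {"key": ["key", "value"], "partition": ["key", "value"]}
c_rs = {"key": ["key"], "partition": ["key"]}

merged = merge(p_rs, c_rs) if can_merge(p_rs, c_rs) else None
```

Partitioning by the narrower child key still sends every (key, value) group to a single reducer, while sorting by the wider parent key keeps rows ordered for both aggregations, which is why one shuffle can serve both GroupBy operators.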

Phase5 Generating MapReduce Jobs from the OperatorTree

The process of converting the OperatorTree into MapReduce Jobs is divided into the following stages

  1. Generate a MoveTask for the output table
  2. Start a depth-first traversal from one of the root nodes of the OperatorTree
  3. ReduceSinkOperator marks the Map/Reduce boundary and the boundary between multiple Jobs
  4. Traverse the other root nodes, merging MapReduceTasks when a JoinOperator is encountered
  5. Generate a StatsTask to update the metadata
  6. Cut the Operator relationships between Map and Reduce

Generate a MoveTask for the output table

The OperatorTree from the previous step contains only one FileSinkOperator, so a single MoveTask is generated directly to move the generated HDFS temporary files to the target table’s directory

MoveTask[Stage-0]
Move Operator

Starting the traversal

Store all the root nodes of the OperatorTree in a toWalk array and loop over the elements of the array (QB1 is omitted and not drawn)

Take the last element, TS[p], and push it onto the stack: opStack = {TS[p]}

Rule #1 TS% Generate the MapReduceTask object and determine the MapWork

The elements in the stack are found to match the following rule R1 (expressed here simply in Python code)

"".join([t + "%" for t in opStack]) == "TS%"

Generate a MapReduceTask[stage-1] object whose MapWork property holds a reference to the Operator root node. Because operators in the OperatorTree are linked by parent-child relationships, MapReduceTask[stage-1] contains all operators under the root TS[p]

Rule #2 TS%.*RS% Identify ReduceWork

Continue traversing the child operators of TS[p], pushing each onto opStack. When the first RS is pushed, that is, when opStack = {TS[p], FIL[18], RS[4]}, the following rule R2 is satisfied

"".join([t + "%" for t in opStack]) == "TS%.*RS%"

At this point, the reference to JOIN[5] is saved in the ReduceWork property of the MapReduceTask[stage-1] object

Rule #3 RS%.*RS% Generate a new MapReduceTask object and slice MapReduceTask

Continue traversing the child operators of JOIN[5], pushing each onto opStack

When the second RS is pushed, that is, when opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6]}, the following rule R3 is satisfied

"".join([t + "%" for t in opStack]) == "RS%.*RS%"  # checked against every suffix of opStack

Create a new MapReduceTask[stage-2] object and cut the OperatorTree between JOIN[5] and RS[6]. A child Operator FS[19] is generated for JOIN[5] and a TS[20] is generated for RS[6]; the MapWork property of the MapReduceTask[stage-2] object holds the reference to TS[20].

The newly generated FS[19] writes the intermediate data to HDFS temporary files

Continue traversing the child operators of RS[6], pushing each onto opStack

When opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13]}, rule R3 is satisfied again

In the same way, a MapReduceTask[stage-3] object is generated, and the OperatorTrees of stage-2 and stage-3 are cut apart

Rule #4 FS% Connect MapReduceTask and MoveTask

After all child operators have finally been pushed onto the stack, opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13], GBY[14], SEL[15], FS[17]}, which satisfies rule R4

"".join([t + "%" for t in opStack]) == "FS%"

At this point, the MoveTask is connected to MapReduceTask[stage-3], and a StatsTask is generated to update the table’s meta information
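The four rules can be put together in one runnable Python sketch. It treats each rule as a regular expression over the operator names on the stack and tries every suffix of the stack. The choice of the rule with the shortest matching suffix is an assumption made here to break ties (R2 and R3 can both match the same stack); the rule labels and the best_rule helper are hypothetical names.

```python
import re

# Rule patterns as written in the text, applied to "NAME%"-joined stack suffixes
RULES = [
    ("R1", r"TS%"),
    ("R2", r"TS%.*RS%"),
    ("R3", r"RS%.*RS%"),
    ("R4", r"FS%"),
]

def best_rule(op_stack):
    """Return the rule matching the shortest suffix of the stack, or None."""
    names = [op.split("[")[0] + "%" for op in op_stack]   # "FIL[18]" -> "FIL%"
    best = None
    for rule, pattern in RULES:
        # Try suffixes from shortest to longest; stop at the first full match.
        for start in range(len(names) - 1, -1, -1):
            suffix = "".join(names[start:])
            if re.fullmatch(pattern, suffix):
                if best is None or len(suffix) < best[1]:
                    best = (rule, len(suffix))
                break
    return best[0] if best else None

stack = ["TS[p]", "FIL[18]", "RS[4]", "JOIN[5]", "RS[6]", "JOIN[8]",
         "SEL[10]", "GBY[12]", "RS[13]", "GBY[14]", "SEL[15]", "FS[17]"]

# Rule fired as each operator is pushed (None means no rule applies)
fired = [best_rule(stack[:i + 1]) for i in range(len(stack))]
```

Replaying the traversal this way fires R1 at TS[p], R2 at RS[4], R3 at RS[6] and RS[13], and R4 at FS[17], matching the walkthrough above.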

Merging Stages

We are not done yet: two root nodes have still not been traversed

Empty the opStack and push the second element of toWalk onto the stack. With opStack = {TS[du]}, rule R1 (TS%) is satisfied again and MapReduceTask[stage-5] is generated

Continue traversing downward from TS[du]. When opStack = {TS[du], RS[7]}, rule R2 (TS%.*RS%) is satisfied

When JOIN[8] is about to be saved as the ReduceWork of MapReduceTask[stage-5], it turns out that JOIN[8] already exists in the Map<Operator, MapReduceWork> object that records the relationship between Operators and MapReduceWork objects. MapReduceTask[stage-2] and MapReduceTask[stage-5] are therefore merged into one MapReduceTask

Similarly, when traversal starts from the last root node TS[c], the MapReduceTasks are merged as well

Splitting the Map and Reduce phases

In the last stage, the OperatorTree in MapWork and ReduceWork is cut at the RS boundary

Full view of the MapReduceTasks generated from the OperatorTree

In the end, 3 MapReduceTasks are generated, as shown in the following figure

Phase6 Physical layer optimizer

This section does not go into every optimizer in detail; it focuses on the MapJoin optimizers

MapJoin principle

MapJoin simply means that in the Map phase, small tables are read into memory and large tables are scanned sequentially to complete the Join

Hive MapJoin is divided into two phases:

  1. A MapReduce Local Task reads the small table into memory, generates HashTableFiles, and uploads them to the Distributed Cache; the HashTableFiles are compressed
  2. In the Map phase of the MapReduce Job, each Mapper reads the HashTableFiles from the Distributed Cache into memory, scans the large table sequentially, completes the Join directly in the Map phase, and passes the data to the next MapReduce job
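The two phases amount to a classic hash join, which can be sketched in Python as below. The function names and sample rows are hypothetical, and the sketch leaves out the HashTableFiles, compression, and the Distributed Cache entirely; it only shows the build-then-probe logic.

```python
def build_hash_table(small_table, key):
    """Local task: load the small table into an in-memory hash table."""
    table = {}
    for row in small_table:
        table.setdefault(row[key], []).append(row)
    return table

def map_join(big_table, hash_table, key):
    """Map phase: stream the big table and probe the hash table; no shuffle needed."""
    for row in big_table:
        for match in hash_table.get(row[key], []):
            yield {**match, **row}

# Hypothetical sample data: users is the small table, payments the large one
users = [{"userid": 1, "name": "alice"}, {"userid": 2, "name": "bob"}]
payments = [
    {"userid": 1, "orderid": 101},
    {"userid": 3, "orderid": 102},
    {"userid": 1, "orderid": 103},
]

hash_table = build_hash_table(users, "userid")
joined = list(map_join(payments, hash_table, "userid"))
```

Because the join completes inside the mapper, no ReduceSinkOperator or Reduce phase is needed for it, at the cost of holding the small table in memory.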

If one of the two tables in the Join is a temporary table, a ConditionalTask is generated, and whether MapJoin is used is decided at runtime

CommonJoinResolver optimizer

The CommonJoinResolver optimizer converts a CommonJoin to a MapJoin:

  1. Traverse the Task Tree depth-first
  2. Find the JoinOperator and determine the data size of the left and right tables
  3. Small table + large table => MapJoinTask; small/large table + intermediate table => ConditionalTask

Walking through the MapReduce tasks generated in the previous phase, the optimizer finds that one of the tables joined by JOIN[8] in MapReduceTask[stage-2] is a temporary table. It first makes a deep copy of stage-2 (the original execution plan has to be retained as the Backup Plan, so the execution plan is copied here), creates a MapJoinOperator to replace the JoinOperator, and then generates a MapReduceLocalWork that reads the small table, generates the HashTableFiles, and uploads them to the DistributedCache

The execution plan of MapReduceTask after transformation is shown in the following figure

MapJoinResolver optimizer

The MapJoinResolver optimizer traverses the Task Tree and splits every MapReduceTask that has local work into two Tasks

After MapJoinResolver has run, the execution plan is as shown in the following figure

Design of the Hive SQL compilation process

Looking at the whole SQL compilation process, its design has several strengths worth learning from:

  • Using the open source Antlr to define grammar rules greatly simplifies lexical and syntax parsing; only a grammar file has to be maintained
  • The overall approach is clear, and the phased design makes the compilation code easy to maintain. It also lets optimizers be switched on and off in a pluggable way, as with Vectorization, the latest feature in Hive 0.13, and support for the Tez engine
  • Each Operator performs a single function, which simplifies the overall MapReduce program