In the previous article of the TiDB source code reading series (4), we introduced the Insert statement, so you already know how TiDB writes data. In this article, we introduce how a Select statement is executed. Executing a Select statement is more complex than executing an Insert, and this article also serves as a first introduction to the optimizer and the Coprocessor module.
Table structures and statements
The table structure is the same as in the previous article:
CREATE TABLE t (
    id   VARCHAR(31),
    name VARCHAR(50),
    age  INT,
    KEY id_idx (id)
);
The Select statement covered here is only the simplest case: a full table scan plus filtering, leaving aside indexes and other complex cases, which will be covered in later articles. The statement is:
SELECT name FROM t WHERE age > 10;
Statement processing flow
There are three obvious differences between the Select process and the Insert process:
-
You have to go through Optimize
Insert is a relatively simple statement whose query plan needs little work (in the case of Insert into Select, only the Select part is really optimized), while a Select statement can be arbitrarily complex, and performance can vary dramatically from one query plan to another, so it must be optimized very carefully.
-
You need to interact with the computing module in the storage engine
The Insert statement only involves the Set operation of key-value. The Select statement may need to query a large amount of data. If the storage engine is operated through the KV interface, it will be inefficient.
-
Result set data needs to be returned to the client
An Insert statement only needs to return whether it was successful and how many rows were inserted, whereas a Select statement needs to return a result set.
This article highlights these differences; the steps that are the same as before are only touched on briefly.
Parsing
The rules for parsing Select statements are here. Select statements are much more complex than Insert statements, and you can check the MySQL documentation for details of the parsing implementation. Of particular interest is the From field, which can be very complex and whose syntactic definition is recursive.
The statement is finally parsed into the ast.SelectStmt structure:
type SelectStmt struct {
dmlNode
resultSetNode
// SelectStmtOpts wraps around select hints and switches.
*SelectStmtOpts
// Distinct represents whether the select has distinct option.
Distinct bool
// From is the from clause of the query.
From *TableRefsClause
// Where is the where clause in select statement.
Where ExprNode
// Fields is the select expression list.
Fields *FieldList
// GroupBy is the group by expression list.
GroupBy *GroupByClause
// Having is the having condition.
Having *HavingClause
// OrderBy is the ordering expression list.
OrderBy *OrderByClause
// Limit is the limit clause.
Limit *Limit
// LockTp is the lock type
LockTp SelectLockType
// TableHints represents the level Optimizer Hint
TableHints []*TableOptimizerHint
}
For SELECT name FROM t WHERE age > 10;, name is parsed into the Fields field, WHERE age > 10 into the Where field, and FROM t into the From field.
Planning
In the planBuilder.buildSelect() method, we can see how ast.SelectStmt is converted into a plan tree. The end result is a LogicalPlan, in which each syntactic element has been converted to a logical query plan unit. For example, WHERE age > 10 is processed into a plan.LogicalSelection structure:
if sel.Where != nil {
	p = b.buildSelection(p, sel.Where, nil)
	if b.err != nil {
		return nil
	}
}
The specific structure is as follows:
// LogicalSelection represents a where or having predicate.
type LogicalSelection struct {
baseLogicalPlan
// Originally the WHERE or ON condition is parsed into a single expression,
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression
}
The most important field here is Conditions, which holds the list of expressions the WHERE clause is compiled into. When all of these expressions evaluate to True, the row satisfies the filter.
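To make the Conditions list concrete, here is a minimal, self-contained sketch (not TiDB's actual code; the expr type and splitCNF function are invented for illustration) of how a WHERE expression tree rooted at AND nodes can be flattened into the list of conjuncts the comment above describes:

```go
package main

import "fmt"

// expr is a toy expression node; TiDB's real expression.Expression is far richer.
type expr struct {
	op    string // "and", "gt", "lt", ...
	left  *expr
	right *expr
	repr  string // printable form for leaf predicates
}

// splitCNF flattens a tree of AND nodes into a flat list of conjuncts,
// mirroring how LogicalSelection.Conditions holds the AND-ed predicates.
func splitCNF(e *expr) []*expr {
	if e == nil {
		return nil
	}
	if e.op == "and" {
		return append(splitCNF(e.left), splitCNF(e.right)...)
	}
	return []*expr{e}
}

func main() {
	// WHERE age > 10 AND name < 'z' AND id > 'a'
	w := &expr{op: "and",
		left: &expr{op: "and",
			left:  &expr{op: "gt", repr: "age > 10"},
			right: &expr{op: "lt", repr: "name < 'z'"}},
		right: &expr{op: "gt", repr: "id > 'a'"},
	}
	for _, c := range splitCNF(w) {
		fmt.Println(c.repr)
	}
}
```

Splitting into CNF like this is what lets later phases push each conjunct down independently.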
After buildSelect(), the AST becomes a tree-like structure of the Plan, which will be optimized in the next step.
Optimizing
Let’s go back to the plan.Optimize() function. The Select statement gives us a LogicalPlan, so we enter the doOptimize function, which is short:
func doOptimize(flag uint64, logic LogicalPlan) (PhysicalPlan, error) {
	logic, err := logicalOptimize(flag, logic)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if !AllowCartesianProduct && existsCartesianProduct(logic) {
		return nil, errors.Trace(ErrCartesianProductUnsupported)
	}
	physical, err := dagPhysicalOptimize(logic)
	if err != nil {
		return nil, errors.Trace(err)
	}
	finalPlan := eliminatePhysicalProjection(physical)
	return finalPlan, nil
}
logicalOptimize and dagPhysicalOptimize perform logical optimization and physical optimization respectively. The basic concepts of, and differences between, the two will not be described in this article. Here is what each function does.
Logical optimization
Logical optimization consists of a series of optimization rules that are applied in order to the LogicalPlan tree passed in; see the logicalOptimize() function:
func logicalOptimize(flag uint64, logic LogicalPlan) (LogicalPlan, error) {
var err error
for i, rule := range optRuleList {
// The order of flags is same as the order of optRule in the list.
// We use a bitmask to record which opt rules should be used. If the i-th bit is 1, it means we should
// apply i-th optimizing rule.
if flag&(1<<uint(i)) == 0 {
continue
}
logic, err = rule.optimize(logic)
		if err != nil {
			return nil, errors.Trace(err)
		}
}
return logic, errors.Trace(err)
}
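The bitmask mechanism above can be sketched with toy types (the rule struct and optimize function here are invented stand-ins, not TiDB code): the i-th rule runs only when bit i of the flag is set.

```go
package main

import "fmt"

// rule is a stand-in for TiDB's logicalOptRule interface.
type rule struct {
	name  string
	apply func(plan string) string
}

// optimize applies the i-th rule only when bit i of flag is set,
// the same bitmask convention used by logicalOptimize.
func optimize(flag uint64, plan string, rules []rule) string {
	for i, r := range rules {
		if flag&(1<<uint(i)) == 0 {
			continue
		}
		plan = r.apply(plan)
	}
	return plan
}

func main() {
	rs := []rule{
		{"columnPruner", func(p string) string { return p + "+pruned" }},
		{"ppdSolver", func(p string) string { return p + "+pushed" }},
	}
	// Enable only rule 1 (ppdSolver): bit 0 unset, bit 1 set.
	fmt.Println(optimize(0b10, "scan", rs)) // scan+pushed
}
```

The flag lets callers (for example, prepared-statement caching paths) switch individual rules on or off without changing the rule list itself.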
TiDB already supports the following optimization rules:
var optRuleList = []logicalOptRule{
&columnPruner{},
&maxMinEliminator{},
&projectionEliminater{},
&buildKeySolver{},
&decorrelateSolver{},
&ppdSolver{},
&aggregationOptimizer{},
&pushDownTopNOptimizer{},
}
These rules do not consider the distribution of data; they simply transform the Plan tree directly, because applying most of these rules is guaranteed to produce a better plan (although one rule in the list above does not always help; the reader can think about which one).
A selection of these rules is introduced here; the rest are left for the reader to research, or for a later article.
Take column pruning as an example. For the SQL statement SELECT c FROM t, the full-table scan operator over t (or possibly an index scan) only needs to return the data of column c, and this is achieved by the column-pruning rule. The rule is applied recursively over the whole Plan tree from the root down to the leaf nodes, and each node keeps only the columns required by the node above it.
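The root-to-leaf walk can be sketched as follows (a minimal toy model, not TiDB's columnPruner; the node type, uses field, and prune function are assumptions for illustration). Note that an operator must also request the columns it reads itself, such as the predicate column of a Selection:

```go
package main

import "fmt"

// node is a toy plan node; cols is the set of columns it outputs.
type node struct {
	name  string
	uses  []string // columns the operator itself reads (e.g. predicate columns)
	cols  []string // output columns, filled in by prune
	child *node
}

// union returns a ∪ b, preserving the order of first appearance.
func union(a, b []string) []string {
	seen := map[string]bool{}
	out := []string{}
	for _, c := range append(append([]string{}, a...), b...) {
		if !seen[c] {
			seen[c] = true
			out = append(out, c)
		}
	}
	return out
}

// prune walks from the root down: each node outputs only what its parent
// requires, and asks its child for that set plus whatever it uses itself.
func prune(n *node, required []string) {
	if n == nil {
		return
	}
	n.cols = required
	prune(n.child, union(required, n.uses))
}

func main() {
	// SELECT name FROM t WHERE age > 10
	scan := &node{name: "DataSource(t)"}
	sel := &node{name: "Selection(age > 10)", uses: []string{"age"}, child: scan}
	prune(sel, []string{"name"})
	fmt.Println(sel.cols)  // [name]
	fmt.Println(scan.cols) // [name age]
}
```

The id column never appears in any required set, so the scan does not decode it at all.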
After logical optimization, we can get a query plan like this:
FROM t becomes a DataSource operator and WHERE age > 10 becomes a Selection operator. So here is a question: where is the column projection from SELECT name?
Physical optimization
In the physical optimization phase, the optimizer takes the distribution of data into account and decides which physical operator to choose. For example, for the statement FROM t WHERE age > 10, assuming there is an index on the age field, it must consider whether TableScan + Filter or IndexScan is faster. The choice depends on statistics, that is, on how many rows the condition age > 10 filters out.
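A toy cost comparison makes the trade-off concrete. This is not TiDB's cost model; the cost constants and the choosePlan function are invented, and the only point is that an index scan pays an extra per-row lookup to fetch the full row, so it only wins when the predicate is selective:

```go
package main

import "fmt"

// Hypothetical cost model: a full table scan reads every row cheaply,
// while an index scan reads only matching rows but pays an extra
// lookup cost per row to fetch the full row. Constants are illustrative.
const (
	scanRowCost   = 1.0
	lookupRowCost = 3.0
)

func tableScanCost(totalRows float64) float64 {
	return totalRows * scanRowCost
}

func indexScanCost(totalRows, selectivity float64) float64 {
	return totalRows * selectivity * (scanRowCost + lookupRowCost)
}

// choosePlan mimics the cost-based decision: the cheaper estimate wins.
func choosePlan(totalRows, selectivity float64) string {
	if indexScanCost(totalRows, selectivity) < tableScanCost(totalRows) {
		return "IndexScan"
	}
	return "TableScan"
}

func main() {
	fmt.Println(choosePlan(10000, 0.01)) // highly selective: IndexScan
	fmt.Println(choosePlan(10000, 0.9))  // barely filters: TableScan
}
```

The selectivity input is exactly what the statistics module estimates for a condition like age > 10.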
Let’s look at the dagPhysicalOptimize function:
func dagPhysicalOptimize(logic LogicalPlan) (PhysicalPlan, error) {
logic.preparePossibleProperties()
logic.deriveStats()
t, err := logic.convert2PhysicalPlan(&requiredProp{taskTp: rootTaskType, expectedCnt: math.MaxFloat64})
	if err != nil {
		return nil, errors.Trace(err)
	}
p := t.plan()
p.ResolveIndices()
return p, nil
}
Copy the code
convert2PhysicalPlan recursively calls the convert2PhysicalPlan method of the child nodes, generates physical operators, estimates their cost, and selects the cheapest option. These two functions are important:
// convert2PhysicalPlan implements LogicalPlan interface.
func (p *baseLogicalPlan) convert2PhysicalPlan(prop *requiredProp) (t task, err error) {
	// Look up the task with this prop in the task map.
	// It's used to reduce double counting.
	t = p.getTask(prop)
	if t != nil {
		return t, nil
	}
	t = invalidTask
	if prop.taskTp != rootTaskType {
		// Currently all plan cannot totally push down.
		p.storeTask(prop, t)
		return t, nil
	}
	for _, pp := range p.self.genPhysPlansByReqProp(prop) {
		t, err = p.getBestTask(t, pp)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	p.storeTask(prop, t)
	return t, nil
}

func (p *baseLogicalPlan) getBestTask(bestTask task, pp PhysicalPlan) (task, error) {
	tasks := make([]task, 0, len(p.children))
	for i, child := range p.children {
		childTask, err := child.convert2PhysicalPlan(pp.getChildReqProps(i))
		if err != nil {
			return nil, errors.Trace(err)
		}
		tasks = append(tasks, childTask)
	}
	resultTask := pp.attach2Task(tasks...)
	if resultTask.cost() < bestTask.cost() {
		bestTask = resultTask
	}
	return bestTask, nil
}
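The getTask/storeTask pair above is a memoization cache keyed by the required property, which is what turns the search into dynamic programming. A minimal sketch of that idea (toy types, not TiDB code; the planNode struct and bestCost method are invented) looks like this:

```go
package main

import "fmt"

type prop string

// planNode is a toy logical node with several candidate physical
// implementations, each with a pre-estimated cost.
type planNode struct {
	name       string
	candidates []float64
	memo       map[prop]float64 // plays the role of getTask/storeTask
	calls      int              // counts real searches, to show memoization
}

// bestCost returns the cheapest candidate for a required property,
// caching the result so the same (node, prop) pair is never searched twice.
func (p *planNode) bestCost(pr prop) float64 {
	if c, ok := p.memo[pr]; ok {
		return c
	}
	p.calls++
	best := p.candidates[0]
	for _, c := range p.candidates[1:] {
		if c < best {
			best = c
		}
	}
	p.memo[pr] = best
	return best
}

func main() {
	n := &planNode{name: "scan", candidates: []float64{42, 17, 99}, memo: map[prop]float64{}}
	fmt.Println(n.bestCost("root"), n.bestCost("root"), n.calls) // 17 17 1
}
```

In the real optimizer the cached value is a whole task rather than a bare cost, but the shape of the recursion is the same.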
The return value of both methods is a structure called task, rather than a PhysicalPlan. TiDB’s optimizer wraps a PhysicalPlan into a task. The definition of task is in task.go; let’s look at the comment:
// task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
// A task may be CopTask, RootTask, MPPTask or a ParallelTask.
type task interface {
count() float64
addCost(cost float64)
cost() float64
copy() task
plan() PhysicalPlan
invalid() bool
}
In TiDB, tasks are defined as a series of operations that can be performed on a single node without relying on data exchange with other nodes. Currently, only two tasks are implemented:
-
A CopTask is a physical plan that needs to be pushed down to a storage engine (TiKV) for computation, and each TiKV node that receives the request does the same
-
RootTask is the part of the physical plan that remains in TiDB for computation
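The two task kinds can be sketched with toy types that satisfy a stripped-down version of the interface above (the copTask/rootTask structs, the netFactor constant, and the finish function here are illustrative assumptions, not TiDB's actual implementation):

```go
package main

import "fmt"

// task is a stripped-down version of the interface from the article.
type task interface {
	cost() float64
}

// copTask accumulates cost for work pushed down to TiKV;
// rootTask is the part that keeps running inside TiDB.
type copTask struct{ cst float64 }
type rootTask struct{ cst float64 }

func (t *copTask) cost() float64  { return t.cst }
func (t *rootTask) cost() float64 { return t.cst }

const netFactor = 1.5 // illustrative cost of shipping a row back to TiDB

// finish converts a coprocessor task into a root task, charging network
// cost for the rows returned, loosely modelling what happens when a plan
// can no longer be pushed down.
func finish(t *copTask, rows float64) *rootTask {
	return &rootTask{cst: t.cst + rows*netFactor}
}

func main() {
	cop := &copTask{cst: 100} // e.g. TableScan + Selection inside TiKV
	root := finish(cop, 20)   // 20 filtered rows returned to TiDB
	fmt.Println(root.cost())  // 130
}
```

This also shows why pushing a Selection down is attractive: filtering inside TiKV shrinks the row count that the network term multiplies.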
If you look at TiDB’s Explain results, you can see that each Operator indicates which Task it belongs to, as in the following example:
The whole process is a dynamic programming algorithm over the tree; interested readers can study the relevant code, or wait for a later article.
For SELECT name FROM t WHERE age > 10;, the query plan produced looks something like this:
The reader may wonder: why is there only one physical operator left? Where is WHERE age > 10? In fact, the filtering condition age > 10 is merged into the PhysicalTableScan: because age > 10 can be pushed down to TiKV for evaluation, the TableScan and Filter operations are combined. Which expressions are pushed down to the Coprocessor module on TiKV for evaluation? For this query it is determined in the following place:
// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (ds *DataSource) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
_, ds.pushedDownConds, predicates = expression.ExpressionsToPB(ds.ctx.GetSessionVars().StmtCtx, predicates, ds.ctx.GetClient())
return predicates, ds
}
Copy the code
The expression.ExpressionsToPB method identifies the expressions that can be pushed down to TiKV (TiKV has not yet implemented all expressions; in particular, only some of the built-in functions are supported) and stores them in the DataSource.pushedDownConds field. To see how the DataSource is turned into a PhysicalTableScan, look at the DataSource.convertToTableScan() method. This method builds a PhysicalTableScan and calls addPushDownSelection() to attach a PhysicalSelection to the PhysicalTableScan, placing both in a copTask.
This query plan is a very simple plan, but we can use it to illustrate how TiDB performs the query operation.
Executing
How a query plan becomes an executable structure and how to drive that structure to execute queries has been described in two previous articles and will not be covered here. In this section, I will focus on the detailed execution process and TiDB’s distributed execution framework.
Coprocessor framework
The concept of Coprocessor is borrowed from HBase. In simple terms, it is a piece of computing logic injected into the storage engine: it waits for computation requests (serialized physical execution plans) from the SQL layer, processes local data, and returns the results. In TiDB, computation is performed in units of Regions. The SQL layer analyzes the Key Ranges of the data to be processed, splits them into per-Region Key Ranges according to the Region information obtained from PD, and finally sends the requests to the corresponding Regions.
The SQL layer summarizes the results returned by multiple regions and generates the final result set after required Operator processing.
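Splitting a scan range along Region boundaries can be sketched like this (a simplified model with string keys and an invented splitByRegions function; real TiKV keys are encoded byte slices, and the real routing logic lives in the TiKV Client):

```go
package main

import "fmt"

type keyRange struct{ start, end string }

// splitByRegions cuts [r.start, r.end) along sorted Region split keys,
// the way the SQL layer partitions a scan before sending one
// Coprocessor request per Region.
func splitByRegions(r keyRange, splits []string) []keyRange {
	out := []keyRange{}
	start := r.start
	for _, s := range splits {
		if s <= start {
			continue // boundary before our range: irrelevant
		}
		if s >= r.end {
			break // boundaries are sorted; the rest are past our range
		}
		out = append(out, keyRange{start, s})
		start = s
	}
	out = append(out, keyRange{start, r.end})
	return out
}

func main() {
	// Regions split at "g" and "p"; scan the whole table ["a", "z").
	for _, kr := range splitByRegions(keyRange{"a", "z"}, []string{"g", "p"}) {
		fmt.Printf("[%s, %s)\n", kr.start, kr.end)
	}
}
```

Each resulting sub-range becomes one request, and their responses are what the SQL layer then merges into the final result set.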
DistSQL
TiDB abstracts a unified distributed query interface, the DistSQL API, to keep the SQL layer decoupled from the complex logic involved in distributing requests and summarizing results, such as error retries, retrieving routing information, concurrency control, and returning results in order. It is located in the distsql package.
The most important of these methods is the SelectDAG function:
// SelectDAG sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func SelectDAG(goCtx goctx.Context, ctx context.Context, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) {
// kvReq contains the data involved in the calculation of KeyRanges
// The TiKV Client is used to send computation requests to the TiKV cluster
resp := ctx.GetClient().Send(goCtx, kvReq)
if resp == nil {
err := errors.New("client returns nil response")
return nil, errors.Trace(err)
}
if kvReq.Streaming {
return &streamResult{
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: ctx,
}, nil
}
// The result is wrapped here
return &selectResult{
label: "dag",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: ctx,
}, nil
}
We will skip over the logic in the TiKV Client for now and focus only on how the SQL layer reads the data once it gets the SelectResult. The following interface is the key:
// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// NextRaw gets the next raw result.
NextRaw(goctx.Context) ([]byte, error)
// NextChunk reads the data into chunk.
NextChunk(goctx.Context, *chunk.Chunk) error
// Close closes the iterator.
Close() error
// Fetch fetches partial results from client.
// The caller should call SetFields() before call Fetch().
Fetch(goctx.Context)
// ScanKeys gets the total scan row count.
ScanKeys() int64
Copy the code
selectResult implements the SelectResult interface, which is the abstraction over all results of a query. Since computation is performed in units of Regions, the complete result contains the results from all Regions involved. Calling NextChunk reads one Chunk of data, and calling NextChunk repeatedly until the Chunk’s NumRows returns 0 yields all the results. Internally, NextChunk keeps taking the SelectResponse returned by each Region and writing its results into the Chunk.
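The calling convention is easy to get wrong, so here is a minimal sketch of the loop (toy types standing in for SelectResult and chunk.Chunk; the chunkIter and drain names are invented for illustration):

```go
package main

import "fmt"

// chunkIter is a toy stand-in for SelectResult: each NextChunk call fills
// the chunk with up to batchSize rows until the source is exhausted.
type chunkIter struct {
	rows      []string
	batchSize int
}

type chunk struct{ rows []string }

func (c *chunk) NumRows() int { return len(c.rows) }

func (it *chunkIter) NextChunk(c *chunk) {
	n := it.batchSize
	if n > len(it.rows) {
		n = len(it.rows)
	}
	c.rows = it.rows[:n]
	it.rows = it.rows[n:]
}

// drain shows the convention described above: keep calling NextChunk
// until a returned chunk has zero rows.
func drain(it *chunkIter) []string {
	var all []string
	for {
		var c chunk
		it.NextChunk(&c)
		if c.NumRows() == 0 {
			break
		}
		all = append(all, c.rows...)
	}
	return all
}

func main() {
	it := &chunkIter{rows: []string{"r1", "r2", "r3", "r4", "r5"}, batchSize: 2}
	fmt.Println(drain(it)) // [r1 r2 r3 r4 r5]
}
```

Reading a batch of rows at a time like this is what keeps memory bounded even when the total result set is large.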
Root Executor
Currently, several operators can be pushed down to TiKV, such as TableScan, IndexScan, Selection, TopN, Limit, and PartialAggregation; other, more complex operators must still be processed on a single tidb-server. So the whole computation follows a pattern of parallel processing on many tikv-servers plus summarization on a single tidb-server.
Conclusion
Two of the most complex aspects of Select statement processing are query optimization and distributed execution, both of which will be covered further in future articles. The next article will move away from specific SQL logic and show you how to read a particular module.
Author: Shen Li