wink

有的鸟是不会被关住的, 因为它们的羽毛太耀眼了.

  • main

wink

有的鸟是不会被关住的, 因为它们的羽毛太耀眼了.

  • main

TiDB 源码分析之 Order By

2017-12-05

Order by 源码分析

查询优化器是关系型数据库的核心, 也是 TiDB 代码中最难懂的部分, 这篇文章可以作为 TiDB 查询优化器源码分析的开端, 可能会在文章的过程中掺杂一些跟 Order by 不相关的代码分析. 希望这一个系列能对我跟大家都有帮助.

parser

分析语句的正确方式是从 parser 入手看构造了哪些相关的 ast 结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
OrderBy:
"ORDER" "BY" ByList
{
$$ = &ast.OrderByClause{Items: $3.([]*ast.ByItem)}
}
ByList:
ByItem
{
$$ = []*ast.ByItem{$1.(*ast.ByItem)}
}
| ByList ',' ByItem
{
$$ = append($1.([]*ast.ByItem), $3.(*ast.ByItem))
}
ByItem:
Expression Order
{
expr := $1
valueExpr, ok := expr.(*ast.ValueExpr)
if ok {
position, isPosition := valueExpr.GetValue().(int64)
if isPosition {
expr = &ast.PositionExpr{N: int(position)}
}
}
$$ = &ast.ByItem{Expr: expr, Desc: $2.(bool)}
}
Order:
/* EMPTY */
{
$$ = false // ASC by default
}
| "ASC"
{
$$ = false
}
| "DESC"
{
$$ = true
}

这里可以看到 oder by 的主要数据结构就是 ast.ByItem:

1
2
3
4
5
6
7
// ByItem represents an item in order by or group by.
type ByItem struct {
node
Expr ExprNode
Desc bool
}

plan

然后直接看到 planBuilder.buildSelect:

1
2
3
4
5
6
7
8
9
func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan {
...
if sel.OrderBy != nil {
p = b.buildSort(p, sel.OrderBy.Items, orderMap) // 这个 orderMap 会在下面解释.
if b.err != nil {
return nil
}
}
}

这里会给 order by 创建一个 Sort 的逻辑计划:

1
2
3
4
5
6
7
8
9
// Sort stands for the order by plan.
type Sort struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
ByItems []*ByItems
ExecLimit *Limit // no longer be used by new plan
}

我们首先看一下resolveHavingAndOrderBy, 这里会对 oderBY 的 item 执行一遍 havingAndOrderbyExprResolver, 主要是找到 orderBY 中带有聚合函数的相关联的column, 然后把 AggregateFuncExpr 里面的参数替换成对应 selectFields 里面的 SelectField, 然后得到 aggMapper.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// resolveHavingAndOrderBy will process aggregate functions and resolve the columns that don't exist in select fields.
// If we found some columns that are not in select fields, we will append it to select fields and update the colMapper.
// When we rewrite the order by / having expression, we will find column in map at first.
func (b *planBuilder) resolveHavingAndOrderBy(sel *ast.SelectStmt, p LogicalPlan) (
map[*ast.AggregateFuncExpr]int, map[*ast.AggregateFuncExpr]int) {
extractor := &havingAndOrderbyExprResolver{
p: p,
selectFields: sel.Fields.Fields,
aggMapper: make(map[*ast.AggregateFuncExpr]int),
colMapper: b.colMapper,
outerSchemas: b.outerSchemas,
}
if sel.GroupBy != nil {
extractor.gbyItems = sel.GroupBy.Items
}
// Extract agg funcs from having clause.
if sel.Having != nil {
extractor.curClause = havingClause
n, ok := sel.Having.Expr.Accept(extractor)
if !ok {
b.err = errors.Trace(extractor.err)
return nil, nil
}
sel.Having.Expr = n.(ast.ExprNode)
}
havingAggMapper := extractor.aggMapper
extractor.aggMapper = make(map[*ast.AggregateFuncExpr]int)
extractor.orderBy = true
extractor.inExpr = false
// Extract agg funcs from order by clause.
if sel.OrderBy != nil {
extractor.curClause = orderByClause
for _, item := range sel.OrderBy.Items {
n, ok := item.Expr.Accept(extractor)
if !ok {
b.err = errors.Trace(extractor.err)
return nil, nil
}
item.Expr = n.(ast.ExprNode)
}
}
sel.Fields.Fields = extractor.selectFields
return havingAggMapper, extractor.aggMapper
}

通过 resolveHavingAndOrderBy 的Enter, Leave 我们可以看到, 走到AggregateFuncExpr 的时候, 它会对每一个ColumnNameExpr参数, 调用 Accept, 来 rewrite. 最终可以看到在havingAndOrderbyExprResolver.Leave 的 相应 ColumnNameExpr 处理中, 会把自己转换为 a.selectFields[index].Expr.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Accept implements Node Accept interface.
func (n *AggregateFuncExpr) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
return v.Leave(newNode)
}
n = newNode.(*AggregateFuncExpr)
for i, val := range n.Args {
node, ok := val.Accept(v)
if !ok {
return n, false
}
n.Args[i] = node.(ExprNode) // 在这里被改写.
}
return v.Leave(n)
}

然后对于ast.AggregateFuncExpr 自己, 会在 a.selectFields 中 append 一个辅助的 SelectField 到最后, 并且在a.aggMapper 设置自己在 selectFields 数组中的索引, 方便 Sort 算子使用.

1
2
3
4
5
6
7
8
case *ast.AggregateFuncExpr:
a.inAggFunc = false
a.aggMapper[v] = len(a.selectFields)
a.selectFields = append(a.selectFields, &ast.SelectField{
Auxiliary: true,
Expr: v,
AsName: model.NewCIStr(fmt.Sprintf("sel_agg_%d", len(a.selectFields))),
})

planBuilder.BuildSort 创建 Logical Plan

下面我们再来看看 BuildSort:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) LogicalPlan {
b.curClause = orderByClause
sort := Sort{}.init(b.ctx)
exprs := make([]*ByItems, 0, len(byItems))
for _, item := range byItems {
it, np, err := b.rewrite(item.Expr, p, aggMapper, true) // 这里会对 byItems 做 rewrite, 方便后续的处理.
if err != nil {
b.err = err
return nil
}
p = np
exprs = append(exprs, &ByItems{Expr: it, Desc: item.Desc})
}
sort.ByItems = exprs
setParentAndChildren(sort, p)
sort.SetSchema(p.Schema().Clone())
return sort
}

这里 rewrite 的意思大概是, 比如有一些可以在这个时期就算出来的结果, 就可以做常量折叠, 比如 YEAR('2009-05-19'); 直接可以用 2009 表示, 或者 1 + 1 这种表达式. 当然还有一些其他情况会做转换. TiDB 里面很多用到的是 Visitor 模式, 这里就不详细解释了, OrderBy 子句大部分是 ColumnNameExpr. 这里 rewritor 会通过 func (er *expressionRewriter) toColumn(v *ast.ColumnName) 把它转成对应 Schema 的 Column 对象.

所以到这里 sort.ByItems 就构造完成了, 让我们继续往下看.

Physical Plan

之后会调用 logic.convert2NewPhysicalPlan 将逻辑计划转为物理计划.

对于语句 select * from t order by a;, 它在这里的 LogicalPlan 层级关系是: Sort --> Projection --> DataSource.

调用链大概如下:

call_stack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (task, error) {
t := p.getTask(prop)
if t != nil {
return t, nil
}
if prop.taskTp != rootTaskType {
// TODO: This is a trick here, because an operator that can be pushed to Coprocessor can never be pushed across sort.
// e.g. If an aggregation want to be pushed, the SQL is always like select count(*) from t order by ...
// The Sort will on top of Aggregation. If the SQL is like select count(*) from (select * from s order by k).
// The Aggregation will also be blocked by projection. In the future we will break this restriction.
p.storeTask(prop, invalidTask)
return invalidTask, nil
}
// enforce branch
// 对于 select * from t order by a, sort plan 的第一个孩子是 Projection, 由于 Projection 没有实现convert2NewPhysicalPlan,
// 所以调用的是 baseLogicalPlan.convert2NewPhysicalPlan
t, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType, expectedCnt: math.MaxFloat64})
if err != nil {
return nil, errors.Trace(err)
}
t = p.attach2Task(t)
newProp, canPassProp := getPropByOrderByItems(p.ByItems)
if canPassProp {
newProp.expectedCnt = prop.expectedCnt
orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp)
if err != nil {
return nil, errors.Trace(err)
}
if orderedTask.cost() < t.cost() {
t = orderedTask
}
}
t = prop.enforceProperty(t, p.ctx)
p.storeTask(prop, t)
return t, nil
}

这里值得注意的是, 在 Sort.convert2NewPhysicalPlan 里面, 分别两次调用了 p.children[0].(LogicalPlan).convert2NewPhysicalPlan, 所不同的只是, 在第二次的时候, 在 requiredProp 里面加入了 order by 相应的列. 这个其实是用来比较两次出来的计划的代价, 来决定是不是下推这个 order by. 然后会把 copTask 的 keepOrder 设置为 true. 如果发现这个 order by 可以下推, 比如刚好 order by 的是主键或者是索引, 那么这个时候就不需要构造一个 Sort 的物理计划了, 直接是一个 PhysicalTableReader 或者 PhysicalIndexReader 或者 PhysicalIndexLookupReader.

具体用到这个 requiredProp 的 cols, 是在 Sort 的 children DataSource 的convertToIndexScan:

1
2
3
4
5
6
7
8
9
10
11
12
13
if !prop.isEmpty() {
for i, col := range idx.Columns {
// not matched
if col.Name.L == prop.cols[0].ColName.L {
matchProperty = matchIndicesProp(idx.Columns[i:], prop.cols)
break
} else if i >= len(is.AccessCondition) {
break
} else if sf, ok := is.AccessCondition[i].(*expression.ScalarFunction); !ok || sf.FuncName.L != ast.EQ {
break
}
}
}

在 baseLogicalPlan.convert2NewPhysicalPlan 大体就是得到一个最优的 task, 然后通过 p.storeTask(prop, t) 存到 p.taskMap 里面:

1
2
3
4
5
6
7
8
9
10
// task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
// A task may be CopTask, RootTask, MPPTask or a ParallelTask.
type task interface {
count() float64
addCost(cost float64)
cost() float64
copy() task
plan() PhysicalPlan
invalid() bool
}

之后它会调用到 child 的 DataSource.convert2NewPhysicalPlan. 这里会遍历所有可能的索引, 然后选择一个最小代价的计划.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// It will enumerate all the available indices and choose a plan with least cost.
func (p *DataSource) convert2NewPhysicalPlan(prop *requiredProp) (task, error) {
if prop == nil {
return nil, nil
}
t := p.getTask(prop)
if t != nil {
return t, nil
}
t, err := p.tryToGetDualTask()
if err != nil {
return nil, errors.Trace(err)
}
if t != nil {
p.storeTask(prop, t)
return t, nil
}
t, err = p.tryToGetMemTask(prop)
if err != nil {
return nil, errors.Trace(err)
}
if t != nil {
p.storeTask(prop, t)
return t, nil
}
// TODO: We have not checked if this table has a predicate. If not, we can only consider table scan.
indices := p.availableIndices.indices
includeTableScan := p.availableIndices.includeTableScan
t = invalidTask
// 它这里先会创建一个 PhysicalTableReader 的 copTask, 会在下面再创建一个 indexReader, 如果 idxTask 的代价小于 TableReader 的 copTask, 那么会选 indexReader
if includeTableScan {
t, err = p.convertToTableScan(prop)
if err != nil {
return nil, errors.Trace(err)
}
}
if !includeTableScan || len(p.pushedDownConds) > 0 || len(prop.cols) > 0 {
for _, idx := range indices {
idxTask, err := p.convertToIndexScan(prop, idx)
if err != nil {
return nil, errors.Trace(err)
}
// 这里会选一个 cost 最小的 IndexScan, 具体这个 IndexScan 是怎么转换得来的,
// 可以看下面的 DataSource.convertToIndexScan 一节.
if idxTask.cost() < t.cost() {
t = idxTask
}
}
}
p.storeTask(prop, t)
return t, nil
}

DataSource.convertToTableScan

TiDB 里面把读取表的物理计划, 都叫做 TableScan, 它会用一个 range 的对象来表示范围, 如果是全表扫, 那就是[-inc, +inc], 如果是点查, 那就是 [begin, end], begin == end, 所以看看这里是怎么把 DataSource 转为 TableScan的.

对于表:

1
2
3
4
5
6
create table t1(
a int,
b int,
primary key(a),
key b_idx(b)
);

执行的操作 select * from t1 where b = 2 order by b;, 我们再来分析下面的函数, 会比较好阐述.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// convertToTableScan converts the DataSource to table scan.
func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err error) {
if prop.taskTp == copDoubleReadTaskType {
return &copTask{cst: math.MaxFloat64}, nil
}
ts := PhysicalTableScan{
Table: p.tableInfo,
Columns: p.Columns,
TableAsName: p.TableAsName,
DBName: p.DBName,
}.init(p.ctx)
ts.SetSchema(p.schema)
sc := p.ctx.GetSessionVars().StmtCtx
ts.Ranges = ranger.FullIntRange()
var pkCol *expression.Column
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
pkCol = expression.ColInfo2Col(ts.schema.Columns, pkColInfo)
}
}
if len(p.pushedDownConds) > 0 {
conds := make([]expression.Expression, 0, len(p.pushedDownConds))
for _, cond := range p.pushedDownConds {
conds = append(conds, cond.Clone())
}
if pkCol != nil {
var ranges []ranger.Range
// ranger.DetachCondsForTableRange 是把过滤条件条件剔除出来,
// 如果是跟pkCol 相关的, 会放在ts.AccessCondition 数组,
// 如果是其他的列, 那么就放在 ts.filterCondition
ts.AccessCondition, ts.filterCondition = ranger.DetachCondsForTableRange(p.ctx, conds, pkCol)
// 这里的 range 包括: IntColumnRange(TableRange), ColumnRange, IndexRange. 当然这里是 ranger.IntRangeType (TableRange).
// 对于 select * from t1 where b = 2 order by b; 这里的 ts.AccessCondition 为空, 所以得出的 range 就是 FullRange, 全表.
// 过滤是需要通过索引 b 来做的, pk 做不了什么.
ranges, err = ranger.BuildRange(sc, ts.AccessCondition, ranger.IntRangeType, []*expression.Column{pkCol}, nil)
ts.Ranges = ranger.Ranges2IntRanges(ranges)
if err != nil {
return nil, errors.Trace(err)
}
} else {
// 如果 primary key 不是 Handle 的话(这个含义需要结合 TiDB 对于主键的处理来理解), 那么所有的条件都放在 ts.filterCondition
ts.filterCondition = conds
}
}
ts.profile = p.getStatsProfileByFilter(p.pushedDownConds)
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
if pkCol != nil {
// TODO: We can use p.getStatsProfileByFilter(accessConditions).
rowCount, err = statsTbl.GetRowCountByIntColumnRanges(sc, pkCol.ID, ts.Ranges)
if err != nil {
return nil, errors.Trace(err)
}
}
copTask := &copTask{
tablePlan: ts,
indexPlanFinished: true,
}
task = copTask
matchProperty := len(prop.cols) == 1 && pkCol != nil && prop.cols[0].Equal(pkCol, nil)
if matchProperty && prop.expectedCnt < math.MaxFloat64 {
selectivity, err := p.statisticTable.Selectivity(p.ctx, ts.filterCondition)
if err != nil {
log.Warnf("An error happened: %v, we have to use the default selectivity", err.Error())
selectivity = selectionFactor
}
rowCount = math.Min(prop.expectedCnt/selectivity, rowCount)
}
ts.expectedCnt = rowCount
copTask.cst = rowCount * scanFactor
if matchProperty {
if prop.desc {
ts.Desc = true
copTask.cst = rowCount * descScanFactor
}
ts.KeepOrder = true
copTask.keepOrder = true
ts.addPushedDownSelection(copTask, p.profile, prop.expectedCnt)
} else {
expectedCnt := math.MaxFloat64
if prop.isEmpty() {
expectedCnt = prop.expectedCnt
} else {
return invalidTask, nil
}
// 这里会把 filterCondition 转成 PhysicalSelection
ts.addPushedDownSelection(copTask, p.profile, expectedCnt)
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx)
} else if _, ok := task.(*rootTask); ok {
return invalidTask, nil
}
return task, nil
}

DataSource.convertToIndexScan

在 DataSource.convert2NewPhysicalPlan 中:

1
2
3
4
5
6
7
8
9
10
11
if !includeTableScan || len(p.pushedDownConds) > 0 || len(prop.cols) > 0 {
for _, idx := range indices {
idxTask, err := p.convertToIndexScan(prop, idx)
if err != nil {
return nil, errors.Trace(err)
}
if idxTask.cost() < t.cost() {
t = idxTask
}
}
}

会尝试把每个 index 都转成 IndexScan:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// convertToIndexScan converts the DataSource to index scan with idx.
func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo) (task task, err error) {
is := PhysicalIndexScan{
Table: p.tableInfo,
TableAsName: p.TableAsName,
DBName: p.DBName,
Columns: p.Columns,
Index: idx,
dataSourceSchema: p.schema,
}.init(p.ctx)
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
sc := p.ctx.GetSessionVars().StmtCtx
idxCols, colLengths := expression.IndexInfo2Cols(p.Schema().Columns, idx)
is.Ranges = ranger.FullIndexRange()
if len(p.pushedDownConds) > 0 {
conds := make([]expression.Expression, 0, len(p.pushedDownConds))
for _, cond := range p.pushedDownConds {
conds = append(conds, cond.Clone())
}
if len(idxCols) > 0 {
var ranges []ranger.Range
is.AccessCondition, is.filterCondition = ranger.DetachIndexConditions(conds, idxCols, colLengths)
ranges, err = ranger.BuildRange(sc, is.AccessCondition, ranger.IndexRangeType, idxCols, colLengths)
if err != nil {
return nil, errors.Trace(err)
}
is.Ranges = ranger.Ranges2IndexRanges(ranges)
rowCount, err = statsTbl.GetRowCountByIndexRanges(sc, is.Index.ID, is.Ranges)
if err != nil {
return nil, errors.Trace(err)
}
} else {
is.filterCondition = conds
}
}
is.profile = p.getStatsProfileByFilter(p.pushedDownConds)
cop := &copTask{
indexPlan: is,
}
if !isCoveringIndex(is.Columns, is.Index.Columns, is.Table.PKIsHandle) {
// 如果索引里面没有全部的数据, 有一些列需要去拿 handle 绑定的value, 这个叫做
// double read
// On this way, it's double read case.
cop.tablePlan = PhysicalTableScan{Columns: p.Columns, Table: is.Table}.init(p.ctx)
cop.tablePlan.SetSchema(p.schema.Clone())
// If it's parent requires single read task, return max cost.
if prop.taskTp == copSingleReadTaskType {
return &copTask{cst: math.MaxFloat64}, nil
}
} else if prop.taskTp == copDoubleReadTaskType {
// If it's parent requires double read task, return max cost.
return &copTask{cst: math.MaxFloat64}, nil
}
is.initSchema(p.id, idx, cop.tablePlan != nil)
// Check if this plan matches the property.
matchProperty := false
if !prop.isEmpty() {
for i, col := range idx.Columns {
// not matched
if col.Name.L == prop.cols[0].ColName.L {
matchProperty = matchIndicesProp(idx.Columns[i:], prop.cols)
break
} else if i >= len(is.AccessCondition) {
break
} else if sf, ok := is.AccessCondition[i].(*expression.ScalarFunction); !ok || sf.FuncName.L != ast.EQ {
break
}
}
}
if matchProperty && prop.expectedCnt < math.MaxFloat64 {
selectivity, err := p.statisticTable.Selectivity(p.ctx, is.filterCondition)
if err != nil {
log.Warnf("An error happened: %v, we have to use the default selectivity", err.Error())
selectivity = selectionFactor
}
rowCount = math.Min(prop.expectedCnt/selectivity, rowCount)
}
is.expectedCnt = rowCount
cop.cst = rowCount * scanFactor
task = cop
if matchProperty {
if prop.desc {
is.Desc = true
cop.cst = rowCount * descScanFactor
}
if cop.tablePlan != nil {
cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(p)
}
cop.keepOrder = true
is.addPushedDownSelection(cop, p, prop.expectedCnt)
} else {
is.OutOfOrder = true
expectedCnt := math.MaxFloat64
if prop.isEmpty() {
expectedCnt = prop.expectedCnt
} else {
return invalidTask, nil
}
// 这里把过滤条件转换为 PhysicalSelection
is.addPushedDownSelection(cop, p, expectedCnt)
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx)
} else if _, ok := task.(*rootTask); ok {
return invalidTask, nil
}
return task, nil
}

在 finishCopTask 里面, 会最终计算task 的 cost, 然后会根据 task 里面的 tablePlan 跟 indexPlan 构造最后的物理计划:

  • PhysicalIndexLookUpReader: 如果 tablePlan 跟 indexPlan 都提供
  • PhysicalIndexReader: 如果只有 indexPlan
  • PhysicalTableReader

getBestTask

这一步是 baseLogicalPlan.convert2NewPhysicalPlan 中调用的, 这里可以看到它会迭代 child 的 convert2NewPhysicalPlan, 把它们都转换成 task 对象. 然后选出最佳的 copTask.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *baseLogicalPlan) getBestTask(bestTask task, prop *requiredProp, pp PhysicalPlan) (task, error) {
newProps := pp.getChildrenPossibleProps(prop)
for _, newProp := range newProps {
tasks := make([]task, 0, len(p.basePlan.children))
for i, child := range p.basePlan.children {
childTask, err := child.(LogicalPlan).convert2NewPhysicalPlan(newProp[i])
if err != nil {
return nil, errors.Trace(err)
}
tasks = append(tasks, childTask)
}
// 这里是关键, 会计算 task 的代价, 如果当前resultTask 代价小于 bestTask,
// 那么会替换当前的 bestTask. 最终选出最优的 task
// attach2Task 是把 child 的 tasks attach 到当前的 Physical plan, 然后再计算总体代价
resultTask := pp.attach2Task(tasks...)
if resultTask.cost() < bestTask.cost() {
bestTask = resultTask
}
}
return bestTask, nil
}

基本分析到这里, 基本上 sort 的 plan 部分就结束了. 之后就进入到执行器 Executor 部分了, 我们来看看

SortExec

之后代码会进入到 ExecStmt.buildExecutor, 在这里, TiDB 会构造最终的执行器. 对于 Sort:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (b *executorBuilder) buildSort(v *plan.Sort) Executor {
childExec := b.build(v.Children()[0]) // 先 build 孩子算子的执行器, 一层一层 build
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
sortExec := SortExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, childExec),
ByItems: v.ByItems,
schema: v.Schema(),
}
sortExec.supportChk = true
if v.ExecLimit != nil {
return &TopNExec{
SortExec: sortExec,
limit: v.ExecLimit,
}
}
return &sortExec
}

有了 SortExec 之后, TiDB 会调用它的 Next() 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (e *SortExec) Next(goCtx goctx.Context) (Row, error) {
if !e.fetched {
for {
srcRow, err := e.children[0].Next(goCtx)
if err != nil {
return nil, errors.Trace(err)
}
if srcRow == nil {
break
}
orderRow := &orderByRow{
row: srcRow,
key: make([]*types.Datum, len(e.ByItems)),
}
for i, byItem := range e.ByItems {
key, err := byItem.Expr.Eval(srcRow)
if err != nil {
return nil, errors.Trace(err)
}
orderRow.key[i] = &key
}
e.Rows = append(e.Rows, orderRow)
}
sort.Sort(e)
e.fetched = true
}
if e.err != nil {
return nil, errors.Trace(e.err)
}
if e.Idx >= len(e.Rows) {
return nil, nil
}
row := e.Rows[e.Idx].row
e.Idx++
return row, nil
}

自此, Order by 的代码基本就结束了. 其中还有很多细节没有详细解释, 需要大家看代码去深入了解.

赏

thanks

扫一扫,分享到微信

微信分享二维码
TiDB 源码分析之 Bootstrap
© 2019 wink