wink

有的鸟是不会被关住的, 因为它们的羽毛太耀眼了.

  • main

wink

有的鸟是不会被关住的, 因为它们的羽毛太耀眼了.

  • main

tidb笔记1

2016-04-06

tidb笔记1

语法解析

tidb的语法解析有一些语句并没有作支持: REPLACE

go build -ldflags参数

在Makefile中, 有这么一句:

1
2
3
4
5
6
7
8
9
10
LDFLAGS += -X "github.com/pingcap/tidb/util/printer.TiDBBuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/tidb/util/printer.TiDBGitHash=$(shell git rev-parse HEAD)"
...
server:
ifeq ($(TARGET), "")
@cd tidb-server && $(GO) build -ldflags '$(LDFLAGS)'
else
@cd tidb-server && $(GO) build -ldflags '$(LDFLAGS)' -o '$(TARGET)'
endif

我们来看看ldflags的意义

1
2
3
4
5
$ go help build
...
-ldflags 'flag list'
arguments to pass on each go tool link invocation.
...

这个参数是传递给go tool link的, 我们看看go tool:

1
2
3
4
5
6
...
-V print version and exit
-W disassemble input
-X definition
add string value definition of the form importpath.name=value
...

这个-X选项的意思是, 在二进制中增加一个定义, 格式是importpath.variable_name=value, 所以我们看到了printer.TiDBBuildTS和printer.TiDBGitHash的两个变量的定义. 在看看这两个变量在代码中的位置:

1
2
3
4
5
6
7
8
9
10
11
12
package printer
import (
"bytes"
"fmt"
)
// Version information.
var (
TiDBBuildTS = "None"
TiDBGitHash = "None"
)

这就知道了这个选项的目的了

从Slice转换到String 避免复制

1
2
3
4
5
6
7
8
9
10
11
12
// String converts slice to string without copy.
// Use at your own risk.
func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
}

slice转string之后,如果slice的值有变化,string也会跟着改变,如下:

1
2
3
4
5
6
7
b := []byte("hello world")
a := String(b)
b[0] = 'a'
println(a) //output aello world

但是string转slice之后,就不能更改slice了,如下:

1
2
3
4
5
6
7
8
a := "hello world"
b := Slice(a)
b[0] = 'a' //这里就等着崩溃吧
//但是可以这样,因为go又重新给b分配了内存
b = append(b, "hello world"…)

上面为什么会崩溃我猜想可能是string是immutable的,可能对应的内存地址也是不允许改动的。

另外,上面这个崩溃在defer里面是recover不回来的,真的就崩溃了,原因可能就跟c的非法内存访问一样,os不跟你玩了。

并发控制

tiDB里面利用channel实现了一种沙漏算法, 详情见: server/tokenlimiter.go 以及Server.getToken, releaseToken函数

bootstrap(tidb/bootstrap.go:89)

每次tidb启动, 都会执行这个启动函数一次.

compile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Compile is safe for concurrent use by multiple goroutines.
func Compile(src string) ([]stmt.Statement, error) {
log.Debug("compiling", src)
l := parser.NewLexer(src)
if parser.YYParse(l) != 0 {
log.Warnf("compiling %s, error: %v", src, l.Errors()[0])
return nil, errors.Trace(l.Errors()[0])
}
rawStmt := l.Stmts()
stmts := make([]stmt.Statement, len(rawStmt))
for i, v := range rawStmt {
if node, ok := v.(ast.Node); ok {
stm, err := optimizer.Compile(node)
if err != nil {
return nil, errors.Trace(err)
}
stmts[i] = stm
} else {
stmts[i] = v.(stmt.Statement)
}
}
return stmts, nil
}

我们看到这里有一个判断是不是抽象语法树ast.Node的, tidb里面心很大它实现了两套statement对象, 一套是ast包里面的, 可以用调用stm, err := optimizer.Compile(node)来优化, 一套是stmt包里面的, 详见parser.y:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
StatementList:
Statement
{
if $1 != nil {
n, ok := $1.(ast.Node)
if ok {
n.SetText(yylex.(*lexer).stmtText())
yylex.(*lexer).list = []interface{}{n} // 作为ast.Node
} else {
s := $1.(stmt.Statement) // 否则作为stmt.Statement
s.SetText(yylex.(*lexer).stmtText())
yylex.(*lexer).list = []interface{}{s}
}
}
}

然而, 并没什么卵用, 在parser.y中可以看到, 所有的Statement都是stmt包里面的, 所以Compile中的optimizer.Compile(node) 不会运行.

并行Create database会失败

看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction, schemaMetaVersion int64) error {
curVer, err := txn.GetInt64(meta.SchemaMetaVersionKey)
if err != nil {
return errors.Trace(err)
}
if curVer != schemaMetaVersion {
return errors.Errorf("Schema changed, our version %d, but got %d", schemaMetaVersion, curVer)
}
// Increment version.
_, err = txn.Inc(meta.SchemaMetaVersionKey, 1)
if err != nil {
return errors.Trace(err)
}
if err := txn.LockKeys(meta.SchemaMetaVersionKey); err != nil {
return errors.Trace(err)
}
return errors.Trace(err)
}

如果在别的goroutine中别人更改了meta.SchemaMetaVersionKey, 那么此次create database就会失败, 也不会重试

事务实现

参考: mysql事务

基本借鉴了percolater那种利用时间戳来实现多版本的方式, 在事务中修改的值通过dbTxn.LockKeys函数来把key锁住, 如果在别的事务中对这些key修改了, 那么此事务commit的时候将不能成功, 具体的实现在dbTxn.doCommit:

1
2
3
4
5
6
7
8
9
10
11
12
func (txn *dbTxn) doCommit() error {
...
// Check locked keys
for k, v := range txn.snapshotVals {
err := txn.store.tryConditionLockKey(txn.tid, k, v)
if err != nil {
return errors.Trace(err)
}
keysLocked = append(keysLocked, k)
}
...
}

在向leveldb写入值之前, 先用tryConditionLockKey看能否锁住那些值, 同时判断现在存在的版本是否已经被别人修改过(通过值绑定的版本, 也就是时间戳来判断):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Both lock and unlock are used for simulating scenario of percolator papers.
func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.keysLocked[key]; ok {
return errors.Trace(kv.ErrLockConflict)
}
metaKey := codec.EncodeBytes(nil, []byte(key))
currValue, err := s.db.Get(metaKey)
if errors2.ErrorEqual(err, kv.ErrNotExist) || currValue == nil {
// If it's a new key, we won't need to check its version
return nil
}
if err != nil {
return errors.Trace(err)
}
_, ver, err := codec.DecodeUint(currValue)
if err != nil {
return errors.Trace(err)
}
// If there's newer version of this key, returns error.
if ver > tid {
log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q, snapshotVal:%q", tid, key, currValue, snapshotVal)
return errors.Trace(kv.ErrConditionNotMatch)
}
s.keysLocked[key] = tid
return nil
}

然而, 即使如此, 还是会有write skew问题, 因为其并没有对读取的值锁住. 所以, tidb也对Select for update做出了相应的支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (txn *dbTxn) doCommit() error {
b := txn.store.newBatch()
keysLocked := make([]string, 0, len(txn.snapshotVals))
defer func() {
for _, key := range keysLocked {
txn.store.unLockKeys(key)
}
}()
// Check locked keys
for k, v := range txn.snapshotVals {
err := txn.store.tryConditionLockKey(txn.tid, k, v)
if err != nil {
return errors.Trace(err)
}
keysLocked = append(keysLocked, k)
}
// Check dirty store
curVer, err := globalVersionProvider.CurrentVersion()
if err != nil {
return errors.Trace(err)
}
err = txn.each(func(iter iterator.Iterator) error {
metaKey := codec.EncodeBytes(nil, iter.Key())
// put dummy meta key, write current version
b.Put(metaKey, codec.EncodeUint(nil, curVer.Ver)) // 每个key都有一个对应的metakey, 保存的是这个key的version(时间戳)
mvccKey := MvccEncodeVersionKey(iter.Key(), curVer) // 然后通过这个version跟key结合起来就可以得到真正的key
if len(iter.Value()) == 0 { // Deleted marker
b.Put(mvccKey, nil)
} else {
b.Put(mvccKey, iter.Value())
}
return nil
})
if err != nil {
return errors.Trace(err)
}
// Update commit version.
txn.version = curVer
// Release read lock before write. Workaround for BoltDB.
txn.Snapshot.Release()
return txn.store.writeBatch(b)
}

infoSchema.tables 构造

1
2
3
4
5
6
7
8
9
10
11
12
13
type infoSchema struct {
schemaNameToID map[string]int64
tableNameToID map[tableName]int64
columnNameToID map[columnName]int64
schemas map[int64]*model.DBInfo
tables map[int64]table.Table
columns map[int64]*model.ColumnInfo
indices map[indexName]*model.IndexInfo
columnIndices map[int64][]*model.IndexInfo
// We should check version when change schema.
schemaMetaVersion int64
}

infoSchema.tables成员是用来存放table定义的, 但是让我感觉迷惑的是, 在ddl.CreateTable中, 并没有对这个值进行构造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) {
...
tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
...
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(ident.Schema, tbInfo, txn)
return errors.Trace(err)
}
...
}

只设置了model.TableInfo, 那么, 到底infoSchema.tables是在哪里更新的呢, 我们看到CreateTable还会触发Domain.onDDLChange, 这个函数里面:

调用了Handle.Set

1
2
3
4
5
6
7
8
9
10
11
12
func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) {
...
for _, di := range newInfo {
info.schemas[di.ID] = di
info.schemaNameToID[di.Name.L] = di.ID
for _, t := range di.Tables {
alloc := autoid.NewAllocator(h.store)
info.tables[t.ID] = table.TableFromMeta(di.Name.L, alloc, t) // 就是在这里设置的tables
...
}
h.value.Store(info)
}

所以每次新建一个Table, tidb都会重建infoSchema.tables变量(这样不好吧? 为何不增量的增加)

codec.EncodeBytes

我们看到每次插入leveldb之前, 都会对其key作Encode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// EncodeBytes encodes the slice value using an escape based encoding,
// with the following rule:
// \x00 -> \x00\xFF
// \xFF -> \xFF\x00 if the first byte is \xFF
// EncodeBytes will append \x00\x01 at the end of the encoded value to
// indicate the termination.
// EncodeBytes guarantees the encoded value is in ascending order for comparison,
// The encoded value is >= SmallestNoneNilValue and < InfiniteValue.
func EncodeBytes(b []byte, data []byte) []byte {
if len(data) > 0 && data[0] == 0xFF {
// we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF.
b = append(b, 0xFF, 0x00)
data = data[1:]
}
for {
// find 0x00 and escape it.
i := bytes.IndexByte(data, 0x00)
if i == -1 {
break
}
b = append(b, data[:i]...)
b = append(b, 0x00, 0xFF)
data = data[i+1:]
}
b = append(b, data...)
return append(b, 0x00, 0x01)
}
func (txn *dbTxn) Seek(k kv.Key, fnKeyCmp func(kv.Key) bool) (kv.Iterator, error) {
...
k = kv.EncodeKey(k)
iter, err := txn.UnionStore.Seek(k, txn)
...
}

其用意可从下面这段注释中窥得一二:

1
2
3
4
5
6
7
8
9
func genIndexPrefix(indexPrefix, indexName string) string {
// Use EncodeBytes to guarantee generating different index prefix.
// e.g, two indices c1 and c with index prefix p, if no EncodeBytes,
// the index format looks p_c and p_c1, if c has an index value which the first encoded byte is '1',
// we will meet an error, because p_c1 is for index c1.
// If EncodeBytes, c1 -> c1\x00\x01 and c -> c\x00\x01, the prefixs are different.
key := fmt.Sprintf("%s_%s", indexPrefix, indexName)
return string(codec.EncodeBytes(nil, []byte(key)))
}

INSERT … ON DUPLICATE KEY UPDATE Syntax

具体详情见:mysql: ON DUPLICATE KEY UPDATE Syntax

这里举个例子:

1
2
3
select * from player_count where player_id = 1;//查询统计表中是否有记录
insert into player_count(player_id,count) value(1,1);//没有记录就执行insert 操作
update player_count set count = count+1 where player_id = 1;//有记录就执行update操作

这种写法比较麻烦, 用on duplicate key update 的做法如下:

1
insert into player_count(player_id,count) value(1,1) on duplicate key update count=count+1;

赏

thanks

  • tidb
  • tech

扫一扫,分享到微信

微信分享二维码
tidb笔记2
c++11 笔记
© 2019 wink