forked from talent-plan/tinykv
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransaction.go
More file actions
274 lines (237 loc) · 8.4 KB
/
transaction.go
File metadata and controls
274 lines (237 loc) · 8.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package mvcc
import (
"bytes"
"encoding/binary"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/util/codec"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap-incubator/tinykv/scheduler/pkg/tsoutil"
)
// KeyError wraps kvrpcpb.KeyError so that it satisfies the built-in error
// interface and can be returned wherever an error is expected.
type KeyError struct {
	kvrpcpb.KeyError
}

// Error implements the error interface by delegating to the embedded
// protobuf message's String method.
func (ke *KeyError) Error() string {
	msg := ke.KeyError.String()
	return msg
}
// MvccTxn groups together writes as part of a single transaction. It also provides an abstraction over low-level
// storage, lowering the concepts of timestamps, writes, and locks into plain keys and values.
//
// Modifications added through its methods are buffered in memory (the writes field) and exposed via Writes();
// the methods of this type only read from storage through Reader.
type MvccTxn struct {
	// StartTS is the start timestamp of the transaction.
	StartTS uint64
	// Reader is the interface used to read existing data from storage.
	Reader storage.StorageReader
	// writes holds the buffered modifications, in the order they were added.
	writes []storage.Modify
}
// NewMvccTxn creates a transaction that reads through reader and whose start
// timestamp is startTs. The returned transaction has no buffered writes.
func NewMvccTxn(reader storage.StorageReader, startTs uint64) *MvccTxn {
	txn := new(MvccTxn)
	txn.StartTS = startTs
	txn.Reader = reader
	return txn
}
// Writes returns every modification buffered by this transaction, in the
// order it was added. The returned slice is the transaction's own buffer,
// not a copy.
func (txn *MvccTxn) Writes() []storage.Modify {
	buffered := txn.writes
	return buffered
}
// PutWrite buffers a write record for key at timestamp ts. The record is
// stored in the write column family under EncodeKey(key, ts), and its value is
// the serialized form of write.
func (txn *MvccTxn) PutWrite(key []byte, ts uint64, write *Write) {
	put := storage.Put{
		Key:   EncodeKey(key, ts),
		Value: write.ToBytes(),
		Cf:    engine_util.CfWrite,
	}
	txn.writes = append(txn.writes, storage.Modify{Data: put})
}
// GetLock returns the lock currently held on key. It returns (nil, nil) when
// key has no lock, and (nil, err) when reading or decoding the lock fails.
// Locks are looked up in the lock column family under the raw user key (no
// timestamp encoding).
func (txn *MvccTxn) GetLock(key []byte) (*Lock, error) {
	raw, err := txn.Reader.GetCF(engine_util.CfLock, key)
	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		// No lock stored for this key.
		return nil, nil
	}
	lock, parseErr := ParseLock(raw)
	if parseErr != nil {
		return nil, parseErr
	}
	return lock, nil
}
// PutLock buffers a write of lock into the lock column family, keyed by the
// raw user key (no timestamp encoding).
func (txn *MvccTxn) PutLock(key []byte, lock *Lock) {
	put := storage.Put{Key: key, Value: lock.ToBytes(), Cf: engine_util.CfLock}
	txn.writes = append(txn.writes, storage.Modify{Data: put})
}
// DeleteLock buffers the removal of key's entry from the lock column family.
// The key is used as-is, without timestamp encoding.
func (txn *MvccTxn) DeleteLock(key []byte) {
	del := storage.Delete{Key: key, Cf: engine_util.CfLock}
	txn.writes = append(txn.writes, storage.Modify{Data: del})
}
// GetValue finds the value for key that is visible at this transaction's start
// timestamp, i.e. the most recent value committed before the transaction
// started. It returns (nil, nil) when no such value exists or when the most
// recent visible write is not a Put (e.g. a deletion), and (nil, err) on
// read/decode failure.
//
// Write records for key are scanned from the newest one committed at or before
// StartTS towards older ones. Rollback records carry no data, so they are
// skipped; stopping at one would let an aborted transaction hide an older
// committed value.
func (txn *MvccTxn) GetValue(key []byte) ([]byte, error) {
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	// EncodeKey sorts timestamps in descending order, so seeking to
	// (key, StartTS) lands on the newest write for key with ts <= StartTS.
	for iter.Seek(EncodeKey(key, txn.StartTS)); iter.Valid(); iter.Next() {
		item := iter.Item()
		if !bytes.Equal(DecodeUserKey(item.KeyCopy(nil)), key) {
			// Moved past every version of key.
			return nil, nil
		}
		value, err := item.ValueCopy(nil)
		if err != nil {
			return nil, err
		}
		write, err := ParseWrite(value)
		if err != nil {
			return nil, err
		}
		switch write.Kind {
		case WriteKindPut:
			// The data lives in the default CF under the writer's start ts.
			return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, write.StartTS))
		case WriteKindRollback:
			// No data here; keep looking at older versions.
			continue
		default:
			// Deletion (or any other non-Put kind): nothing is visible.
			return nil, nil
		}
	}
	return nil, nil
}
// PutValue buffers a write of value for key into the default column family,
// keyed by EncodeKey(key, txn.StartTS).
func (txn *MvccTxn) PutValue(key []byte, value []byte) {
	put := storage.Put{
		Key:   EncodeKey(key, txn.StartTS),
		Value: value,
		Cf:    engine_util.CfDefault,
	}
	txn.writes = append(txn.writes, storage.Modify{Data: put})
}
// DeleteValue buffers the removal of key's value from the default column
// family, addressed by EncodeKey(key, txn.StartTS).
func (txn *MvccTxn) DeleteValue(key []byte) {
	del := storage.Delete{Key: EncodeKey(key, txn.StartTS), Cf: engine_util.CfDefault}
	txn.writes = append(txn.writes, storage.Modify{Data: del})
}
// CurrentWrite searches for the write record of key whose StartTS equals this
// transaction's start timestamp. It returns that Write and its commit
// timestamp, (nil, 0, nil) when no such record exists, or (nil, 0, err) on
// read/decode failure.
//
// The scan runs from the newest record towards older ones and stops once the
// commit timestamp falls below StartTS. This relies on the Percolator
// invariant that a write's commit timestamp is never smaller than its start
// timestamp. NOTE(review): assumes rollback records are written with
// commit ts == start ts, as the transaction layer is expected to do.
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	// Timestamps sort descending, so the maximum timestamp seeks to the newest
	// version of key.
	for iter.Seek(EncodeKey(key, ^uint64(0))); iter.Valid(); iter.Next() {
		item := iter.Item()
		curKey := item.KeyCopy(nil)
		if !bytes.Equal(DecodeUserKey(curKey), key) {
			// Moved past every version of key.
			return nil, 0, nil
		}
		commitTS := decodeTimestamp(curKey)
		if commitTS < txn.StartTS {
			// Every remaining record belongs to an earlier-started transaction.
			break
		}
		value, err := item.ValueCopy(nil)
		if err != nil {
			return nil, 0, err
		}
		write, err := ParseWrite(value)
		if err != nil {
			return nil, 0, err
		}
		if write.StartTS == txn.StartTS {
			return write, commitTS, nil
		}
	}
	return nil, 0, nil
}
// MostRecentWrite returns the newest write record for key together with its
// commit timestamp. It returns (nil, 0, nil) when key has no write records,
// and (nil, 0, err) on read/decode failure.
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	// Timestamps sort descending, so seeking to the maximum timestamp lands on
	// the newest version of key, if there is one.
	iter.Seek(EncodeKey(key, ^uint64(0)))
	if !iter.Valid() {
		return nil, 0, nil
	}
	item := iter.Item()
	encoded := item.KeyCopy(nil)
	if !bytes.Equal(DecodeUserKey(encoded), key) {
		return nil, 0, nil
	}
	raw, err := item.ValueCopy(nil)
	if err != nil {
		return nil, 0, err
	}
	write, err := ParseWrite(raw)
	if err != nil {
		return nil, 0, err
	}
	return write, decodeTimestamp(encoded), nil
}
// EncodeKey encodes a user key and appends an encoded timestamp to a key. Keys and timestamps are encoded so that
// timestamped keys are sorted first by key (ascending), then by timestamp (descending). The encoding is based on
// https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format.
//
// The timestamp is stored as the big-endian bitwise complement of ts; the complement makes larger timestamps
// compare as smaller byte strings, which gives the descending order.
func EncodeKey(key []byte, ts uint64) []byte {
	var tsBuf [8]byte
	binary.BigEndian.PutUint64(tsBuf[:], ^ts)
	return append(codec.EncodeBytes(key), tsBuf[:]...)
}
// DecodeUserKey takes a key produced by EncodeKey and returns the decoded user
// key, discarding the timestamp suffix. It panics if key does not begin with a
// valid codec.EncodeBytes encoding.
func DecodeUserKey(key []byte) []byte {
	_, userKey, err := codec.DecodeBytes(key)
	if err == nil {
		return userKey
	}
	panic(err)
}
// decodeTimestamp takes a key produced by EncodeKey and returns its timestamp.
// The bytes left after the encoded user key hold the big-endian complement of
// the timestamp, so they are read and complemented back. It panics if the user
// key part cannot be decoded.
func decodeTimestamp(key []byte) uint64 {
	rest, _, err := codec.DecodeBytes(key)
	if err != nil {
		panic(err)
	}
	inverted := binary.BigEndian.Uint64(rest)
	return ^inverted
}
// PhysicalTime returns the physical-time part of a timestamp by shifting away
// its low tsoutil.PhysicalShiftBits bits.
func PhysicalTime(ts uint64) uint64 {
	physical := ts >> tsoutil.PhysicalShiftBits
	return physical
}