@@ -2,6 +2,8 @@ package mvcc
22
33import (
44 "encoding/binary"
5+ "github.com/pingcap-incubator/tinykv/kv/util/engine_util"
6+ "math"
57
68 "github.com/pingcap-incubator/tinykv/kv/storage"
79 "github.com/pingcap-incubator/tinykv/kv/util/codec"
@@ -41,53 +43,148 @@ func (txn *MvccTxn) Writes() []storage.Modify {
4143// PutWrite records a write at key and ts.
4244func (txn * MvccTxn ) PutWrite (key []byte , ts uint64 , write * Write ) {
4345 // Your Code Here (4A).
46+ txn .writes = append (txn .writes , storage.Modify {Data : storage.Put {
47+ Key : EncodeKey (key , ts ),
48+ Value : write .ToBytes (),
49+ Cf : engine_util .CfWrite ,
50+ }})
4451}
4552
4653// GetLock returns a lock if key is locked. It will return (nil, nil) if there is no lock on key, and (nil, err)
4754// if an error occurs during lookup.
4855func (txn * MvccTxn ) GetLock (key []byte ) (* Lock , error ) {
4956 // Your Code Here (4A).
50- return nil , nil
57+ val , err := txn .Reader .GetCF (engine_util .CfLock , key )
58+
59+ if err != nil {
60+ return nil , err
61+ }
62+
63+ l , err := ParseLock (val )
64+ if err != nil {
65+ panic (err )
66+ }
67+ return l , nil
5168}
5269
5370// PutLock adds a key/lock to this transaction.
5471func (txn * MvccTxn ) PutLock (key []byte , lock * Lock ) {
5572 // Your Code Here (4A).
73+ txn .writes = append (txn .writes , storage.Modify {
74+ Data : storage.Put {
75+ Key : key ,
76+ Cf : engine_util .CfLock ,
77+ Value : lock .ToBytes (),
78+ },
79+ })
5680}
5781
5882// DeleteLock adds a delete lock to this transaction.
5983func (txn * MvccTxn ) DeleteLock (key []byte ) {
6084 // Your Code Here (4A).
85+ txn .writes = append (txn .writes , storage.Modify {
86+ Data : storage.Delete {
87+ Key : key ,
88+ Cf : engine_util .CfLock ,
89+ },
90+ })
6191}
6292
6393// GetValue finds the value for key, valid at the start timestamp of this transaction.
6494// I.e., the most recent value committed before the start of this transaction.
6595func (txn * MvccTxn ) GetValue (key []byte ) ([]byte , error ) {
6696 // Your Code Here (4A).
97+ iter := txn .Reader .IterCF (engine_util .CfWrite )
98+
99+ startKey := EncodeKey (key , txn .StartTS )
100+ endKey := EncodeKey (key , 0 )
101+ for iter .Seek (startKey ); iter .Valid () && ! engine_util .ExceedEndKey (iter .Item ().Key (), endKey ); iter .Next () {
102+ v , err := iter .Item ().Value ()
103+ if err != nil {
104+ return nil , err
105+ }
106+ w , err := ParseWrite (v )
107+ if err != nil {
108+ panic (err )
109+ }
110+
111+ return txn .Reader .GetCF (engine_util .CfDefault , EncodeKey (key , w .StartTS ))
112+ }
113+
67114 return nil , nil
68115}
69116
70117// PutValue adds a key/value write to this transaction.
71118func (txn * MvccTxn ) PutValue (key []byte , value []byte ) {
72119 // Your Code Here (4A).
120+ txn .writes = append (txn .writes , storage.Modify {
121+ Data : storage.Put {
122+ Key : EncodeKey (key , txn .StartTS ),
123+ Value : value ,
124+ Cf : engine_util .CfDefault ,
125+ },
126+ })
73127}
74128
75129// DeleteValue removes a key/value pair in this transaction.
76130func (txn * MvccTxn ) DeleteValue (key []byte ) {
77131 // Your Code Here (4A).
132+ txn .writes = append (txn .writes , storage.Modify {
133+ Data : storage.Delete {
134+ Key : EncodeKey (key , txn .StartTS ),
135+ Cf : engine_util .CfDefault ,
136+ },
137+ })
78138}
79139
80140// CurrentWrite searches for a write with this transaction's start timestamp. It returns a Write from the DB and that
81141// write's commit timestamp, or an error.
82142func (txn * MvccTxn ) CurrentWrite (key []byte ) (* Write , uint64 , error ) {
83143 // Your Code Here (4A).
144+ iter := txn .Reader .IterCF (engine_util .CfWrite )
145+ // the committed write must have timestamp no eariler than txn.StartTS
146+ startKey := EncodeKey (key , math .MaxInt64 )
147+ endKey := EncodeKey (key , txn .StartTS )
148+ for iter .Seek (startKey ); iter .Valid () && ! engine_util .ExceedEndKey (iter .Item ().Key (), endKey ); iter .Next () {
149+ v , err := iter .Item ().Value ()
150+ if err != nil {
151+ return nil , 0 , err
152+ }
153+ w , err := ParseWrite (v )
154+ if err != nil {
155+ panic (err )
156+ }
157+
158+ if w .StartTS != txn .StartTS {
159+ continue
160+ }
161+
162+ return w , decodeTimestamp (iter .Item ().Key ()), nil
163+ }
164+
84165 return nil , 0 , nil
85166}
86167
87168// MostRecentWrite finds the most recent write with the given key. It returns a Write from the DB and that
88169// write's commit timestamp, or an error.
89170func (txn * MvccTxn ) MostRecentWrite (key []byte ) (* Write , uint64 , error ) {
90171 // Your Code Here (4A).
172+ iter := txn .Reader .IterCF (engine_util .CfWrite )
173+
174+ startKey := EncodeKey (key , math .MaxInt64 )
175+ endKey := EncodeKey (key , 0 )
176+ for iter .Seek (startKey ); iter .Valid () && ! engine_util .ExceedEndKey (iter .Item ().Key (), endKey ); iter .Next () {
177+ v , err := iter .Item ().Value ()
178+ if err != nil {
179+ return nil , 0 , err
180+ }
181+ w , err := ParseWrite (v )
182+ if err != nil {
183+ panic (err )
184+ }
185+
186+ return w , decodeTimestamp (iter .Item ().Key ()), nil
187+ }
91188 return nil , 0 , nil
92189}
93190
0 commit comments