@@ -2,6 +2,7 @@ package server
22
33import (
44 "context"
5+ "github.com/pingcap-incubator/tinykv/kv/transaction/mvcc"
56
67 "github.com/pingcap-incubator/tinykv/kv/coprocessor"
78 "github.com/pingcap-incubator/tinykv/kv/storage"
@@ -121,17 +122,225 @@ func (server *Server) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
121122// Transactional API.
122123func (server * Server ) KvGet (_ context.Context , req * kvrpcpb.GetRequest ) (* kvrpcpb.GetResponse , error ) {
123124 // Your Code Here (4B).
125+ server .Latches .WaitForLatches ([][]byte {req .Key })
126+ defer server .Latches .ReleaseLatches ([][]byte {req .Key })
127+
128+ reader , err := server .storage .Reader (req .Context )
129+ if err != nil {
130+ panic (err )
131+ }
132+
133+ var resp kvrpcpb.GetResponse
134+ txn := mvcc .NewMvccTxn (reader , req .Version )
135+ l , err := txn .GetLock (req .Key )
136+ if err != nil {
137+ panic (err )
138+ }
139+ if l != nil && l .Ts <= req .Version {
140+ resp .Error = & kvrpcpb.KeyError {Locked : l .Info (req .Key )}
141+ }
142+
143+ v , err := txn .GetValue (req .Key )
144+ if err != nil {
145+ panic (err )
146+ }
147+ if v == nil || len (v ) == 0 {
148+ resp .NotFound = true
149+ } else {
150+ resp .NotFound = false
151+ resp .Value = v
152+ }
153+
154+ return & resp , nil
155+ }
156+
157+ func (server * Server ) prewriteCell (
158+ k []byte , v []byte ,
159+ primary []byte ,
160+ txn * mvcc.MvccTxn ,
161+ lockTtl uint64 ,
162+ ctx * kvrpcpb.Context ) (kerr * kvrpcpb.KeyError , err error ) {
163+ w , lastWriteTs , err := txn .MostRecentWrite (k )
164+ if err != nil {
165+ return nil , err
166+ }
167+
168+ if lastWriteTs > txn .StartTS {
169+ // As the to-commit ts will be larger than lastWriteTs,
170+ // this is a write-write conflict
171+ return & kvrpcpb.KeyError {
172+ Locked : nil ,
173+ Retryable : "" ,
174+ Abort : "" ,
175+ Conflict : & kvrpcpb.WriteConflict {
176+ StartTs : w .StartTS ,
177+ ConflictTs : lastWriteTs ,
178+ Key : k ,
179+ Primary : primary ,
180+ },
181+ }, nil
182+ }
183+
184+ l , err := txn .GetLock (k )
185+ if err != nil {
186+ return nil , err
187+ }
188+ if l != nil && l .Ts <= txn .StartTS {
189+ return & kvrpcpb.KeyError {Locked : l .Info (k )}, nil
190+ }
191+
192+ // Now it is safe to persist the lock for the cell and write a new version of the data:
193+ txn .PutLock (k , & mvcc.Lock {
194+ Primary : primary ,
195+ Ts : txn .StartTS ,
196+ Ttl : lockTtl ,
197+ Kind : mvcc .WriteKindPut ,
198+ })
199+ txn .PutValue (k , v )
200+ err = server .storage .Write (ctx , txn .Writes ())
201+ if err != nil {
202+ return nil , err
203+ }
204+
124205 return nil , nil
125206}
126207
127208func (server * Server ) KvPrewrite (_ context.Context , req * kvrpcpb.PrewriteRequest ) (* kvrpcpb.PrewriteResponse , error ) {
128209 // Your Code Here (4B).
129- return nil , nil
210+ if len (req .Mutations ) == 0 {
211+ return & kvrpcpb.PrewriteResponse {}, nil
212+ }
213+
214+ var keys [][]byte
215+ writeBuffer := make (map [string ][]byte )
216+ for _ , mu := range req .Mutations {
217+ if _ , ok := writeBuffer [string (mu .Key )]; ! ok {
218+ keys = append (keys , mu .Value )
219+ }
220+ writeBuffer [string (mu .Key )] = mu .Value
221+ }
222+
223+ server .Latches .WaitForLatches (keys )
224+ defer server .Latches .ReleaseLatches (keys )
225+
226+ var resp kvrpcpb.PrewriteResponse
227+ reader , err := server .storage .Reader (req .Context )
228+ if err != nil {
229+ // XXX this can be region error
230+ panic (err )
231+ }
232+
233+ txn := mvcc .NewMvccTxn (reader , req .StartVersion )
234+ for k , val := range writeBuffer {
235+ kerr , err := server .prewriteCell ([]byte (k ), val , req .PrimaryLock , txn , req .LockTtl , req .Context )
236+ if err != nil {
237+ panic (err )
238+ // it really should be: resp.RegionError = err, unfortunately, layering loses the error details
239+ }
240+ if kerr != nil {
241+ resp .Errors = append (resp .Errors , kerr )
242+ return & resp , nil
243+ }
244+ }
245+
246+ return & resp , nil
130247}
131248
249+ /*
250+ Commit a pre-write txn. Give up and leave the kv unchanged if
251+ 1. any of the k does not have a matching startTS lock (aborted), or
252+ 2. any of the k has a committed conflict write
253+
254+ In particular, this rpc is idempotent: on committed write with the
255+ same startTS and commitTS, it does nothing.
256+ */
132257func (server * Server ) KvCommit (_ context.Context , req * kvrpcpb.CommitRequest ) (* kvrpcpb.CommitResponse , error ) {
133258 // Your Code Here (4B).
134- return nil , nil
259+ if len (req .Keys ) == 0 {
260+ return & kvrpcpb.CommitResponse {}, nil
261+ }
262+
263+ server .Latches .WaitForLatches (req .Keys )
264+ defer server .Latches .ReleaseLatches (req .Keys )
265+
266+ reader , err := server .storage .Reader (req .Context )
267+ if err != nil {
268+ panic (err )
269+ }
270+
271+ var resp kvrpcpb.CommitResponse
272+
273+ txn := mvcc .NewMvccTxn (reader , req .StartVersion )
274+
275+ for _ , k := range req .Keys {
276+ l , err := txn .GetLock (k )
277+ if err != nil {
278+ panic (err )
279+ }
280+
281+ // check if it is rolled back or has been committed by a different txn:
282+ w , commitTs , err := txn .MostRecentWrite (k )
283+ if err != nil {
284+ panic (err )
285+ }
286+ if w != nil {
287+ if w .Kind == mvcc .WriteKindRollback {
288+ return & resp , nil
289+ }
290+
291+ if commitTs == req .CommitVersion {
292+ if req .StartVersion != w .StartTS {
293+ panic ("Protocol error" )
294+ }
295+ // do nothing, already committed, don't check the lock or write
296+ // to kv. skip this key
297+ continue
298+ }
299+
300+ if commitTs > req .StartVersion {
301+ // committing a w-w conflict, even if it does not overlap with any
302+ // writes before.
303+ resp .Error = & kvrpcpb.KeyError {
304+ Conflict : & kvrpcpb.WriteConflict {
305+ StartTs : w .StartTS ,
306+ ConflictTs : commitTs ,
307+ Key : k ,
308+ },
309+ }
310+
311+ return & resp , nil
312+ }
313+ }
314+
315+ // either w is nil or the latest write is below this req.StartVersion
316+
317+ // check if it has a lock:
318+ if l == nil {
319+ // no lock at all. this commit has no prewrite as it passes the
320+ // previous write check
321+ return & resp , nil
322+ }
323+
324+ if l .Ts != req .StartVersion {
325+ // conflict, someone is committing with us
326+ resp .Error = & kvrpcpb.KeyError {Retryable : string (k )}
327+ return & resp , nil
328+ }
329+
330+ // commit:
331+ txn .DeleteLock (k )
332+ txn .PutWrite (k , req .CommitVersion , & mvcc.Write {
333+ StartTS : req .StartVersion ,
334+ Kind : mvcc .WriteKindPut ,
335+ })
336+ }
337+
338+ err = server .storage .Write (req .Context , txn .Writes ())
339+ if err != nil {
340+ panic (err )
341+ }
342+
343+ return & resp , nil
135344}
136345
137346func (server * Server ) KvScan (_ context.Context , req * kvrpcpb.ScanRequest ) (* kvrpcpb.ScanResponse , error ) {
0 commit comments