@@ -32,24 +32,31 @@ import (
32
32
"go.etcd.io/etcd/server/v3/storage/mvcc"
33
33
)
34
34
35
- func Put (ctx context.Context , lg * zap.Logger , lessor lease.Lessor , kv mvcc.KV , p * pb.PutRequest ) (resp * pb.PutResponse , trace * traceutil.Trace , err error ) {
35
+ type TxnContext struct {
36
+ Logger * zap.Logger
37
+ Lessor lease.Lessor
38
+ KV mvcc.KV
39
+ TxnModeWriteWithSharedBuffer bool
40
+ }
41
+
42
+ func Put (ctx context.Context , txn TxnContext , p * pb.PutRequest ) (resp * pb.PutResponse , trace * traceutil.Trace , err error ) {
36
43
trace = traceutil .Get (ctx )
37
44
// create put tracing if the trace in context is empty
38
45
if trace .IsEmpty () {
39
46
trace = traceutil .New ("put" ,
40
- lg ,
47
+ txn . Logger ,
41
48
traceutil.Field {Key : "key" , Value : string (p .Key )},
42
49
traceutil.Field {Key : "req_size" , Value : p .Size ()},
43
50
)
44
51
ctx = context .WithValue (ctx , traceutil.TraceKey {}, trace )
45
52
}
46
53
leaseID := lease .LeaseID (p .Lease )
47
54
if leaseID != lease .NoLease {
48
- if l := lessor .Lookup (leaseID ); l == nil {
55
+ if l := txn . Lessor .Lookup (leaseID ); l == nil {
49
56
return nil , nil , lease .ErrLeaseNotFound
50
57
}
51
58
}
52
- txnWrite := kv .Write (trace )
59
+ txnWrite := txn . KV .Write (trace )
53
60
defer txnWrite .End ()
54
61
resp , err = put (ctx , txnWrite , p )
55
62
return resp , trace , err
@@ -94,18 +101,18 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
94
101
return resp , nil
95
102
}
96
103
97
- func DeleteRange (ctx context.Context , lg * zap. Logger , kv mvcc. KV , dr * pb.DeleteRangeRequest ) (resp * pb.DeleteRangeResponse , trace * traceutil.Trace , err error ) {
104
+ func DeleteRange (ctx context.Context , txn TxnContext , dr * pb.DeleteRangeRequest ) (resp * pb.DeleteRangeResponse , trace * traceutil.Trace , err error ) {
98
105
trace = traceutil .Get (ctx )
99
106
// create delete tracing if the trace in context is empty
100
107
if trace .IsEmpty () {
101
108
trace = traceutil .New ("delete_range" ,
102
- lg ,
109
+ txn . Logger ,
103
110
traceutil.Field {Key : "key" , Value : string (dr .Key )},
104
111
traceutil.Field {Key : "range_end" , Value : string (dr .RangeEnd )},
105
112
)
106
113
ctx = context .WithValue (ctx , traceutil.TraceKey {}, trace )
107
114
}
108
- txnWrite := kv .Write (trace )
115
+ txnWrite := txn . KV .Write (trace )
109
116
defer txnWrite .End ()
110
117
resp , err = deleteRange (ctx , txnWrite , dr )
111
118
return resp , trace , err
@@ -133,19 +140,19 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
133
140
return resp , nil
134
141
}
135
142
136
- func Range (ctx context.Context , lg * zap. Logger , kv mvcc. KV , r * pb.RangeRequest ) (resp * pb.RangeResponse , trace * traceutil.Trace , err error ) {
143
+ func Range (ctx context.Context , txn TxnContext , r * pb.RangeRequest ) (resp * pb.RangeResponse , trace * traceutil.Trace , err error ) {
137
144
trace = traceutil .Get (ctx )
138
145
if trace .IsEmpty () {
139
- trace = traceutil .New ("range" , lg )
146
+ trace = traceutil .New ("range" , txn . Logger )
140
147
ctx = context .WithValue (ctx , traceutil.TraceKey {}, trace )
141
148
}
142
149
defer func (start time.Time ) {
143
150
success := err == nil
144
151
RangeSecObserve (success , time .Since (start ))
145
152
}(time .Now ())
146
- txnRead := kv .Read (mvcc .ConcurrentReadTxMode , trace )
153
+ txnRead := txn . KV .Read (mvcc .ConcurrentReadTxMode , trace )
147
154
defer txnRead .End ()
148
- resp , err = executeRange (ctx , lg , txnRead , r )
155
+ resp , err = executeRange (ctx , txn . Logger , txnRead , r )
149
156
return resp , trace , err
150
157
}
151
158
@@ -249,22 +256,22 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
249
256
return resp , nil
250
257
}
251
258
252
- func Txn (ctx context.Context , lg * zap. Logger , rt * pb.TxnRequest , txnModeWriteWithSharedBuffer bool , kv mvcc. KV , lessor lease. Lessor ) (* pb.TxnResponse , * traceutil.Trace , error ) {
259
+ func Txn (ctx context.Context , txn TxnContext , rt * pb.TxnRequest ) (* pb.TxnResponse , * traceutil.Trace , error ) {
253
260
trace := traceutil .Get (ctx )
254
261
if trace .IsEmpty () {
255
- trace = traceutil .New ("transaction" , lg )
262
+ trace = traceutil .New ("transaction" , txn . Logger )
256
263
ctx = context .WithValue (ctx , traceutil.TraceKey {}, trace )
257
264
}
258
265
isWrite := ! IsTxnReadonly (rt )
259
266
// When the transaction contains write operations, we use ReadTx instead of
260
267
// ConcurrentReadTx to avoid extra overhead of copying buffer.
261
268
var mode mvcc.ReadTxMode
262
- if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
269
+ if isWrite && txn . TxnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
263
270
mode = mvcc .SharedBufReadTxMode
264
271
} else {
265
272
mode = mvcc .ConcurrentReadTxMode
266
273
}
267
- txnRead := kv .Read (mode , trace )
274
+ txnRead := txn . KV .Read (mode , trace )
268
275
var txnPath []bool
269
276
trace .StepWithFunction (
270
277
func () {
@@ -275,7 +282,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
275
282
if isWrite {
276
283
trace .AddField (traceutil.Field {Key : "read_only" , Value : false })
277
284
}
278
- _ , err := checkTxn (txnRead , rt , lessor , txnPath )
285
+ _ , err := checkTxn (txnRead , rt , txn . Lessor , txnPath )
279
286
if err != nil {
280
287
txnRead .End ()
281
288
return nil , nil , err
@@ -288,11 +295,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
288
295
var txnWrite mvcc.TxnWrite
289
296
if isWrite {
290
297
txnRead .End ()
291
- txnWrite = kv .Write (trace )
298
+ txnWrite = txn . KV .Write (trace )
292
299
} else {
293
300
txnWrite = mvcc .NewReadOnlyTxnWrite (txnRead )
294
301
}
295
- txnResp , err := txn (ctx , lg , txnWrite , rt , isWrite , txnPath )
302
+ txnResp , err := assembleTxnResponse (ctx , txn . Logger , txnWrite , rt , isWrite , txnPath )
296
303
txnWrite .End ()
297
304
298
305
trace .AddField (
@@ -302,7 +309,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
302
309
return txnResp , trace , err
303
310
}
304
311
305
- func txn (ctx context.Context , lg * zap.Logger , txnWrite mvcc.TxnWrite , rt * pb.TxnRequest , isWrite bool , txnPath []bool ) (* pb.TxnResponse , error ) {
312
+ func assembleTxnResponse (ctx context.Context , lg * zap.Logger , txnWrite mvcc.TxnWrite , rt * pb.TxnRequest , isWrite bool , txnPath []bool ) (* pb.TxnResponse , error ) {
306
313
txnResp , _ := newTxnResp (rt , txnPath )
307
314
_ , err := executeTxn (ctx , lg , txnWrite , rt , txnPath , txnResp )
308
315
if err != nil {
0 commit comments