Skip to content

Commit 9a013fa

Browse files
committed
Combine common txn arguments into TxnContext
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 58086da commit 9a013fa

File tree

5 files changed

+93
-31
lines changed

5 files changed

+93
-31
lines changed

server/etcdserver/api/v3rpc/validationfuzz_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,12 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
175175
Success: []*pb.RequestOp{req},
176176
}
177177

178-
_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
178+
_, _, err := txn.Txn(ctx, txn.TxnContext{
179+
Logger: zaptest.NewLogger(t),
180+
Lessor: &lease.FakeLessor{},
181+
KV: s,
182+
TxnModeWriteWithSharedBuffer: false,
183+
}, request)
179184
if err != nil {
180185
t.Skipf("Application erroring. %s", err.Error())
181186
}

server/etcdserver/apply/apply.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,33 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
153153
}
154154

155155
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
156-
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
156+
return mvcctxn.Put(context.TODO(), mvcctxn.TxnContext{
157+
Logger: a.lg,
158+
Lessor: a.lessor,
159+
KV: a.kv,
160+
TxnModeWriteWithSharedBuffer: a.txnModeWriteWithSharedBuffer,
161+
}, p)
157162
}
158163

159164
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
160-
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
165+
return mvcctxn.DeleteRange(context.TODO(), a.context(), dr)
161166
}
162167

163168
func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
164-
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
169+
return mvcctxn.Range(context.TODO(), a.context(), r)
165170
}
166171

167172
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
168-
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
173+
return mvcctxn.Txn(context.TODO(), a.context(), rt)
174+
}
175+
176+
func (a *applierV3backend) context() mvcctxn.TxnContext {
177+
return mvcctxn.TxnContext{
178+
Logger: a.lg,
179+
Lessor: a.lessor,
180+
KV: a.kv,
181+
TxnModeWriteWithSharedBuffer: a.txnModeWriteWithSharedBuffer,
182+
}
169183
}
170184

171185
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {

server/etcdserver/txn/txn.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,31 @@ import (
3232
"go.etcd.io/etcd/server/v3/storage/mvcc"
3333
)
3434

35-
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
35+
type TxnContext struct {
36+
Logger *zap.Logger
37+
Lessor lease.Lessor
38+
KV mvcc.KV
39+
TxnModeWriteWithSharedBuffer bool
40+
}
41+
42+
func Put(ctx context.Context, txn TxnContext, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
3643
trace = traceutil.Get(ctx)
3744
// create put tracing if the trace in context is empty
3845
if trace.IsEmpty() {
3946
trace = traceutil.New("put",
40-
lg,
47+
txn.Logger,
4148
traceutil.Field{Key: "key", Value: string(p.Key)},
4249
traceutil.Field{Key: "req_size", Value: p.Size()},
4350
)
4451
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
4552
}
4653
leaseID := lease.LeaseID(p.Lease)
4754
if leaseID != lease.NoLease {
48-
if l := lessor.Lookup(leaseID); l == nil {
55+
if l := txn.Lessor.Lookup(leaseID); l == nil {
4956
return nil, nil, lease.ErrLeaseNotFound
5057
}
5158
}
52-
txnWrite := kv.Write(trace)
59+
txnWrite := txn.KV.Write(trace)
5360
defer txnWrite.End()
5461
resp, err = put(ctx, txnWrite, p)
5562
return resp, trace, err
@@ -94,18 +101,18 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
94101
return resp, nil
95102
}
96103

97-
func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
104+
func DeleteRange(ctx context.Context, txn TxnContext, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
98105
trace = traceutil.Get(ctx)
99106
// create delete tracing if the trace in context is empty
100107
if trace.IsEmpty() {
101108
trace = traceutil.New("delete_range",
102-
lg,
109+
txn.Logger,
103110
traceutil.Field{Key: "key", Value: string(dr.Key)},
104111
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
105112
)
106113
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
107114
}
108-
txnWrite := kv.Write(trace)
115+
txnWrite := txn.KV.Write(trace)
109116
defer txnWrite.End()
110117
resp, err = deleteRange(ctx, txnWrite, dr)
111118
return resp, trace, err
@@ -133,19 +140,19 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
133140
return resp, nil
134141
}
135142

136-
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
143+
func Range(ctx context.Context, txn TxnContext, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
137144
trace = traceutil.Get(ctx)
138145
if trace.IsEmpty() {
139-
trace = traceutil.New("range", lg)
146+
trace = traceutil.New("range", txn.Logger)
140147
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
141148
}
142149
defer func(start time.Time) {
143150
success := err == nil
144151
RangeSecObserve(success, time.Since(start))
145152
}(time.Now())
146-
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
153+
txnRead := txn.KV.Read(mvcc.ConcurrentReadTxMode, trace)
147154
defer txnRead.End()
148-
resp, err = executeRange(ctx, lg, txnRead, r)
155+
resp, err = executeRange(ctx, txn.Logger, txnRead, r)
149156
return resp, trace, err
150157
}
151158

@@ -249,22 +256,22 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
249256
return resp, nil
250257
}
251258

252-
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
259+
func Txn(ctx context.Context, txn TxnContext, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
253260
trace := traceutil.Get(ctx)
254261
if trace.IsEmpty() {
255-
trace = traceutil.New("transaction", lg)
262+
trace = traceutil.New("transaction", txn.Logger)
256263
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
257264
}
258265
isWrite := !IsTxnReadonly(rt)
259266
// When the transaction contains write operations, we use ReadTx instead of
260267
// ConcurrentReadTx to avoid extra overhead of copying buffer.
261268
var mode mvcc.ReadTxMode
262-
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
269+
if isWrite && txn.TxnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
263270
mode = mvcc.SharedBufReadTxMode
264271
} else {
265272
mode = mvcc.ConcurrentReadTxMode
266273
}
267-
txnRead := kv.Read(mode, trace)
274+
txnRead := txn.KV.Read(mode, trace)
268275
var txnPath []bool
269276
trace.StepWithFunction(
270277
func() {
@@ -275,7 +282,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
275282
if isWrite {
276283
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
277284
}
278-
_, err := checkTxn(txnRead, rt, lessor, txnPath)
285+
_, err := checkTxn(txnRead, rt, txn.Lessor, txnPath)
279286
if err != nil {
280287
txnRead.End()
281288
return nil, nil, err
@@ -288,11 +295,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
288295
var txnWrite mvcc.TxnWrite
289296
if isWrite {
290297
txnRead.End()
291-
txnWrite = kv.Write(trace)
298+
txnWrite = txn.KV.Write(trace)
292299
} else {
293300
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
294301
}
295-
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
302+
txnResp, err := assembleTxnResponse(ctx, txn.Logger, txnWrite, rt, isWrite, txnPath)
296303
txnWrite.End()
297304

298305
trace.AddField(
@@ -302,7 +309,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
302309
return txnResp, trace, err
303310
}
304311

305-
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
312+
func assembleTxnResponse(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
306313
txnResp, _ := newTxnResp(rt, txnPath)
307314
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
308315
if err != nil {

server/etcdserver/txn/txn_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,12 @@ func TestCheckTxn(t *testing.T) {
228228

229229
ctx, cancel := context.WithCancel(t.Context())
230230
defer cancel()
231-
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
231+
_, _, err := Txn(ctx, TxnContext{
232+
Logger: zaptest.NewLogger(t),
233+
Lessor: lessor,
234+
KV: s,
235+
TxnModeWriteWithSharedBuffer: false,
236+
}, tc.txn)
232237

233238
gotErr := ""
234239
if err != nil {
@@ -248,7 +253,11 @@ func TestCheckPut(t *testing.T) {
248253

249254
ctx, cancel := context.WithCancel(t.Context())
250255
defer cancel()
251-
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
256+
_, _, err := Put(ctx, TxnContext{
257+
Logger: zaptest.NewLogger(t),
258+
Lessor: lessor,
259+
KV: s,
260+
}, tc.op.GetRequestPut())
252261

253262
gotErr := ""
254263
if err != nil {
@@ -268,7 +277,10 @@ func TestCheckRange(t *testing.T) {
268277

269278
ctx, cancel := context.WithCancel(t.Context())
270279
defer cancel()
271-
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
280+
_, _, err := Range(ctx, TxnContext{
281+
Logger: zaptest.NewLogger(t),
282+
KV: s,
283+
}, tc.op.GetRequestRange())
272284

273285
gotErr := ""
274286
if err != nil {
@@ -333,7 +345,12 @@ func TestReadonlyTxnError(t *testing.T) {
333345
},
334346
}
335347

336-
_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
348+
_, _, err := Txn(ctx, TxnContext{
349+
Logger: zaptest.NewLogger(t),
350+
Lessor: &lease.FakeLessor{},
351+
KV: s,
352+
TxnModeWriteWithSharedBuffer: false,
353+
}, txn)
337354
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
338355
t.Fatalf("Expected context canceled error, got %v", err)
339356
}
@@ -376,7 +393,14 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {
376393
// we verify the following properties below:
377394
// 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write)
378395
// 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic)
379-
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
396+
assert.Panicsf(t, func() {
397+
Txn(ctx, TxnContext{
398+
Logger: zaptest.NewLogger(t),
399+
Lessor: &lease.FakeLessor{},
400+
KV: s,
401+
TxnModeWriteWithSharedBuffer: false,
402+
}, txn)
403+
}, "Expected panic in Txn with writes")
380404
dbHashAfter, err := computeFileHash(bePath)
381405
require.NoErrorf(t, err, "failed to compute DB file hash after txn")
382406
require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn")

server/etcdserver/v3_server.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,14 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
133133
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
134134
}
135135

136-
get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
136+
get := func() {
137+
resp, _, err = txn.Range(ctx, txn.TxnContext{
138+
Logger: s.Logger(),
139+
KV: s.KV(),
140+
Lessor: s.lessor,
141+
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
142+
}, r)
143+
}
137144
if serr := s.doSerialize(ctx, chk, get); serr != nil {
138145
err = serr
139146
return nil, err
@@ -184,7 +191,12 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
184191
}(time.Now())
185192

186193
get := func() {
187-
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor)
194+
resp, _, err = txn.Txn(ctx, txn.TxnContext{
195+
Logger: s.Logger(),
196+
KV: s.KV(),
197+
Lessor: s.lessor,
198+
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
199+
}, r)
188200
}
189201
if serr := s.doSerialize(ctx, chk, get); serr != nil {
190202
return nil, serr

0 commit comments

Comments
 (0)