Skip to content

Commit f3c78cc

Browse files
committed
Combine common txn arguments into TxnContext
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent cc90010 commit f3c78cc

File tree

5 files changed

+93
-31
lines changed

5 files changed

+93
-31
lines changed

server/etcdserver/api/v3rpc/validationfuzz_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,12 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
175175
Success: []*pb.RequestOp{req},
176176
}
177177

178-
_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
178+
_, _, err := txn.Txn(ctx, txn.TxnContext{
179+
Logger: zaptest.NewLogger(t),
180+
Lessor: &lease.FakeLessor{},
181+
KV: s,
182+
TxnModeWriteWithSharedBuffer: false,
183+
}, request)
179184
if err != nil {
180185
t.Skipf("Application erroring. %s", err.Error())
181186
}

server/etcdserver/apply/apply.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,33 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
153153
}
154154

155155
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
156-
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
156+
return mvcctxn.Put(context.TODO(), mvcctxn.TxnContext{
157+
Logger: a.lg,
158+
Lessor: a.lessor,
159+
KV: a.kv,
160+
TxnModeWriteWithSharedBuffer: a.txnModeWriteWithSharedBuffer,
161+
}, p)
157162
}
158163

159164
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
160-
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
165+
return mvcctxn.DeleteRange(context.TODO(), a.context(), dr)
161166
}
162167

163168
func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
164-
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
169+
return mvcctxn.Range(context.TODO(), a.context(), r)
165170
}
166171

167172
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
168-
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
173+
return mvcctxn.Txn(context.TODO(), a.context(), rt)
174+
}
175+
176+
func (a *applierV3backend) context() mvcctxn.TxnContext {
177+
return mvcctxn.TxnContext{
178+
Logger: a.lg,
179+
Lessor: a.lessor,
180+
KV: a.kv,
181+
TxnModeWriteWithSharedBuffer: a.txnModeWriteWithSharedBuffer,
182+
}
169183
}
170184

171185
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {

server/etcdserver/txn/txn.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,25 @@ import (
3232
"go.etcd.io/etcd/server/v3/storage/mvcc"
3333
)
3434

35-
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
36-
ctx, trace = ensureTrace(ctx, lg, "put",
35+
type TxnContext struct {
36+
Logger *zap.Logger
37+
Lessor lease.Lessor
38+
KV mvcc.KV
39+
TxnModeWriteWithSharedBuffer bool
40+
}
41+
42+
func Put(ctx context.Context, txn TxnContext, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
43+
ctx, trace = ensureTrace(ctx, txn.Logger, "put",
3744
traceutil.Field{Key: "key", Value: string(p.Key)},
3845
traceutil.Field{Key: "req_size", Value: p.Size()},
3946
)
4047
leaseID := lease.LeaseID(p.Lease)
4148
if leaseID != lease.NoLease {
42-
if l := lessor.Lookup(leaseID); l == nil {
49+
if l := txn.Lessor.Lookup(leaseID); l == nil {
4350
return nil, nil, lease.ErrLeaseNotFound
4451
}
4552
}
46-
txnWrite := kv.Write(trace)
53+
txnWrite := txn.KV.Write(trace)
4754
defer txnWrite.End()
4855
resp, err = put(ctx, txnWrite, p)
4956
return resp, trace, err
@@ -88,12 +95,12 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
8895
return resp, nil
8996
}
9097

91-
func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
92-
ctx, trace = ensureTrace(ctx, lg, "delete_range",
98+
func DeleteRange(ctx context.Context, txn TxnContext, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
99+
ctx, trace = ensureTrace(ctx, txn.Logger, "delete_range",
93100
traceutil.Field{Key: "key", Value: string(dr.Key)},
94101
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
95102
)
96-
txnWrite := kv.Write(trace)
103+
txnWrite := txn.KV.Write(trace)
97104
defer txnWrite.End()
98105
resp, err = deleteRange(ctx, txnWrite, dr)
99106
return resp, trace, err
@@ -121,15 +128,15 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
121128
return resp, nil
122129
}
123130

124-
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
125-
ctx, trace = ensureTrace(ctx, lg, "range")
131+
func Range(ctx context.Context, txn TxnContext, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
132+
ctx, trace = ensureTrace(ctx, txn.Logger, "range")
126133
defer func(start time.Time) {
127134
success := err == nil
128135
RangeSecObserve(success, time.Since(start))
129136
}(time.Now())
130-
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
137+
txnRead := txn.KV.Read(mvcc.ConcurrentReadTxMode, trace)
131138
defer txnRead.End()
132-
resp, err = executeRange(ctx, lg, txnRead, r)
139+
resp, err = executeRange(ctx, txn.Logger, txnRead, r)
133140
return resp, trace, err
134141
}
135142

@@ -233,18 +240,18 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
233240
return resp, nil
234241
}
235242

236-
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
237-
ctx, trace = ensureTrace(ctx, lg, "transaction")
243+
func Txn(ctx context.Context, txn TxnContext, rt *pb.TxnRequest) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
244+
ctx, trace = ensureTrace(ctx, txn.Logger, "transaction")
238245
isWrite := !IsTxnReadonly(rt)
239246
// When the transaction contains write operations, we use ReadTx instead of
240247
// ConcurrentReadTx to avoid extra overhead of copying buffer.
241248
var mode mvcc.ReadTxMode
242-
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
249+
if isWrite && txn.TxnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
243250
mode = mvcc.SharedBufReadTxMode
244251
} else {
245252
mode = mvcc.ConcurrentReadTxMode
246253
}
247-
txnRead := kv.Read(mode, trace)
254+
txnRead := txn.KV.Read(mode, trace)
248255
var txnPath []bool
249256
trace.StepWithFunction(
250257
func() {
@@ -255,7 +262,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
255262
if isWrite {
256263
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
257264
}
258-
_, err = checkTxn(txnRead, rt, lessor, txnPath)
265+
_, err = checkTxn(txnRead, rt, txn.Lessor, txnPath)
259266
if err != nil {
260267
txnRead.End()
261268
return nil, nil, err
@@ -268,11 +275,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
268275
var txnWrite mvcc.TxnWrite
269276
if isWrite {
270277
txnRead.End()
271-
txnWrite = kv.Write(trace)
278+
txnWrite = txn.KV.Write(trace)
272279
} else {
273280
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
274281
}
275-
txnResp, err = txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
282+
txnResp, err = assembleTxnResponse(ctx, txn.Logger, txnWrite, rt, isWrite, txnPath)
276283
txnWrite.End()
277284

278285
trace.AddField(
@@ -282,7 +289,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
282289
return txnResp, trace, err
283290
}
284291

285-
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
292+
func assembleTxnResponse(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
286293
txnResp, _ := newTxnResp(rt, txnPath)
287294
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
288295
if err != nil {

server/etcdserver/txn/txn_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,12 @@ func TestCheckTxn(t *testing.T) {
228228

229229
ctx, cancel := context.WithCancel(t.Context())
230230
defer cancel()
231-
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
231+
_, _, err := Txn(ctx, TxnContext{
232+
Logger: zaptest.NewLogger(t),
233+
Lessor: lessor,
234+
KV: s,
235+
TxnModeWriteWithSharedBuffer: false,
236+
}, tc.txn)
232237

233238
gotErr := ""
234239
if err != nil {
@@ -248,7 +253,11 @@ func TestCheckPut(t *testing.T) {
248253

249254
ctx, cancel := context.WithCancel(t.Context())
250255
defer cancel()
251-
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
256+
_, _, err := Put(ctx, TxnContext{
257+
Logger: zaptest.NewLogger(t),
258+
Lessor: lessor,
259+
KV: s,
260+
}, tc.op.GetRequestPut())
252261

253262
gotErr := ""
254263
if err != nil {
@@ -268,7 +277,10 @@ func TestCheckRange(t *testing.T) {
268277

269278
ctx, cancel := context.WithCancel(t.Context())
270279
defer cancel()
271-
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
280+
_, _, err := Range(ctx, TxnContext{
281+
Logger: zaptest.NewLogger(t),
282+
KV: s,
283+
}, tc.op.GetRequestRange())
272284

273285
gotErr := ""
274286
if err != nil {
@@ -333,7 +345,12 @@ func TestReadonlyTxnError(t *testing.T) {
333345
},
334346
}
335347

336-
_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
348+
_, _, err := Txn(ctx, TxnContext{
349+
Logger: zaptest.NewLogger(t),
350+
Lessor: &lease.FakeLessor{},
351+
KV: s,
352+
TxnModeWriteWithSharedBuffer: false,
353+
}, txn)
337354
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
338355
t.Fatalf("Expected context canceled error, got %v", err)
339356
}
@@ -376,7 +393,14 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {
376393
// we verify the following properties below:
377394
// 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write)
378395
// 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic)
379-
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
396+
assert.Panicsf(t, func() {
397+
Txn(ctx, TxnContext{
398+
Logger: zaptest.NewLogger(t),
399+
Lessor: &lease.FakeLessor{},
400+
KV: s,
401+
TxnModeWriteWithSharedBuffer: false,
402+
}, txn)
403+
}, "Expected panic in Txn with writes")
380404
dbHashAfter, err := computeFileHash(bePath)
381405
require.NoErrorf(t, err, "failed to compute DB file hash after txn")
382406
require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn")

server/etcdserver/v3_server.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,14 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
133133
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
134134
}
135135

136-
get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
136+
get := func() {
137+
resp, _, err = txn.Range(ctx, txn.TxnContext{
138+
Logger: s.Logger(),
139+
KV: s.KV(),
140+
Lessor: s.lessor,
141+
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
142+
}, r)
143+
}
137144
if serr := s.doSerialize(ctx, chk, get); serr != nil {
138145
err = serr
139146
return nil, err
@@ -184,7 +191,12 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
184191
}(time.Now())
185192

186193
get := func() {
187-
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor)
194+
resp, _, err = txn.Txn(ctx, txn.TxnContext{
195+
Logger: s.Logger(),
196+
KV: s.KV(),
197+
Lessor: s.lessor,
198+
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
199+
}, r)
188200
}
189201
if serr := s.doSerialize(ctx, chk, get); serr != nil {
190202
return nil, serr

0 commit comments

Comments
 (0)