Skip to content

Combine common txn arguments into TxnContext #19809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/etcdserver/api/v3rpc/validationfuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
Success: []*pb.RequestOp{req},
}

_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
_, _, err := txn.Txn(ctx, txn.TxnContext{
Logger: zaptest.NewLogger(t),
Lessor: &lease.FakeLessor{},
KV: s,
TxnModeWriteWithSharedBuffer: false,
}, request)
if err != nil {
t.Skipf("Application erroring. %s", err.Error())
}
Expand Down
17 changes: 13 additions & 4 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,28 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
}

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.options.Logger, a.options.Lessor, a.options.KV, p)
return mvcctxn.Put(context.TODO(), a.context(), p)
}

func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.options.Logger, a.options.KV, dr)
return mvcctxn.DeleteRange(context.TODO(), a.context(), dr)
}

func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.options.Logger, a.options.KV, r)
return mvcctxn.Range(context.TODO(), a.context(), r)
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.options.Logger, rt, a.options.TxnModeWriteWithSharedBuffer, a.options.KV, a.options.Lessor)
return mvcctxn.Txn(context.TODO(), a.context(), rt)
}

func (a *applierV3backend) context() mvcctxn.TxnContext {
return mvcctxn.TxnContext{
Logger: a.options.Logger,
Lessor: a.options.Lessor,
KV: a.options.KV,
TxnModeWriteWithSharedBuffer: a.options.TxnModeWriteWithSharedBuffer,
}
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
45 changes: 26 additions & 19 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,25 @@ import (
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "put",
type TxnContext struct {
Logger *zap.Logger
Lessor lease.Lessor
KV mvcc.KV
TxnModeWriteWithSharedBuffer bool
}

func Put(ctx context.Context, txn TxnContext, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about renaming txn to txnContext? because txn is usually short for transaction, which it is not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG

ctx, trace = ensureTrace(ctx, txn.Logger, "put",
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
if l := txn.Lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txnWrite := kv.Write(trace)
txnWrite := txn.KV.Write(trace)
defer txnWrite.End()
resp, err = put(ctx, txnWrite, p)
return resp, trace, err
Expand Down Expand Up @@ -88,12 +95,12 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
return resp, nil
}

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "delete_range",
func DeleteRange(ctx context.Context, txn TxnContext, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, txn.Logger, "delete_range",
traceutil.Field{Key: "key", Value: string(dr.Key)},
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
)
txnWrite := kv.Write(trace)
txnWrite := txn.KV.Write(trace)
defer txnWrite.End()
resp, err = deleteRange(ctx, txnWrite, dr)
return resp, trace, err
Expand Down Expand Up @@ -121,15 +128,15 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
return resp, nil
}

func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "range")
func Range(ctx context.Context, txn TxnContext, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, txn.Logger, "range")
defer func(start time.Time) {
success := err == nil
RangeSecObserve(success, time.Since(start))
}(time.Now())
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
txnRead := txn.KV.Read(mvcc.ConcurrentReadTxMode, trace)
defer txnRead.End()
resp, err = executeRange(ctx, lg, txnRead, r)
resp, err = executeRange(ctx, txn.Logger, txnRead, r)
return resp, trace, err
}

Expand Down Expand Up @@ -233,18 +240,18 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "transaction")
func Txn(ctx context.Context, txn TxnContext, rt *pb.TxnRequest) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, txn.Logger, "transaction")
isWrite := !IsTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var mode mvcc.ReadTxMode
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
if isWrite && txn.TxnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
mode = mvcc.SharedBufReadTxMode
} else {
mode = mvcc.ConcurrentReadTxMode
}
txnRead := kv.Read(mode, trace)
txnRead := txn.KV.Read(mode, trace)
var txnPath []bool
trace.StepWithFunction(
func() {
Expand All @@ -255,7 +262,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
}
_, err = checkTxn(txnRead, rt, lessor, txnPath)
_, err = checkTxn(txnRead, rt, txn.Lessor, txnPath)
if err != nil {
txnRead.End()
return nil, nil, err
Expand All @@ -268,11 +275,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
var txnWrite mvcc.TxnWrite
if isWrite {
txnRead.End()
txnWrite = kv.Write(trace)
txnWrite = txn.KV.Write(trace)
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
}
txnResp, err = txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
txnResp, err = assembleTxnResponse(ctx, txn.Logger, txnWrite, rt, isWrite, txnPath)
txnWrite.End()

trace.AddField(
Expand All @@ -282,7 +289,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
return txnResp, trace, err
}

func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
func assembleTxnResponse(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
txnResp, _ := newTxnResp(rt, txnPath)
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
if err != nil {
Expand Down
34 changes: 29 additions & 5 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,12 @@ func TestCheckTxn(t *testing.T) {

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
_, _, err := Txn(ctx, TxnContext{
Logger: zaptest.NewLogger(t),
Lessor: lessor,
KV: s,
TxnModeWriteWithSharedBuffer: false,
}, tc.txn)

gotErr := ""
if err != nil {
Expand All @@ -248,7 +253,11 @@ func TestCheckPut(t *testing.T) {

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
_, _, err := Put(ctx, TxnContext{
Logger: zaptest.NewLogger(t),
Lessor: lessor,
KV: s,
}, tc.op.GetRequestPut())

gotErr := ""
if err != nil {
Expand All @@ -268,7 +277,10 @@ func TestCheckRange(t *testing.T) {

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
_, _, err := Range(ctx, TxnContext{
Logger: zaptest.NewLogger(t),
KV: s,
}, tc.op.GetRequestRange())

gotErr := ""
if err != nil {
Expand Down Expand Up @@ -333,7 +345,12 @@ func TestReadonlyTxnError(t *testing.T) {
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
_, _, err := Txn(ctx, TxnContext{
Logger: zaptest.NewLogger(t),
Lessor: &lease.FakeLessor{},
KV: s,
TxnModeWriteWithSharedBuffer: false,
}, txn)
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
Expand Down Expand Up @@ -376,7 +393,14 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {
// we verify the following properties below:
// 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write)
// 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic)
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
assert.Panicsf(t, func() {
Txn(ctx, TxnContext{
Logger: zaptest.NewLogger(t),
Lessor: &lease.FakeLessor{},
KV: s,
TxnModeWriteWithSharedBuffer: false,
}, txn)
}, "Expected panic in Txn with writes")
dbHashAfter, err := computeFileHash(bePath)
require.NoErrorf(t, err, "failed to compute DB file hash after txn")
require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn")
Expand Down
16 changes: 14 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
get := func() {
resp, _, err = txn.Range(ctx, txn.TxnContext{
Logger: s.Logger(),
KV: s.KV(),
Lessor: s.lessor,
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
}, r)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
Expand Down Expand Up @@ -184,7 +191,12 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor)
resp, _, err = txn.Txn(ctx, txn.TxnContext{
Logger: s.Logger(),
KV: s.KV(),
Lessor: s.lessor,
TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer),
}, r)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down