diff --git a/server/etcdserver/api/v3rpc/validationfuzz_test.go b/server/etcdserver/api/v3rpc/validationfuzz_test.go index fdb3727d76b..74c8986103d 100644 --- a/server/etcdserver/api/v3rpc/validationfuzz_test.go +++ b/server/etcdserver/api/v3rpc/validationfuzz_test.go @@ -175,7 +175,12 @@ func execTransaction(t *testing.T, req *pb.RequestOp) { Success: []*pb.RequestOp{req}, } - _, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{}) + _, _, err := txn.Txn(ctx, txn.TxnContext{ + Logger: zaptest.NewLogger(t), + Lessor: &lease.FakeLessor{}, + KV: s, + TxnModeWriteWithSharedBuffer: false, + }, request) if err != nil { t.Skipf("Application erroring. %s", err.Error()) } diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 71b747bf755..08ca9e4ef8e 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -140,19 +140,28 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) } func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - return mvcctxn.Put(context.TODO(), a.options.Logger, a.options.Lessor, a.options.KV, p) + return mvcctxn.Put(context.TODO(), a.context(), p) } func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) { - return mvcctxn.DeleteRange(context.TODO(), a.options.Logger, a.options.KV, dr) + return mvcctxn.DeleteRange(context.TODO(), a.context(), dr) } func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) { - return mvcctxn.Range(context.TODO(), a.options.Logger, a.options.KV, r) + return mvcctxn.Range(context.TODO(), a.context(), r) } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { - return mvcctxn.Txn(context.TODO(), a.options.Logger, rt, a.options.TxnModeWriteWithSharedBuffer, a.options.KV, a.options.Lessor) + return mvcctxn.Txn(context.TODO(), a.context(), rt) +} + +func (a *applierV3backend) context() mvcctxn.TxnContext { + return mvcctxn.TxnContext{ + Logger: a.options.Logger, + Lessor: a.options.Lessor, + KV: a.options.KV, + TxnModeWriteWithSharedBuffer: a.options.TxnModeWriteWithSharedBuffer, + } } func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index 9e8d397cb88..a0dcd252d8a 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -32,18 +32,25 @@ import ( "go.etcd.io/etcd/server/v3/storage/mvcc" ) -func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "put", +type TxnContext struct { + Logger *zap.Logger + Lessor lease.Lessor + KV mvcc.KV + TxnModeWriteWithSharedBuffer bool +} + +func Put(ctx context.Context, txn TxnContext, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, txn.Logger, "put", traceutil.Field{Key: "key", Value: string(p.Key)}, traceutil.Field{Key: "req_size", Value: p.Size()}, ) leaseID := lease.LeaseID(p.Lease) if leaseID != lease.NoLease { - if l := lessor.Lookup(leaseID); l == nil { + if l := txn.Lessor.Lookup(leaseID); l == nil { return nil, nil, lease.ErrLeaseNotFound } } - txnWrite := kv.Write(trace) + txnWrite := txn.KV.Write(trace) defer txnWrite.End() resp, err = put(ctx, txnWrite, p) return resp, trace, err @@ -88,12 +95,12 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p return resp, nil } -func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "delete_range", +func DeleteRange(ctx context.Context, txn TxnContext, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, txn.Logger, "delete_range", traceutil.Field{Key: "key", Value: string(dr.Key)}, traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)}, ) - txnWrite := kv.Write(trace) + txnWrite := txn.KV.Write(trace) defer txnWrite.End() resp, err = deleteRange(ctx, txnWrite, dr) return resp, trace, err @@ -121,15 +128,15 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange return resp, nil } -func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "range") +func Range(ctx context.Context, txn TxnContext, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, txn.Logger, "range") defer func(start time.Time) { success := err == nil RangeSecObserve(success, time.Since(start)) }(time.Now()) - txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace) + txnRead := txn.KV.Read(mvcc.ConcurrentReadTxMode, trace) defer txnRead.End() - resp, err = executeRange(ctx, lg, txnRead, r) + resp, err = executeRange(ctx, txn.Logger, txnRead, r) return resp, trace, err } @@ -233,18 +240,18 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r * return resp, nil } -func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "transaction") +func Txn(ctx context.Context, txn TxnContext, rt *pb.TxnRequest) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, txn.Logger, "transaction") isWrite := !IsTxnReadonly(rt) // When the transaction contains write operations, we use ReadTx instead of // ConcurrentReadTx to avoid extra overhead of copying buffer. var mode mvcc.ReadTxMode - if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ { + if isWrite && txn.TxnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ { mode = mvcc.SharedBufReadTxMode } else { mode = mvcc.ConcurrentReadTxMode } - txnRead := kv.Read(mode, trace) + txnRead := txn.KV.Read(mode, trace) var txnPath []bool trace.StepWithFunction( func() { @@ -255,7 +262,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit if isWrite { trace.AddField(traceutil.Field{Key: "read_only", Value: false}) } - _, err = checkTxn(txnRead, rt, lessor, txnPath) + _, err = checkTxn(txnRead, rt, txn.Lessor, txnPath) if err != nil { txnRead.End() return nil, nil, err @@ -268,11 +275,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit var txnWrite mvcc.TxnWrite if isWrite { txnRead.End() - txnWrite = kv.Write(trace) + txnWrite = txn.KV.Write(trace) } else { txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead) } - txnResp, err = txn(ctx, lg, txnWrite, rt, isWrite, txnPath) + txnResp, err = assembleTxnResponse(ctx, txn.Logger, txnWrite, rt, isWrite, txnPath) txnWrite.End() trace.AddField( @@ -282,7 +289,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit return txnResp, trace, err } -func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) { +func assembleTxnResponse(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) { txnResp, _ := newTxnResp(rt, txnPath) _, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp) if err != nil { diff --git a/server/etcdserver/txn/txn_test.go b/server/etcdserver/txn/txn_test.go index 8b73424fd75..cec6cb7667e 100644 --- a/server/etcdserver/txn/txn_test.go +++ b/server/etcdserver/txn/txn_test.go @@ -228,7 +228,12 @@ func TestCheckTxn(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() - _, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor) + _, _, err := Txn(ctx, TxnContext{ + Logger: zaptest.NewLogger(t), + Lessor: lessor, + KV: s, + TxnModeWriteWithSharedBuffer: false, + }, tc.txn) gotErr := "" if err != nil { @@ -248,7 +253,11 @@ func TestCheckPut(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() - _, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut()) + _, _, err := Put(ctx, TxnContext{ + Logger: zaptest.NewLogger(t), + Lessor: lessor, + KV: s, + }, tc.op.GetRequestPut()) gotErr := "" if err != nil { @@ -268,7 +277,10 @@ func TestCheckRange(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() - _, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange()) + _, _, err := Range(ctx, TxnContext{ + Logger: zaptest.NewLogger(t), + KV: s, + }, tc.op.GetRequestRange()) gotErr := "" if err != nil { @@ -333,7 +345,12 @@ func TestReadonlyTxnError(t *testing.T) { }, } - _, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) + _, _, err := Txn(ctx, TxnContext{ + Logger: zaptest.NewLogger(t), + Lessor: &lease.FakeLessor{}, + KV: s, + TxnModeWriteWithSharedBuffer: false, + }, txn) if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") { t.Fatalf("Expected context canceled error, got %v", err) } @@ -376,7 +393,14 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) { // we verify the following properties below: // 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write) // 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic) - assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes") + assert.Panicsf(t, func() { + Txn(ctx, TxnContext{ + Logger: zaptest.NewLogger(t), + Lessor: &lease.FakeLessor{}, + KV: s, + TxnModeWriteWithSharedBuffer: false, + }, txn) + }, "Expected panic in Txn with writes") dbHashAfter, err := computeFileHash(bePath) require.NoErrorf(t, err, "failed to compute DB file hash after txn") require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn") diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index c6953604aa2..b5a14bcc5cb 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -133,7 +133,14 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) } + get := func() { + resp, _, err = txn.Range(ctx, txn.TxnContext{ + Logger: s.Logger(), + KV: s.KV(), + Lessor: s.lessor, + TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), + }, r) + } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err @@ -184,7 +191,12 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse }(time.Now()) get := func() { - resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor) + resp, _, err = txn.Txn(ctx, txn.TxnContext{ + Logger: s.Logger(), + KV: s.KV(), + Lessor: s.lessor, + TxnModeWriteWithSharedBuffer: s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), + }, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return nil, serr