Skip to content

Commit e4fb13d

Browse files
authored
Merge pull request #19826 from serathius/refactor-txn-files
Split other methods in txn into separate files
2 parents 88bba4d + ef21176 commit e4fb13d

File tree

4 files changed

+387
-308
lines changed

4 files changed

+387
-308
lines changed

server/etcdserver/txn/delete.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package txn
16+
17+
import (
18+
"context"
19+
20+
"go.uber.org/zap"
21+
22+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23+
"go.etcd.io/etcd/api/v3/mvccpb"
24+
"go.etcd.io/etcd/pkg/v3/traceutil"
25+
"go.etcd.io/etcd/server/v3/storage/mvcc"
26+
)
27+
28+
func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
29+
ctx, trace = ensureTrace(ctx, lg, "delete_range",
30+
traceutil.Field{Key: "key", Value: string(dr.Key)},
31+
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
32+
)
33+
txnWrite := kv.Write(trace)
34+
defer txnWrite.End()
35+
resp, err = deleteRange(ctx, txnWrite, dr)
36+
return resp, trace, err
37+
}
38+
39+
func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
40+
resp := &pb.DeleteRangeResponse{}
41+
resp.Header = &pb.ResponseHeader{}
42+
end := mkGteRange(dr.RangeEnd)
43+
44+
if dr.PrevKv {
45+
rr, err := txnWrite.Range(ctx, dr.Key, end, mvcc.RangeOptions{})
46+
if err != nil {
47+
return nil, err
48+
}
49+
if rr != nil {
50+
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
51+
for i := range rr.KVs {
52+
resp.PrevKvs[i] = &rr.KVs[i]
53+
}
54+
}
55+
}
56+
57+
resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end)
58+
return resp, nil
59+
}
60+
61+
// mkGteRange normalizes a request's range end for ">=" (GTE) ranges.
//
// A GTE range is encoded on the wire as the single byte '\0'. This works
// around grpc sending empty byte strings as nil: for a GTE range the function
// returns a non-nil empty slice (the empty byte string), which stays distinct
// from nil (no byte string at all). Every other input, including nil, is
// returned unchanged.
func mkGteRange(rangeEnd []byte) []byte {
	isGTE := len(rangeEnd) == 1 && rangeEnd[0] == 0
	if !isGTE {
		return rangeEnd
	}
	return []byte{}
}

server/etcdserver/txn/put.go

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package txn
16+
17+
import (
18+
"context"
19+
20+
"go.uber.org/zap"
21+
22+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23+
"go.etcd.io/etcd/pkg/v3/traceutil"
24+
"go.etcd.io/etcd/server/v3/etcdserver/errors"
25+
"go.etcd.io/etcd/server/v3/lease"
26+
"go.etcd.io/etcd/server/v3/storage/mvcc"
27+
)
28+
29+
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
30+
ctx, trace = ensureTrace(ctx, lg, "put",
31+
traceutil.Field{Key: "key", Value: string(p.Key)},
32+
traceutil.Field{Key: "req_size", Value: p.Size()},
33+
)
34+
err = checkLease(lessor, p)
35+
if err != nil {
36+
return nil, trace, err
37+
}
38+
txnWrite := kv.Write(trace)
39+
defer txnWrite.End()
40+
prevKV, err := checkAndGetPrevKV(trace, txnWrite, p)
41+
if err != nil {
42+
return nil, trace, err
43+
}
44+
return put(ctx, txnWrite, p, prevKV), trace, nil
45+
}
46+
47+
func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest, prevKV *mvcc.RangeResult) *pb.PutResponse {
48+
trace := traceutil.Get(ctx)
49+
resp := &pb.PutResponse{}
50+
resp.Header = &pb.ResponseHeader{}
51+
val, leaseID := p.Value, lease.LeaseID(p.Lease)
52+
53+
if p.IgnoreValue {
54+
val = prevKV.KVs[0].Value
55+
}
56+
if p.IgnoreLease {
57+
leaseID = lease.LeaseID(prevKV.KVs[0].Lease)
58+
}
59+
if p.PrevKv {
60+
if prevKV != nil && len(prevKV.KVs) != 0 {
61+
resp.PrevKv = &prevKV.KVs[0]
62+
}
63+
}
64+
65+
resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
66+
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
67+
return resp
68+
}
69+
70+
func checkPut(trace *traceutil.Trace, txnWrite mvcc.ReadView, lessor lease.Lessor, p *pb.PutRequest) error {
71+
err := checkLease(lessor, p)
72+
if err != nil {
73+
return err
74+
}
75+
_, err = checkAndGetPrevKV(trace, txnWrite, p)
76+
return err
77+
}
78+
79+
func checkLease(lessor lease.Lessor, p *pb.PutRequest) error {
80+
leaseID := lease.LeaseID(p.Lease)
81+
if leaseID != lease.NoLease {
82+
if l := lessor.Lookup(leaseID); l == nil {
83+
return lease.ErrLeaseNotFound
84+
}
85+
}
86+
return nil
87+
}
88+
89+
func checkAndGetPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) {
90+
prevKV, err = getPrevKV(trace, txnWrite, p)
91+
if err != nil {
92+
return nil, err
93+
}
94+
if p.IgnoreValue || p.IgnoreLease {
95+
if prevKV == nil || len(prevKV.KVs) == 0 {
96+
// ignore_{lease,value} flag expects previous key-value pair
97+
return nil, errors.ErrKeyNotFound
98+
}
99+
}
100+
return prevKV, nil
101+
}
102+
103+
func getPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) {
104+
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
105+
trace.StepWithFunction(func() {
106+
prevKV, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
107+
}, "get previous kv pair")
108+
109+
if err != nil {
110+
return nil, err
111+
}
112+
}
113+
return prevKV, nil
114+
}

server/etcdserver/txn/range.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package txn
16+
17+
import (
18+
"bytes"
19+
"context"
20+
"sort"
21+
"time"
22+
23+
"go.uber.org/zap"
24+
25+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
26+
"go.etcd.io/etcd/api/v3/mvccpb"
27+
"go.etcd.io/etcd/pkg/v3/traceutil"
28+
"go.etcd.io/etcd/server/v3/storage/mvcc"
29+
)
30+
31+
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
32+
ctx, trace = ensureTrace(ctx, lg, "range")
33+
defer func(start time.Time) {
34+
success := err == nil
35+
RangeSecObserve(success, time.Since(start))
36+
}(time.Now())
37+
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
38+
defer txnRead.End()
39+
resp, err = executeRange(ctx, lg, txnRead, r)
40+
return resp, trace, err
41+
}
42+
43+
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
44+
trace := traceutil.Get(ctx)
45+
46+
resp := &pb.RangeResponse{}
47+
resp.Header = &pb.ResponseHeader{}
48+
49+
limit := r.Limit
50+
if r.SortOrder != pb.RangeRequest_NONE ||
51+
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
52+
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
53+
// fetch everything; sort and truncate afterwards
54+
limit = 0
55+
}
56+
if limit > 0 {
57+
// fetch one extra for 'more' flag
58+
limit = limit + 1
59+
}
60+
61+
ro := mvcc.RangeOptions{
62+
Limit: limit,
63+
Rev: r.Revision,
64+
Count: r.CountOnly,
65+
}
66+
67+
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
if r.MaxModRevision != 0 {
73+
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
74+
pruneKVs(rr, f)
75+
}
76+
if r.MinModRevision != 0 {
77+
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
78+
pruneKVs(rr, f)
79+
}
80+
if r.MaxCreateRevision != 0 {
81+
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
82+
pruneKVs(rr, f)
83+
}
84+
if r.MinCreateRevision != 0 {
85+
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
86+
pruneKVs(rr, f)
87+
}
88+
89+
sortOrder := r.SortOrder
90+
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
91+
// Since current mvcc.Range implementation returns results
92+
// sorted by keys in lexiographically ascending order,
93+
// sort ASCEND by default only when target is not 'KEY'
94+
sortOrder = pb.RangeRequest_ASCEND
95+
} else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND {
96+
// Since current mvcc.Range implementation returns results
97+
// sorted by keys in lexiographically ascending order,
98+
// don't re-sort when target is 'KEY' and order is ASCEND
99+
sortOrder = pb.RangeRequest_NONE
100+
}
101+
if sortOrder != pb.RangeRequest_NONE {
102+
var sorter sort.Interface
103+
switch {
104+
case r.SortTarget == pb.RangeRequest_KEY:
105+
sorter = &kvSortByKey{&kvSort{rr.KVs}}
106+
case r.SortTarget == pb.RangeRequest_VERSION:
107+
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
108+
case r.SortTarget == pb.RangeRequest_CREATE:
109+
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
110+
case r.SortTarget == pb.RangeRequest_MOD:
111+
sorter = &kvSortByMod{&kvSort{rr.KVs}}
112+
case r.SortTarget == pb.RangeRequest_VALUE:
113+
sorter = &kvSortByValue{&kvSort{rr.KVs}}
114+
default:
115+
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))
116+
}
117+
switch {
118+
case sortOrder == pb.RangeRequest_ASCEND:
119+
sort.Sort(sorter)
120+
case sortOrder == pb.RangeRequest_DESCEND:
121+
sort.Sort(sort.Reverse(sorter))
122+
}
123+
}
124+
125+
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
126+
rr.KVs = rr.KVs[:r.Limit]
127+
resp.More = true
128+
}
129+
trace.Step("filter and sort the key-value pairs")
130+
resp.Header.Revision = rr.Rev
131+
resp.Count = int64(rr.Count)
132+
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
133+
for i := range rr.KVs {
134+
if r.KeysOnly {
135+
rr.KVs[i].Value = nil
136+
}
137+
resp.Kvs[i] = &rr.KVs[i]
138+
}
139+
trace.Step("assemble the response")
140+
return resp, nil
141+
}
142+
143+
func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error {
144+
switch {
145+
case req.Revision == 0:
146+
return nil
147+
case req.Revision > rv.Rev():
148+
return mvcc.ErrFutureRev
149+
case req.Revision < rv.FirstRev():
150+
return mvcc.ErrCompacted
151+
}
152+
return nil
153+
}
154+
155+
func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
156+
j := 0
157+
for i := range rr.KVs {
158+
rr.KVs[j] = rr.KVs[i]
159+
if !isPrunable(&rr.KVs[i]) {
160+
j++
161+
}
162+
}
163+
rr.KVs = rr.KVs[:j]
164+
}
165+
166+
type kvSort struct{ kvs []mvccpb.KeyValue }
167+
168+
func (s *kvSort) Swap(i, j int) {
169+
t := s.kvs[i]
170+
s.kvs[i] = s.kvs[j]
171+
s.kvs[j] = t
172+
}
173+
func (s *kvSort) Len() int { return len(s.kvs) }
174+
175+
type kvSortByKey struct{ *kvSort }
176+
177+
func (s *kvSortByKey) Less(i, j int) bool {
178+
return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
179+
}
180+
181+
type kvSortByVersion struct{ *kvSort }
182+
183+
func (s *kvSortByVersion) Less(i, j int) bool {
184+
return (s.kvs[i].Version - s.kvs[j].Version) < 0
185+
}
186+
187+
type kvSortByCreate struct{ *kvSort }
188+
189+
func (s *kvSortByCreate) Less(i, j int) bool {
190+
return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
191+
}
192+
193+
type kvSortByMod struct{ *kvSort }
194+
195+
func (s *kvSortByMod) Less(i, j int) bool {
196+
return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
197+
}
198+
199+
type kvSortByValue struct{ *kvSort }
200+
201+
func (s *kvSortByValue) Less(i, j int) bool {
202+
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
203+
}

0 commit comments

Comments
 (0)