Skip to content

Commit 5b8f7c9

Browse files
committed
Split executeRange function
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 58086da commit 5b8f7c9

File tree

1 file changed

+62
-38
lines changed

1 file changed

+62
-38
lines changed

server/etcdserver/txn/txn.go

+62-38
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,45 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
152152
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
153153
trace := traceutil.Get(ctx)
154154

155-
resp := &pb.RangeResponse{}
156-
resp.Header = &pb.ResponseHeader{}
155+
limit := rangeLimit(r)
156+
ro := mvcc.RangeOptions{
157+
Limit: limit,
158+
Rev: r.Revision,
159+
Count: r.CountOnly,
160+
}
161+
162+
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
163+
if err != nil {
164+
return nil, err
165+
}
166+
167+
filterRangeResults(rr, r)
168+
sortRangeResults(rr, r, lg)
169+
trace.Step("filter and sort the key-value pairs")
170+
171+
resp := asembleRangeResponse(rr, r)
172+
trace.Step("assemble the response")
173+
174+
return resp, nil
175+
}
157176

177+
func rangeLimit(r *pb.RangeRequest) int64 {
158178
limit := r.Limit
159179
if r.SortOrder != pb.RangeRequest_NONE ||
160180
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
161181
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
162182
// fetch everything; sort and truncate afterwards
163183
limit = 0
164184
}
185+
165186
if limit > 0 {
166187
// fetch one extra for 'more' flag
167188
limit = limit + 1
168189
}
190+
return limit
191+
}
169192

170-
ro := mvcc.RangeOptions{
171-
Limit: limit,
172-
Rev: r.Revision,
173-
Count: r.CountOnly,
174-
}
175-
176-
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
177-
if err != nil {
178-
return nil, err
179-
}
180-
193+
func filterRangeResults(rr *mvcc.RangeResult, r *pb.RangeRequest) {
181194
if r.MaxModRevision != 0 {
182195
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
183196
pruneKVs(rr, f)
@@ -194,7 +207,9 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
194207
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
195208
pruneKVs(rr, f)
196209
}
210+
}
197211

212+
func sortRangeResults(rr *mvcc.RangeResult, r *pb.RangeRequest, lg *zap.Logger) {
198213
sortOrder := r.SortOrder
199214
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
200215
// Since current mvcc.Range implementation returns results
@@ -207,46 +222,55 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
207222
// don't re-sort when target is 'KEY' and order is ASCEND
208223
sortOrder = pb.RangeRequest_NONE
209224
}
210-
if sortOrder != pb.RangeRequest_NONE {
211-
var sorter sort.Interface
212-
switch {
213-
case r.SortTarget == pb.RangeRequest_KEY:
214-
sorter = &kvSortByKey{&kvSort{rr.KVs}}
215-
case r.SortTarget == pb.RangeRequest_VERSION:
216-
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
217-
case r.SortTarget == pb.RangeRequest_CREATE:
218-
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
219-
case r.SortTarget == pb.RangeRequest_MOD:
220-
sorter = &kvSortByMod{&kvSort{rr.KVs}}
221-
case r.SortTarget == pb.RangeRequest_VALUE:
222-
sorter = &kvSortByValue{&kvSort{rr.KVs}}
223-
default:
224-
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))
225-
}
226-
switch {
227-
case sortOrder == pb.RangeRequest_ASCEND:
228-
sort.Sort(sorter)
229-
case sortOrder == pb.RangeRequest_DESCEND:
230-
sort.Sort(sort.Reverse(sorter))
231-
}
225+
226+
if sortOrder == pb.RangeRequest_NONE {
227+
return
232228
}
233229

230+
var sorter sort.Interface
231+
switch r.SortTarget {
232+
case pb.RangeRequest_KEY:
233+
sorter = &kvSortByKey{&kvSort{rr.KVs}}
234+
case pb.RangeRequest_VERSION:
235+
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
236+
case pb.RangeRequest_CREATE:
237+
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
238+
case pb.RangeRequest_MOD:
239+
sorter = &kvSortByMod{&kvSort{rr.KVs}}
240+
case pb.RangeRequest_VALUE:
241+
sorter = &kvSortByValue{&kvSort{rr.KVs}}
242+
default:
243+
// This should ideally not happen if request validation is done prior.
244+
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))
245+
return // Defensive return after panic, though panic will stop execution.
246+
}
247+
248+
switch sortOrder {
249+
case pb.RangeRequest_ASCEND:
250+
sort.Sort(sorter)
251+
case pb.RangeRequest_DESCEND:
252+
sort.Sort(sort.Reverse(sorter))
253+
}
254+
}
255+
256+
func asembleRangeResponse(rr *mvcc.RangeResult, r *pb.RangeRequest) *pb.RangeResponse {
257+
resp := &pb.RangeResponse{Header: &pb.ResponseHeader{}}
234258
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
235259
rr.KVs = rr.KVs[:r.Limit]
236260
resp.More = true
237261
}
238-
trace.Step("filter and sort the key-value pairs")
262+
239263
resp.Header.Revision = rr.Rev
240264
resp.Count = int64(rr.Count)
265+
241266
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
242267
for i := range rr.KVs {
243268
if r.KeysOnly {
244269
rr.KVs[i].Value = nil
245270
}
246271
resp.Kvs[i] = &rr.KVs[i]
247272
}
248-
trace.Step("assemble the response")
249-
return resp, nil
273+
return resp
250274
}
251275

252276
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {

0 commit comments

Comments
 (0)