Skip to content

Commit c3147b5

Browse files
committed
Split executeRange function
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 58086da commit c3147b5

File tree

1 file changed

+35
-16
lines changed

1 file changed

+35
-16
lines changed

server/etcdserver/txn/txn.go

+35-16
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,45 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
152152
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
153153
trace := traceutil.Get(ctx)
154154

155-
resp := &pb.RangeResponse{}
156-
resp.Header = &pb.ResponseHeader{}
155+
limit := rangeLimit(r)
156+
ro := mvcc.RangeOptions{
157+
Limit: limit,
158+
Rev: r.Revision,
159+
Count: r.CountOnly,
160+
}
161+
162+
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
163+
if err != nil {
164+
return nil, err
165+
}
166+
167+
filterRangeResults(rr, r)
168+
sortRangeResults(rr, r, lg)
169+
trace.Step("filter and sort the key-value pairs")
170+
171+
resp := asembleRangeResponse(rr, r)
172+
trace.Step("assemble the response")
157173

174+
return resp, nil
175+
}
176+
177+
func rangeLimit(r *pb.RangeRequest) int64 {
158178
limit := r.Limit
159179
if r.SortOrder != pb.RangeRequest_NONE ||
160180
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
161181
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
162182
// fetch everything; sort and truncate afterwards
163183
limit = 0
164184
}
185+
165186
if limit > 0 {
166187
// fetch one extra for 'more' flag
167188
limit = limit + 1
168189
}
190+
return limit
191+
}
169192

170-
ro := mvcc.RangeOptions{
171-
Limit: limit,
172-
Rev: r.Revision,
173-
Count: r.CountOnly,
174-
}
175-
176-
rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
177-
if err != nil {
178-
return nil, err
179-
}
180-
193+
func filterRangeResults(rr *mvcc.RangeResult, r *pb.RangeRequest) {
181194
if r.MaxModRevision != 0 {
182195
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
183196
pruneKVs(rr, f)
@@ -194,7 +207,9 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
194207
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
195208
pruneKVs(rr, f)
196209
}
210+
}
197211

212+
func sortRangeResults(rr *mvcc.RangeResult, r *pb.RangeRequest, lg *zap.Logger) {
198213
sortOrder := r.SortOrder
199214
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
200215
// Since current mvcc.Range implementation returns results
@@ -207,6 +222,7 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
207222
// don't re-sort when target is 'KEY' and order is ASCEND
208223
sortOrder = pb.RangeRequest_NONE
209224
}
225+
210226
if sortOrder != pb.RangeRequest_NONE {
211227
var sorter sort.Interface
212228
switch {
@@ -230,23 +246,26 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
230246
sort.Sort(sort.Reverse(sorter))
231247
}
232248
}
249+
}
233250

251+
func asembleRangeResponse(rr *mvcc.RangeResult, r *pb.RangeRequest) *pb.RangeResponse {
252+
resp := &pb.RangeResponse{Header: &pb.ResponseHeader{}}
234253
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
235254
rr.KVs = rr.KVs[:r.Limit]
236255
resp.More = true
237256
}
238-
trace.Step("filter and sort the key-value pairs")
257+
239258
resp.Header.Revision = rr.Rev
240259
resp.Count = int64(rr.Count)
260+
241261
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
242262
for i := range rr.KVs {
243263
if r.KeysOnly {
244264
rr.KVs[i].Value = nil
245265
}
246266
resp.Kvs[i] = &rr.KVs[i]
247267
}
248-
trace.Step("assemble the response")
249-
return resp, nil
268+
return resp
250269
}
251270

252271
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {

0 commit comments

Comments
 (0)