Skip to content

Draft implementation of streaming range request #19766

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,16 @@
}
}
},
"etcdserverpbRangeStreamResponse": {
"type": "object",
"properties": {
"range_response": {
"$ref": "#/definitions/etcdserverpbRangeResponse",
"description": "range_response is a partial response for the KV.RangeStream RPC.\nThe result of proto.Merge() applied on all responses should result in the\nsame RangeResponse response as to the Range() method."
}
},
"description": "RangeStreamResponse is the response for the RangeStream RPC.\nThis message is just a wrapper around RangeResponse but there may be a need\nin the future to add streaming specific fields (for progress status, error\npropagation, etc.)."
},
"etcdserverpbRequestOp": {
"type": "object",
"properties": {
Expand Down
1,061 changes: 664 additions & 397 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ service KV {
};
}

// RangeStream gets the keys in the range from the key-value store and
// returns them as a stream of partial responses.
rpc RangeStream(RangeRequest) returns (stream RangeStreamResponse) {
}

// Put puts the given key into the key-value store.
// A put request increments the revision of the key-value store
// and generates one event in the event history.
Expand Down Expand Up @@ -498,6 +502,18 @@ message RangeRequest {
int64 max_create_revision = 13 [(versionpb.etcd_version_field)="3.1"];
}

// RangeStreamResponse is the response for the RangeStream RPC.
// For now this message is only a wrapper around RangeResponse, but
// streaming-specific fields (for progress status, error propagation, etc.)
// may be added to it in the future.
message RangeStreamResponse {
option (versionpb.etcd_version_msg) = "3.7";
// range_response is a partial response for the KV.RangeStream RPC.
// Applying proto.Merge() to all responses of one stream should yield the
// same RangeResponse that the Range() method returns.
RangeResponse range_response = 1;
}

message RangeResponse {
option (versionpb.etcd_version_msg) = "3.0";

Expand Down
5 changes: 5 additions & 0 deletions client/v3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts .
return rkv.kc.Range(ctx, in, append(opts, withRepeatablePolicy())...)
}

// RangeStream opens a stream of partial range responses for the request in by
// delegating to the wrapped KV client. It is issued with the repeatable retry
// policy, the same policy the unary Range call of this client uses.
func (rkv *retryKVClient) RangeStream(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp pb.KV_RangeStreamClient, err error) {
	return rkv.kc.RangeStream(ctx, in, append(opts, withRepeatablePolicy())...)
}

func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
return rkv.kc.Put(ctx, in, opts...)
}
Expand Down
12 changes: 12 additions & 0 deletions server/etcdserver/api/v3rpc/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp
return resp, nil
}

// RangeStream validates the range request with checkRangeRequest and then
// passes it, together with the response stream rs, to s.kv.RangeStream.
// A validation error is returned as is; an error from s.kv.RangeStream is
// converted with togRPCError before it is returned.
func (s *kvServer) RangeStream(r *pb.RangeRequest, rs pb.KV_RangeStreamServer) error {
	if invalid := checkRangeRequest(r); invalid != nil {
		return invalid
	}
	// TODO: Fill header
	if streamErr := s.kv.RangeStream(r, rs); streamErr != nil {
		return togRPCError(streamErr)
	}
	return nil
}

func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := checkPutRequest(r); err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
}

func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)
// trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
Expand Down Expand Up @@ -235,7 +235,7 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
// trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
Expand All @@ -245,7 +245,7 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
// trace.Step("assemble the response")
return resp, nil
}

Expand Down
73 changes: 73 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (

type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
RangeStream(r *pb.RangeRequest, rs pb.KV_RangeStreamServer) error
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Expand Down Expand Up @@ -141,6 +142,78 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return resp, err
}

// RangeStream serves the range request r as a sequence of partial
// RangeResponse messages sent on rs.
//
// Unless r.Serializable is set, it first waits on linearizableReadNotify.
// The range is then read page by page: every page is fetched with txn.Range
// and sent as one RangeStreamResponse, and the next page starts at the key
// immediately after the last key of the previous page. The page size starts
// at one key and is adapted to the encoded size of the previous page.
//
// Streaming stops when a page without keys has been sent, or when the number
// of keys requested through r.Limit (if positive) has been streamed.
//
// If the request does not name a revision, the header revision of the first
// page is used for all following pages so that the whole stream is read at
// one revision. A revision chosen by the client is left untouched: the header
// revision reports the store's current revision, not the revision that was
// read, so it must not replace the client's choice.
//
// NOTE: r is modified in place (Key, Limit and possibly Revision).
//
// It returns the first error reported by the read barrier, by txn.Range or by
// rs.Send.
func (s *EtcdServer) RangeStream(r *pb.RangeRequest, rs pb.KV_RangeStreamServer) error {
	const (
		// growBelow is the encoded page size below which the next page may
		// hold twice as many keys.
		growBelow = 1024 * 1024
		// shrinkAbove is the encoded page size above which the next page
		// holds half as many keys.
		shrinkAbove = 4 * 1024 * 1024
	)

	trace := traceutil.New("rangeStream",
		s.Logger(),
		traceutil.Field{Key: "range_begin", Value: string(r.Key)},
		traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
	)
	ctx := context.WithValue(rs.Context(), traceutil.TraceKey{}, trace)

	// resp and err are read by the deferred function, so they are assigned
	// with "=" below and never shadowed.
	var resp *pb.RangeResponse
	var err error
	count := 0
	defer func(start time.Time) {
		txn.WarnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
		if resp != nil {
			trace.AddField(
				traceutil.Field{Key: "response_count", Value: count},
				traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
			)
		}
		trace.LogIfLong(traceThreshold)
	}(time.Now())

	if !r.Serializable {
		err = s.linearizableReadNotify(ctx)
		trace.Step("agreement among raft nodes before linearized reading")
		if err != nil {
			return err
		}
	}
	// TODO: Handle auth
	// TODO: Forbid non-standard sort order, or implement ordering by loading only the keys

	// remaining is the number of keys the client's limit still allows.
	// A non-positive limit means the stream is not limited.
	remaining := r.Limit
	limited := remaining > 0
	pageLimit := int64(1)
	for {
		r.Limit = pageLimit
		if limited && remaining < pageLimit {
			r.Limit = remaining
		}

		resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r)
		if err != nil {
			return err
		}
		count += len(resp.Kvs)
		err = rs.Send(&pb.RangeStreamResponse{
			RangeResponse: resp,
		})
		if err != nil {
			return err
		}
		if len(resp.Kvs) == 0 {
			return nil
		}
		if limited {
			remaining -= int64(len(resp.Kvs))
			if remaining <= 0 {
				return nil
			}
		}

		// Pin the revision for the following pages only when the client did
		// not ask for a specific one.
		if r.Revision <= 0 {
			r.Revision = resp.Header.Revision
		}

		// The next page starts at the smallest key greater than the last key
		// sent. A fresh slice is used so the key inside the response that was
		// handed to Send is never appended to.
		last := resp.Kvs[len(resp.Kvs)-1].Key
		next := make([]byte, len(last)+1) // the trailing byte stays 0x00
		copy(next, last)
		r.Key = next

		size := resp.Size()
		switch {
		case size < growBelow:
			pageLimit *= 2
		case size > shrinkAbove:
			pageLimit /= 2
		}
		if pageLimit == 0 {
			pageLimit = 1
		}
	}
}

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
Expand Down
5 changes: 5 additions & 0 deletions server/proxy/grpcproxy/adapter/kv_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ func (s *kvs2kvc) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallO
func (s *kvs2kvc) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (*pb.CompactionResponse, error) {
return s.kvs.Compact(ctx, in)
}

// RangeStream is a placeholder: the adapter does not support the streaming
// range RPC yet and calling this method panics.
//
// TODO: implement RangeStream for the adapter.
func (s *kvs2kvc) RangeStream(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (pb.KV_RangeStreamClient, error) {
	panic("to be implemented")
}
5 changes: 5 additions & 0 deletions server/proxy/grpcproxy/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
return gresp, nil
}

// RangeStream is a placeholder: the proxy does not support the streaming
// range RPC yet and calling this method panics.
//
// TODO: implement RangeStream for the proxy.
func (p *kvProxy) RangeStream(r *pb.RangeRequest, rs pb.KV_RangeStreamServer) error {
	panic("to be implemented")
}

func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
p.cache.Invalidate(r.Key, nil)
cacheKeys.Set(float64(p.cache.Size()))
Expand Down
6 changes: 3 additions & 3 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev
}
if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev)
tr.trace.Step("count revisions from in-memory index tree")
// tr.trace.Step("count revisions from in-memory index tree")
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
tr.trace.Step("range keys from in-memory index tree")
// tr.trace.Step("range keys from in-memory index tree")
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev
)
}
}
tr.trace.Step("range keys from bolt db")
// tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}

Expand Down
19 changes: 11 additions & 8 deletions tools/benchmark/cmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package cmd

import (
"context"
"encoding/binary"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -83,7 +82,6 @@ func putFunc(cmd *cobra.Command, _ []string) {
}
limit := rate.NewLimiter(rate.Limit(putRate), 1)
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))

bar = pb.New(putTotal)
bar.Start()
Expand All @@ -106,12 +104,7 @@ func putFunc(cmd *cobra.Command, _ []string) {

go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
binary.PutVarint(k, int64(i%keySpaceSize))
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
requests <- v3.OpPut(RandString(uint(keySize)), RandString(uint(valSize)))
}
close(requests)
}()
Expand Down Expand Up @@ -181,3 +174,13 @@ func hashKV(cmd *cobra.Command, clients []*v3.Client) {
rs += fmt.Sprintf("\tDB size: %s", humanize.Bytes(uint64(rt.DbSize)))
fmt.Println(rs)
}

// chars is the alphabet RandString draws from: ASCII letters and digits.
const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// RandString returns a string of l bytes, each one picked from chars with
// math/rand.
func RandString(l uint) string {
	out := make([]byte, l)
	for pos := range out {
		out[pos] = chars[rand.Intn(len(chars))]
	}
	return string(out)
}
55 changes: 42 additions & 13 deletions tools/benchmark/cmd/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package cmd

import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"time"

"github.com/cheggaaa/pb/v3"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
"google.golang.org/grpc"

etcdserverpb "go.etcd.io/etcd/api/v3/etcdserverpb"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/report"
)
Expand All @@ -43,6 +47,7 @@ var (
rangeConsistency string
rangeLimit int64
rangeCountOnly bool
rangeStream bool
)

func init() {
Expand All @@ -52,6 +57,7 @@ func init() {
rangeCmd.Flags().StringVar(&rangeConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)")
rangeCmd.Flags().Int64Var(&rangeLimit, "limit", 0, "Maximum number of results to return from range request (0 is no limit)")
rangeCmd.Flags().BoolVar(&rangeCountOnly, "count-only", false, "Only returns the count of keys")
rangeCmd.Flags().BoolVar(&rangeStream, "stream", false, "Stream")
}

func rangeFunc(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -80,22 +86,53 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}
limit := rate.NewLimiter(rate.Limit(rangeRate), 1)

requests := make(chan v3.Op, totalClients)
requests := make(chan struct{}, totalClients)
clients := mustCreateClients(totalClients, totalConns)

bar = pb.New(rangeTotal)
bar.Start()

r := newReport()
request := &etcdserverpb.RangeRequest{
Key: []byte(k),
RangeEnd: []byte(end),
Limit: rangeLimit,
CountOnly: rangeCountOnly,
}
if rangeConsistency == "s" {
request.Serializable = true
}
callOpts := []grpc.CallOption{
grpc.WaitForReady(true),
grpc.MaxCallSendMsgSize(2 * 1024 * 1024),
grpc.MaxCallRecvMsgSize(math.MaxInt32),
}
for i := range clients {
wg.Add(1)
go func(c *v3.Client) {
defer wg.Done()
for op := range requests {
kv := etcdserverpb.NewKVClient(c.ActiveConnection())
for range requests {
limit.Wait(context.Background())

st := time.Now()
_, err := c.Do(context.Background(), op)
var err error
if rangeStream {
var client etcdserverpb.KV_RangeStreamClient
client, err = kv.RangeStream(context.Background(), request, callOpts...)
if err == nil {
for {
_, err := client.Recv()
if err != nil {
if !errors.Is(err, io.EOF) {
panic(err)
}
break
}
}
}
} else {
_, err = kv.Range(context.Background(), request, callOpts...)
}
r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
bar.Increment()
}
Expand All @@ -104,15 +141,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {

go func() {
for i := 0; i < rangeTotal; i++ {
opts := []v3.OpOption{v3.WithRange(end), v3.WithLimit(rangeLimit)}
if rangeCountOnly {
opts = append(opts, v3.WithCountOnly())
}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
}
op := v3.OpGet(k, opts...)
requests <- op
requests <- struct{}{}
}
close(requests)
}()
Expand Down