Skip to content

Commit 11c2342

Browse files
authored
CLI implementation for get sequence updates (#690)
(stacked on top of #689 )
1 parent c7a52d7 commit 11c2342

File tree

11 files changed

+331
-0
lines changed

11 files changed

+331
-0
lines changed

cmd/client/cmd.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package client
1717
import (
1818
"fmt"
1919

20+
"github.com/streamnative/oxia/cmd/client/sequenceupdates"
21+
2022
"github.com/spf13/cobra"
2123

2224
"github.com/streamnative/oxia/cmd/client/common"
@@ -52,4 +54,5 @@ func init() {
5254
Cmd.AddCommand(rangescan.Cmd)
5355
Cmd.AddCommand(deleterange.Cmd)
5456
Cmd.AddCommand(notifications.Cmd)
57+
Cmd.AddCommand(sequenceupdates.Cmd)
5558
}

cmd/client/common/client_mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,7 @@ func (m *MockClient) RangeScan(_ context.Context, minKeyInclusive string, maxKey
8181
func (*MockClient) GetNotifications() (oxia.Notifications, error) {
8282
return nil, errors.New("not implemented in mock")
8383
}
84+
85+
func (*MockClient) GetSequenceUpdates(context.Context, string, ...oxia.GetSequenceUpdatesOption) (<-chan string, error) {
86+
return nil, errors.New("not implemented in mock")
87+
}

cmd/client/sequenceupdates/cmd.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2023 StreamNative, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sequenceupdates
16+
17+
import (
18+
"context"
19+
"log/slog"
20+
21+
"github.com/spf13/cobra"
22+
23+
"github.com/streamnative/oxia/cmd/client/common"
24+
"github.com/streamnative/oxia/oxia"
25+
)
26+
27+
var Config = flags{}
28+
29+
type flags struct {
30+
key string
31+
partitionKey string
32+
}
33+
34+
func (flags *flags) Reset() {
35+
flags.key = ""
36+
flags.partitionKey = ""
37+
}
38+
39+
func init() {
40+
Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing")
41+
_ = Cmd.MarkFlagRequired("partition-key")
42+
}
43+
44+
var Cmd = &cobra.Command{
45+
Use: "sequence-updates [flags] KEY --partition-key PARTITION_KEY",
46+
Short: "Get key sequences updates",
47+
Long: `Follow the updates on a sequence of keys`,
48+
Args: cobra.ExactArgs(1),
49+
RunE: exec,
50+
}
51+
52+
func exec(_ *cobra.Command, args []string) error {
53+
client, err := common.Config.NewClient()
54+
if err != nil {
55+
return err
56+
}
57+
58+
defer client.Close()
59+
60+
Config.key = args[0]
61+
updates, err := client.GetSequenceUpdates(context.Background(), Config.key, oxia.PartitionKey(Config.partitionKey))
62+
if err != nil {
63+
return err
64+
}
65+
66+
for key := range updates {
67+
slog.Info("", slog.String("key", key))
68+
}
69+
70+
return nil
71+
}

oxia/async_client_impl.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,15 @@ func (c *clientImpl) RangeScan(ctx context.Context, minKeyInclusive string, maxK
458458
return outCh
459459
}
460460

461+
func (c *clientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) {
462+
opts := newGetSequenceUpdatesOptions(options)
463+
if opts.partitionKey == nil {
464+
return nil, errors.Wrap(ErrInvalidOptions, "partitionKey is required")
465+
}
466+
467+
return newSequenceUpdates(ctx, prefixKey, *opts.partitionKey, c.clientPool, c.shardManager), nil
468+
}
469+
461470
// We do range scan on all the shards, and we need to always pick the lowest key
462471
// across all the shards.
463472
func aggregateAndSortRangeScanAcrossShards(channels []chan GetResult, outCh chan GetResult) {

oxia/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ type AsyncClient interface {
103103
// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
104104
RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult
105105

106+
// GetSequenceUpdates allows to subscribe to the updates happening on a sequential key
107+
// The channel will report the current latest sequence for a given key.
108+
// Multiple updates can be collapsed into one single event with the
109+
// highest sequence.
110+
GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error)
111+
106112
// GetNotifications creates a new subscription to receive the notifications
107113
// from Oxia for any change that is applied to the database
108114
GetNotifications() (Notifications, error)
@@ -164,6 +170,12 @@ type SyncClient interface {
164170
// inserted with that partition key).
165171
RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult
166172

173+
// GetSequenceUpdates allows to subscribe to the updates happening on a sequential key
174+
// The channel will report the current latest sequence for a given key.
175+
// Multiple updates can be collapsed into one single event with the
176+
// highest sequence.
177+
GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error)
178+
167179
// GetNotifications creates a new subscription to receive the notifications
168180
// from Oxia for any change that is applied to the database
169181
GetNotifications() (Notifications, error)

oxia/options_base.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type BaseOption interface {
2222
DeleteRangeOption
2323
ListOption
2424
RangeScanOption
25+
GetSequenceUpdatesOption
2526
}
2627

2728
type baseOptions struct {
@@ -66,6 +67,10 @@ func (o *partitionKeyOpt) applyRangeScan(opts *rangeScanOptions) {
6667
opts.partitionKey = o.partitionKey
6768
}
6869

70+
func (o *partitionKeyOpt) applyGetSequenceUpdates(opts *getSequenceUpdatesOptions) {
71+
opts.partitionKey = o.partitionKey
72+
}
73+
6974
// PartitionKey overrides the partition routing with the specified `partitionKey` instead
7075
// of the regular record key.
7176
// Records with the same partitionKey will always be guaranteed to be co-located in the

oxia/options_get_sequence_updates.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2023 StreamNative, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package oxia
16+
17+
type getSequenceUpdatesOptions struct {
18+
baseOptions
19+
}
20+
21+
// GetSequenceUpdatesOption represents an option for the [SyncClient.GetSequenceUpdates] operation.
22+
type GetSequenceUpdatesOption interface {
23+
applyGetSequenceUpdates(opts *getSequenceUpdatesOptions)
24+
}
25+
26+
func newGetSequenceUpdatesOptions(opts []GetSequenceUpdatesOption) *getSequenceUpdatesOptions {
27+
getSequenceUpdatesOptions := &getSequenceUpdatesOptions{}
28+
for _, opt := range opts {
29+
opt.applyGetSequenceUpdates(getSequenceUpdatesOptions)
30+
}
31+
return getSequenceUpdatesOptions
32+
}

oxia/sequence_updates.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright 2023 StreamNative, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package oxia
16+
17+
import (
18+
"context"
19+
"errors"
20+
"log/slog"
21+
"time"
22+
23+
"github.com/cenkalti/backoff/v4"
24+
25+
"github.com/streamnative/oxia/common"
26+
"github.com/streamnative/oxia/oxia/internal"
27+
"github.com/streamnative/oxia/proto"
28+
)
29+
30+
type sequenceUpdates struct {
31+
prefixKey string
32+
partitionKey string
33+
ch chan string
34+
shardManager internal.ShardManager
35+
clientPool common.ClientPool
36+
37+
ctx context.Context
38+
backoff backoff.BackOff
39+
log *slog.Logger
40+
}
41+
42+
func newSequenceUpdates(ctx context.Context, prefixKey string, partitionKey string,
43+
clientPool common.ClientPool, shardManager internal.ShardManager) <-chan string {
44+
su := &sequenceUpdates{
45+
prefixKey: prefixKey,
46+
partitionKey: partitionKey,
47+
ch: make(chan string),
48+
shardManager: shardManager,
49+
clientPool: clientPool,
50+
ctx: ctx,
51+
backoff: common.NewBackOffWithInitialInterval(ctx, 1*time.Second),
52+
log: slog.With(
53+
slog.String("component", "oxia-get-sequence-updates"),
54+
slog.String("key", "key"),
55+
),
56+
}
57+
58+
go common.DoWithLabels(
59+
su.ctx,
60+
map[string]string{
61+
"oxia": "sequence-updates",
62+
"prefixKey": prefixKey,
63+
},
64+
su.getSequenceUpdatesWithRetries,
65+
)
66+
67+
return su.ch
68+
}
69+
70+
func (su *sequenceUpdates) getSequenceUpdatesWithRetries() { //nolint:revive
71+
_ = backoff.RetryNotify(su.getSequenceUpdates,
72+
su.backoff, func(err error, duration time.Duration) {
73+
if !errors.Is(err, context.Canceled) {
74+
su.log.Error(
75+
"Error while getting sequence updates",
76+
slog.Any("error", err),
77+
slog.Duration("retry-after", duration),
78+
)
79+
}
80+
})
81+
82+
// Signal that the background go-routine is now done
83+
close(su.ch)
84+
}
85+
86+
func (su *sequenceUpdates) getSequenceUpdates() error {
87+
shard := su.shardManager.Get(su.partitionKey)
88+
leader := su.shardManager.Leader(shard)
89+
90+
rpc, err := su.clientPool.GetClientRpc(leader)
91+
if err != nil {
92+
return err
93+
}
94+
95+
updates, err := rpc.GetSequenceUpdates(su.ctx, &proto.GetSequenceUpdatesRequest{
96+
Key: su.prefixKey,
97+
})
98+
if err != nil {
99+
if su.ctx.Err() != nil {
100+
return su.ctx.Err()
101+
}
102+
return err
103+
}
104+
105+
su.backoff.Reset()
106+
107+
for {
108+
res, err2 := updates.Recv()
109+
if err2 != nil {
110+
return err2
111+
}
112+
113+
if res.HighestSequenceKey == "" {
114+
// Ignore first response if there are no sequences for the key
115+
continue
116+
}
117+
118+
select {
119+
case su.ch <- res.HighestSequenceKey:
120+
case <-su.ctx.Done():
121+
return su.ctx.Err()
122+
}
123+
}
124+
}

oxia/sync_client_impl.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ func (c *syncClientImpl) RangeScan(ctx context.Context, minKeyInclusive string,
133133
return c.asyncClient.RangeScan(ctx, minKeyInclusive, maxKeyExclusive, options...)
134134
}
135135

136+
func (c *syncClientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) {
137+
return c.asyncClient.GetSequenceUpdates(ctx, prefixKey, options...)
138+
}
139+
136140
func (c *syncClientImpl) GetNotifications() (Notifications, error) {
137141
return c.asyncClient.GetNotifications()
138142
}

0 commit comments

Comments
 (0)