diff --git a/cmd/client/common/client_mock.go b/cmd/client/common/client_mock.go index c05007d6..0ed5b8f5 100644 --- a/cmd/client/common/client_mock.go +++ b/cmd/client/common/client_mock.go @@ -81,3 +81,7 @@ func (m *MockClient) RangeScan(_ context.Context, minKeyInclusive string, maxKey func (*MockClient) GetNotifications() (oxia.Notifications, error) { return nil, errors.New("not implemented in mock") } + +func (*MockClient) GetSequenceUpdates(context.Context, string, ...oxia.GetSequenceUpdatesOption) (<-chan string, error) { + return nil, errors.New("not implemented in mock") +} diff --git a/oxia/async_client_impl.go b/oxia/async_client_impl.go index 4d02bd9a..5f056694 100644 --- a/oxia/async_client_impl.go +++ b/oxia/async_client_impl.go @@ -458,6 +458,15 @@ func (c *clientImpl) RangeScan(ctx context.Context, minKeyInclusive string, maxK return outCh } +func (c *clientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) { + opts := newGetSequenceUpdatesOptions(options) + if opts.partitionKey == nil { + return nil, errors.Wrap(ErrInvalidOptions, "partitionKey is required") + } + + return newSequenceUpdates(ctx, prefixKey, *opts.partitionKey, c.clientPool, c.shardManager), nil +} + // We do range scan on all the shards, and we need to always pick the lowest key // across all the shards. func aggregateAndSortRangeScanAcrossShards(channels []chan GetResult, outCh chan GetResult) { diff --git a/oxia/client.go b/oxia/client.go index 3def80bf..e1224ec4 100644 --- a/oxia/client.go +++ b/oxia/client.go @@ -103,6 +103,12 @@ type AsyncClient interface { // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult + // GetSequenceUpdates allows to subscribe to the updates happening on a sequential key + // The channel will report the current latest sequence for a given key. + // Multiple updates can be collapsed into one single event with the + // highest sequence. + GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) + // GetNotifications creates a new subscription to receive the notifications // from Oxia for any change that is applied to the database GetNotifications() (Notifications, error) @@ -164,6 +170,12 @@ type SyncClient interface { // inserted with that partition key). RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult + // GetSequenceUpdates allows to subscribe to the updates happening on a sequential key + // The channel will report the current latest sequence for a given key. + // Multiple updates can be collapsed into one single event with the + // highest sequence. + GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) + // GetNotifications creates a new subscription to receive the notifications // from Oxia for any change that is applied to the database GetNotifications() (Notifications, error) diff --git a/oxia/options_base.go b/oxia/options_base.go index 386ee0ea..8efe1420 100644 --- a/oxia/options_base.go +++ b/oxia/options_base.go @@ -22,6 +22,7 @@ type BaseOption interface { DeleteRangeOption ListOption RangeScanOption + GetSequenceUpdatesOption } type baseOptions struct { @@ -66,6 +67,10 @@ func (o *partitionKeyOpt) applyRangeScan(opts *rangeScanOptions) { opts.partitionKey = o.partitionKey } +func (o *partitionKeyOpt) applyGetSequenceUpdates(opts *getSequenceUpdatesOptions) { + opts.partitionKey = o.partitionKey +} + // PartitionKey overrides the partition routing with the specified `partitionKey` instead // of the regular record key. // Records with the same partitionKey will always be guaranteed to be co-located in the diff --git a/oxia/options_get_sequence_updates.go b/oxia/options_get_sequence_updates.go new file mode 100644 index 00000000..4c0128fb --- /dev/null +++ b/oxia/options_get_sequence_updates.go @@ -0,0 +1,32 @@ +// Copyright 2023 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package oxia + +type getSequenceUpdatesOptions struct { + baseOptions +} + +// GetSequenceUpdatesOption represents an option for the [SyncClient.GetSequenceUpdates] operation. +type GetSequenceUpdatesOption interface { + applyGetSequenceUpdates(opts *getSequenceUpdatesOptions) +} + +func newGetSequenceUpdatesOptions(opts []GetSequenceUpdatesOption) *getSequenceUpdatesOptions { + getSequenceUpdatesOptions := &getSequenceUpdatesOptions{} + for _, opt := range opts { + opt.applyGetSequenceUpdates(getSequenceUpdatesOptions) + } + return getSequenceUpdatesOptions +} diff --git a/oxia/sequence_updates.go b/oxia/sequence_updates.go new file mode 100644 index 00000000..9bddd067 --- /dev/null +++ b/oxia/sequence_updates.go @@ -0,0 +1,124 @@ +// Copyright 2023 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package oxia + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/cenkalti/backoff/v4" + + "github.com/streamnative/oxia/common" + "github.com/streamnative/oxia/oxia/internal" + "github.com/streamnative/oxia/proto" +) + +type sequenceUpdates struct { + prefixKey string + partitionKey string + ch chan string + shardManager internal.ShardManager + clientPool common.ClientPool + + ctx context.Context + backoff backoff.BackOff + log *slog.Logger +} + +func newSequenceUpdates(ctx context.Context, prefixKey string, partitionKey string, + clientPool common.ClientPool, shardManager internal.ShardManager) <-chan string { + su := &sequenceUpdates{ + prefixKey: prefixKey, + partitionKey: partitionKey, + ch: make(chan string), + shardManager: shardManager, + clientPool: clientPool, + ctx: ctx, + backoff: common.NewBackOffWithInitialInterval(ctx, 1*time.Second), + log: slog.With( + slog.String("component", "oxia-get-sequence-updates"), + slog.String("key", "key"), + ), + } + + go common.DoWithLabels( + su.ctx, + map[string]string{ + "oxia": "sequence-updates", + "prefixKey": prefixKey, + }, + su.getSequenceUpdatesWithRetries, + ) + + return su.ch +} + +func (su *sequenceUpdates) getSequenceUpdatesWithRetries() { //nolint:revive + _ = backoff.RetryNotify(su.getSequenceUpdates, + su.backoff, func(err error, duration time.Duration) { + if !errors.Is(err, context.Canceled) { + su.log.Error( + "Error while getting sequence updates", + slog.Any("error", err), + slog.Duration("retry-after", duration), + ) + } + }) + + // Signal that the background go-routine is now done + close(su.ch) +} + +func (su *sequenceUpdates) getSequenceUpdates() error { + shard := su.shardManager.Get(su.partitionKey) + leader := su.shardManager.Leader(shard) + + rpc, err := su.clientPool.GetClientRpc(leader) + if err != nil { + return err + } + + updates, err := rpc.GetSequenceUpdates(su.ctx, &proto.GetSequenceUpdatesRequest{ + Key: su.prefixKey, + }) + if err != nil { + if su.ctx.Err() != nil { + return su.ctx.Err() + } + return err + } + + su.backoff.Reset() + + for { + res, err2 := updates.Recv() + if err2 != nil { + return err2 + } + + if res.HighestSequenceKey == "" { + // Ignore first response if there are no sequences for the key + continue + } + + select { + case su.ch <- res.HighestSequenceKey: + case <-su.ctx.Done(): + return su.ctx.Err() + } + } +} diff --git a/oxia/sync_client_impl.go b/oxia/sync_client_impl.go index b05039ab..c8f26e3e 100644 --- a/oxia/sync_client_impl.go +++ b/oxia/sync_client_impl.go @@ -133,6 +133,10 @@ func (c *syncClientImpl) RangeScan(ctx context.Context, minKeyInclusive string, return c.asyncClient.RangeScan(ctx, minKeyInclusive, maxKeyExclusive, options...) } +func (c *syncClientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) { + return c.asyncClient.GetSequenceUpdates(ctx, prefixKey, options...) +} + func (c *syncClientImpl) GetNotifications() (Notifications, error) { return c.asyncClient.GetNotifications() } diff --git a/oxia/sync_client_impl_test.go b/oxia/sync_client_impl_test.go index c622b2d4..922d27ba 100644 --- a/oxia/sync_client_impl_test.go +++ b/oxia/sync_client_impl_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/rs/zerolog/log" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/assert" @@ -61,6 +62,10 @@ func (c *neverCompleteAsyncClient) GetNotifications() (Notifications, error) { panic("not implemented") } +func (c *neverCompleteAsyncClient) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) { + panic("not implemented") +} + func TestCancelContext(t *testing.T) { _asyncClient := &neverCompleteAsyncClient{} syncClient := newSyncClient(_asyncClient) @@ -346,3 +351,62 @@ func TestSyncClientImpl_SecondaryIndexes_Get(t *testing.T) { assert.NoError(t, client.Close()) assert.NoError(t, standaloneServer.Close()) } + +func TestSyncClientImpl_GetSequenceUpdates(t *testing.T) { + standaloneServer, err := server.NewStandalone(server.NewTestConfig(t.TempDir())) + assert.NoError(t, err) + + serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort()) + client, err := NewSyncClient(serviceAddress, WithBatchLinger(0)) + assert.NoError(t, err) + + ch1, err := client.GetSequenceUpdates(context.Background(), "a") + assert.Nil(t, ch1) + assert.ErrorIs(t, err, ErrInvalidOptions) + + ctx1, cancel1 := context.WithCancel(context.Background()) + ch1, err = client.GetSequenceUpdates(ctx1, "a", PartitionKey("x")) + assert.NotNil(t, ch1) + assert.NoError(t, err) + cancel1() + + k1, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1)) + assert.Equal(t, fmt.Sprintf("a-%020d", 1), k1) + k2, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1)) + assert.Equal(t, fmt.Sprintf("a-%020d", 2), k2) + assert.NotEqual(t, k1, k2) + + ctx2, cancel2 := context.WithCancel(context.Background()) + updates2, err := client.GetSequenceUpdates(ctx2, "a", PartitionKey("x")) + require.NoError(t, err) + + recvK2 := <-updates2 + assert.Equal(t, k2, recvK2) + + cancel2() + + k3, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1)) + assert.Empty(t, updates2) + + select { + case <-updates2: + // Ok + + default: + assert.Fail(t, "should have been closed") + } + + updates3, err := client.GetSequenceUpdates(context.Background(), "a", PartitionKey("x")) + require.NoError(t, err) + + recvK3 := <-updates3 + assert.Equal(t, k3, recvK3) + + k4, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1)) + recvK4 := <-updates3 + assert.Equal(t, k4, recvK4) + + assert.NoError(t, client.Close()) + + assert.NoError(t, standaloneServer.Close()) +} diff --git a/server/leader_controller.go b/server/leader_controller.go index edcb3cc6..67c8de32 100644 --- a/server/leader_controller.go +++ b/server/leader_controller.go @@ -648,6 +648,9 @@ func (lc *leaderController) GetSequenceUpdates(req *proto.GetSequenceUpdatesRequ return err } + case <-lc.ctx.Done(): + return lc.ctx.Err() + case <-stream.Context().Done(): return stream.Context().Err() }