Skip to content

Client SDK implementation for get-sequence-updates #689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/client/common/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (m *MockClient) RangeScan(_ context.Context, minKeyInclusive string, maxKey
func (*MockClient) GetNotifications() (oxia.Notifications, error) {
return nil, errors.New("not implemented in mock")
}

func (*MockClient) GetSequenceUpdates(context.Context, string, ...oxia.GetSequenceUpdatesOption) (<-chan string, error) {
return nil, errors.New("not implemented in mock")
}
9 changes: 9 additions & 0 deletions oxia/async_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ func (c *clientImpl) RangeScan(ctx context.Context, minKeyInclusive string, maxK
return outCh
}

func (c *clientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) {
opts := newGetSequenceUpdatesOptions(options)
if opts.partitionKey == nil {
return nil, errors.Wrap(ErrInvalidOptions, "partitionKey is required")
}

return newSequenceUpdates(ctx, prefixKey, *opts.partitionKey, c.clientPool, c.shardManager), nil
}

// We do range scan on all the shards, and we need to always pick the lowest key
// across all the shards.
func aggregateAndSortRangeScanAcrossShards(channels []chan GetResult, outCh chan GetResult) {
Expand Down
12 changes: 12 additions & 0 deletions oxia/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ type AsyncClient interface {
// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult

// GetSequenceUpdates allows to subscribe to the updates happening on a sequential key
// The channel will report the current latest sequence for a given key.
// Multiple updates can be collapsed into one single event with the
// highest sequence.
GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error)

// GetNotifications creates a new subscription to receive the notifications
// from Oxia for any change that is applied to the database
GetNotifications() (Notifications, error)
Expand Down Expand Up @@ -164,6 +170,12 @@ type SyncClient interface {
// inserted with that partition key).
RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult

// GetSequenceUpdates allows to subscribe to the updates happening on a sequential key
// The channel will report the current latest sequence for a given key.
// Multiple updates can be collapsed into one single event with the
// highest sequence.
GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error)

// GetNotifications creates a new subscription to receive the notifications
// from Oxia for any change that is applied to the database
GetNotifications() (Notifications, error)
Expand Down
5 changes: 5 additions & 0 deletions oxia/options_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type BaseOption interface {
DeleteRangeOption
ListOption
RangeScanOption
GetSequenceUpdatesOption
}

type baseOptions struct {
Expand Down Expand Up @@ -66,6 +67,10 @@ func (o *partitionKeyOpt) applyRangeScan(opts *rangeScanOptions) {
opts.partitionKey = o.partitionKey
}

func (o *partitionKeyOpt) applyGetSequenceUpdates(opts *getSequenceUpdatesOptions) {
opts.partitionKey = o.partitionKey
}

// PartitionKey overrides the partition routing with the specified `partitionKey` instead
// of the regular record key.
// Records with the same partitionKey will always be guaranteed to be co-located in the
Expand Down
32 changes: 32 additions & 0 deletions oxia/options_get_sequence_updates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package oxia

type getSequenceUpdatesOptions struct {
baseOptions
}

// GetSequenceUpdatesOption represents an option for the [SyncClient.GetSequenceUpdates] operation.
type GetSequenceUpdatesOption interface {
applyGetSequenceUpdates(opts *getSequenceUpdatesOptions)
}

func newGetSequenceUpdatesOptions(opts []GetSequenceUpdatesOption) *getSequenceUpdatesOptions {
getSequenceUpdatesOptions := &getSequenceUpdatesOptions{}
for _, opt := range opts {
opt.applyGetSequenceUpdates(getSequenceUpdatesOptions)
}
return getSequenceUpdatesOptions
}
124 changes: 124 additions & 0 deletions oxia/sequence_updates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package oxia

import (
"context"
"errors"
"log/slog"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/streamnative/oxia/common"
"github.com/streamnative/oxia/oxia/internal"
"github.com/streamnative/oxia/proto"
)

type sequenceUpdates struct {
prefixKey string
partitionKey string
ch chan string
shardManager internal.ShardManager
clientPool common.ClientPool

ctx context.Context
backoff backoff.BackOff
log *slog.Logger
}

func newSequenceUpdates(ctx context.Context, prefixKey string, partitionKey string,
clientPool common.ClientPool, shardManager internal.ShardManager) <-chan string {
su := &sequenceUpdates{
prefixKey: prefixKey,
partitionKey: partitionKey,
ch: make(chan string),
shardManager: shardManager,
clientPool: clientPool,
ctx: ctx,
backoff: common.NewBackOffWithInitialInterval(ctx, 1*time.Second),
log: slog.With(
slog.String("component", "oxia-get-sequence-updates"),
slog.String("key", "key"),
),
}

go common.DoWithLabels(
su.ctx,
map[string]string{
"oxia": "sequence-updates",
"prefixKey": prefixKey,
},
su.getSequenceUpdatesWithRetries,
)

return su.ch
}

func (su *sequenceUpdates) getSequenceUpdatesWithRetries() { //nolint:revive
_ = backoff.RetryNotify(su.getSequenceUpdates,
su.backoff, func(err error, duration time.Duration) {
if !errors.Is(err, context.Canceled) {
su.log.Error(
"Error while getting sequence updates",
slog.Any("error", err),
slog.Duration("retry-after", duration),
)
}
})

// Signal that the background go-routine is now done
close(su.ch)
}

func (su *sequenceUpdates) getSequenceUpdates() error {
shard := su.shardManager.Get(su.partitionKey)
leader := su.shardManager.Leader(shard)

rpc, err := su.clientPool.GetClientRpc(leader)
if err != nil {
return err
}

updates, err := rpc.GetSequenceUpdates(su.ctx, &proto.GetSequenceUpdatesRequest{
Key: su.prefixKey,
})
if err != nil {
if su.ctx.Err() != nil {
return su.ctx.Err()
}
return err
}

su.backoff.Reset()

for {
res, err2 := updates.Recv()
if err2 != nil {
return err2
}

if res.HighestSequenceKey == "" {
// Ignore first response if there are no sequences for the key
continue
}

select {
case su.ch <- res.HighestSequenceKey:
case <-su.ctx.Done():
return su.ctx.Err()
}
}
}
4 changes: 4 additions & 0 deletions oxia/sync_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (c *syncClientImpl) RangeScan(ctx context.Context, minKeyInclusive string,
return c.asyncClient.RangeScan(ctx, minKeyInclusive, maxKeyExclusive, options...)
}

func (c *syncClientImpl) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) {
return c.asyncClient.GetSequenceUpdates(ctx, prefixKey, options...)
}

func (c *syncClientImpl) GetNotifications() (Notifications, error) {
return c.asyncClient.GetNotifications()
}
64 changes: 64 additions & 0 deletions oxia/sync_client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/rs/zerolog/log"
"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -61,6 +62,10 @@ func (c *neverCompleteAsyncClient) GetNotifications() (Notifications, error) {
panic("not implemented")
}

func (c *neverCompleteAsyncClient) GetSequenceUpdates(ctx context.Context, prefixKey string, options ...GetSequenceUpdatesOption) (<-chan string, error) {
panic("not implemented")
}

func TestCancelContext(t *testing.T) {
_asyncClient := &neverCompleteAsyncClient{}
syncClient := newSyncClient(_asyncClient)
Expand Down Expand Up @@ -346,3 +351,62 @@ func TestSyncClientImpl_SecondaryIndexes_Get(t *testing.T) {
assert.NoError(t, client.Close())
assert.NoError(t, standaloneServer.Close())
}

func TestSyncClientImpl_GetSequenceUpdates(t *testing.T) {
standaloneServer, err := server.NewStandalone(server.NewTestConfig(t.TempDir()))
assert.NoError(t, err)

serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort())
client, err := NewSyncClient(serviceAddress, WithBatchLinger(0))
assert.NoError(t, err)

ch1, err := client.GetSequenceUpdates(context.Background(), "a")
assert.Nil(t, ch1)
assert.ErrorIs(t, err, ErrInvalidOptions)

ctx1, cancel1 := context.WithCancel(context.Background())
ch1, err = client.GetSequenceUpdates(ctx1, "a", PartitionKey("x"))
assert.NotNil(t, ch1)
assert.NoError(t, err)
cancel1()

k1, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1))
assert.Equal(t, fmt.Sprintf("a-%020d", 1), k1)
k2, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1))
assert.Equal(t, fmt.Sprintf("a-%020d", 2), k2)
assert.NotEqual(t, k1, k2)

ctx2, cancel2 := context.WithCancel(context.Background())
updates2, err := client.GetSequenceUpdates(ctx2, "a", PartitionKey("x"))
require.NoError(t, err)

recvK2 := <-updates2
assert.Equal(t, k2, recvK2)

cancel2()

k3, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1))
assert.Empty(t, updates2)

select {
case <-updates2:
// Ok

default:
assert.Fail(t, "should have been closed")
}

updates3, err := client.GetSequenceUpdates(context.Background(), "a", PartitionKey("x"))
require.NoError(t, err)

recvK3 := <-updates3
assert.Equal(t, k3, recvK3)

k4, _, _ := client.Put(context.Background(), "a", []byte("0"), PartitionKey("x"), SequenceKeysDeltas(1))
recvK4 := <-updates3
assert.Equal(t, k4, recvK4)

assert.NoError(t, client.Close())

assert.NoError(t, standaloneServer.Close())
}
3 changes: 3 additions & 0 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,9 @@ func (lc *leaderController) GetSequenceUpdates(req *proto.GetSequenceUpdatesRequ
return err
}

case <-lc.ctx.Done():
return lc.ctx.Err()

case <-stream.Context().Done():
return stream.Context().Err()
}
Expand Down
Loading