Skip to content

Commit c7a52d7

Browse files
authored
Attach context to client SDK go routines that were missing it (#688)
1 parent ef635c9 commit c7a52d7

File tree

8 files changed

+41
-21
lines changed

8 files changed

+41
-21
lines changed

common/batch/batcher_factory.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
package batch
1616

1717
import (
18+
"context"
19+
"fmt"
1820
"runtime"
1921
"time"
22+
23+
"github.com/streamnative/oxia/common"
2024
)
2125

2226
var batcherChannelBufferSize = runtime.GOMAXPROCS(-1)
@@ -26,7 +30,7 @@ type BatcherFactory struct {
2630
MaxRequestsPerBatch int
2731
}
2832

29-
func (b *BatcherFactory) NewBatcher(batchFactory func() Batch) Batcher {
33+
func (b *BatcherFactory) NewBatcher(ctx context.Context, shard int64, batcherType string, batchFactory func() Batch) Batcher {
3034
batcher := &batcherImpl{
3135
batchFactory: batchFactory,
3236
callC: make(chan any, batcherChannelBufferSize),
@@ -35,7 +39,10 @@ func (b *BatcherFactory) NewBatcher(batchFactory func() Batch) Batcher {
3539
maxRequestsPerBatch: b.MaxRequestsPerBatch,
3640
}
3741

38-
go batcher.Run()
42+
go common.DoWithLabels(ctx, map[string]string{
43+
"oxia": fmt.Sprintf("batcher-%s", batcherType),
44+
"shard": fmt.Sprintf("%d", shard),
45+
}, batcher.Run)
3946

4047
return batcher
4148
}

common/batch/batcher_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package batch
1616

1717
import (
18+
"context"
1819
"testing"
1920
"time"
2021

@@ -78,7 +79,7 @@ func TestBatcher(t *testing.T) {
7879
Linger: item.linger,
7980
MaxRequestsPerBatch: item.maxSize,
8081
}
81-
batcher := factory.NewBatcher(batchFactory)
82+
batcher := factory.NewBatcher(context.Background(), 1, "test-write", batchFactory)
8283
batcher.Add(1)
8384

8485
if item.closeImmediately {

oxia/async_client_impl.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ func NewAsyncClient(serviceAddress string, opts ...ClientOption) (AsyncClient, e
8585
options: options,
8686
clientPool: clientPool,
8787
shardManager: shardManager,
88-
writeBatchManager: batch.NewManager(func(shard *int64) commonbatch.Batcher {
89-
return batcherFactory.NewWriteBatcher(shard, options.maxBatchSize)
88+
writeBatchManager: batch.NewManager(ctx, func(ctx context.Context, shard *int64) commonbatch.Batcher {
89+
return batcherFactory.NewWriteBatcher(ctx, shard, options.maxBatchSize)
9090
}),
91-
readBatchManager: batch.NewManager(batcherFactory.NewReadBatcher),
91+
readBatchManager: batch.NewManager(ctx, batcherFactory.NewReadBatcher),
9292
executor: executor,
9393
}
9494

oxia/internal/batch/batcher_factory.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package batch
1616

1717
import (
18+
"context"
1819
"time"
1920

2021
"github.com/streamnative/oxia/common/batch"
@@ -49,25 +50,25 @@ func NewBatcherFactory(
4950
}
5051
}
5152

52-
func (b *BatcherFactory) NewWriteBatcher(shardId *int64, maxWriteBatchSize int) batch.Batcher {
53-
return b.newBatcher(shardId, writeBatchFactory{
53+
func (b *BatcherFactory) NewWriteBatcher(ctx context.Context, shardId *int64, maxWriteBatchSize int) batch.Batcher {
54+
return b.newBatcher(ctx, shardId, "write", writeBatchFactory{
5455
execute: b.Executor.ExecuteWrite,
5556
metrics: b.Metrics,
5657
requestTimeout: b.RequestTimeout,
5758
maxByteSize: maxWriteBatchSize,
5859
}.newBatch)
5960
}
6061

61-
func (b *BatcherFactory) NewReadBatcher(shardId *int64) batch.Batcher {
62-
return b.newBatcher(shardId, readBatchFactory{
62+
func (b *BatcherFactory) NewReadBatcher(ctx context.Context, shardId *int64) batch.Batcher {
63+
return b.newBatcher(ctx, shardId, "read", readBatchFactory{
6364
execute: b.Executor.ExecuteRead,
6465
metrics: b.Metrics,
6566
requestTimeout: b.RequestTimeout,
6667
}.newBatch)
6768
}
6869

69-
func (b *BatcherFactory) newBatcher(shardId *int64, batchFactory func(shardId *int64) batch.Batch) batch.Batcher {
70-
return b.NewBatcher(func() batch.Batch {
70+
func (b *BatcherFactory) newBatcher(ctx context.Context, shardId *int64, batcherType string, batchFactory func(shardId *int64) batch.Batch) batch.Batcher {
71+
return b.NewBatcher(ctx, *shardId, batcherType, func() batch.Batch {
7172
return batchFactory(shardId)
7273
})
7374
}

oxia/internal/batch/manager.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,26 @@
1515
package batch
1616

1717
import (
18+
"context"
1819
"sync"
1920

2021
"go.uber.org/multierr"
2122

2223
"github.com/streamnative/oxia/common/batch"
2324
)
2425

25-
func NewManager(batcherFactory func(*int64) batch.Batcher) *Manager {
26+
func NewManager(ctx context.Context, batcherFactory func(context.Context, *int64) batch.Batcher) *Manager {
2627
return &Manager{
28+
ctx: ctx,
2729
batcherFactory: batcherFactory,
2830
batchers: make(map[int64]batch.Batcher),
2931
}
3032
}
3133

3234
type Manager struct {
3335
sync.RWMutex
34-
batcherFactory func(*int64) batch.Batcher
36+
ctx context.Context
37+
batcherFactory func(context.Context, *int64) batch.Batcher
3538
batchers map[int64]batch.Batcher
3639
}
3740

@@ -49,7 +52,7 @@ func (m *Manager) Get(shardId int64) batch.Batcher {
4952
defer m.Unlock()
5053

5154
if batcher, ok = m.batchers[shardId]; !ok {
52-
batcher = m.batcherFactory(&shardId)
55+
batcher = m.batcherFactory(m.ctx, &shardId)
5356
m.batchers[shardId] = batcher
5457
}
5558
return batcher

oxia/internal/batch/manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package batch
1616

1717
import (
18+
"context"
1819
"errors"
1920
"testing"
2021

@@ -42,12 +43,12 @@ func TestManager(t *testing.T) {
4243
testBatcher := &testBatcher{}
4344

4445
newBatcherInvocations := 0
45-
batcherFactory := func(*int64) batch.Batcher {
46+
batcherFactory := func(context.Context, *int64) batch.Batcher {
4647
newBatcherInvocations++
4748
return testBatcher
4849
}
4950

50-
manager := NewManager(batcherFactory)
51+
manager := NewManager(context.Background(), batcherFactory)
5152

5253
batcher := manager.Get(shardId)
5354
assert.Equal(t, testBatcher, batcher)

oxia/internal/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (e *executorImpl) writeStream(shardId *int64) (*streamWrapper, error) {
133133
return nil, err
134134
}
135135

136-
sw = newStreamWrapper(stream)
136+
sw = newStreamWrapper(*shardId, stream)
137137

138138
e.Lock()
139139
defer e.Unlock()

oxia/internal/write_stream.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package internal
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"io"
2021
"log/slog"
2122
"sync"
@@ -33,14 +34,20 @@ type streamWrapper struct {
3334
failed atomic.Bool
3435
}
3536

36-
func newStreamWrapper(stream proto.OxiaClient_WriteStreamClient) *streamWrapper {
37+
func newStreamWrapper(shard int64, stream proto.OxiaClient_WriteStreamClient) *streamWrapper {
3738
sw := &streamWrapper{
3839
stream: stream,
3940
pendingRequests: nil,
4041
}
4142

42-
go sw.handleResponses()
43-
go sw.handleStreamClosed()
43+
go common.DoWithLabels(stream.Context(), map[string]string{
44+
"oxia": "write-stream-handle-response",
45+
"shard": fmt.Sprintf("%d", shard),
46+
}, sw.handleResponses)
47+
go common.DoWithLabels(stream.Context(), map[string]string{
48+
"oxia": "write-stream-handle-stream-closed",
49+
"shard": fmt.Sprintf("%d", shard),
50+
}, sw.handleStreamClosed)
4451
return sw
4552
}
4653

0 commit comments

Comments
 (0)