Skip to content

Commit 4653c34

Browse files
committed
Fix SubscribeOption for event streaming
1 parent f2f6970 commit 4653c34

3 files changed

Lines changed: 24 additions & 48 deletions

File tree

access/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"context"
2828

2929
"github.com/onflow/cadence"
30+
"google.golang.org/grpc"
3031

3132
"github.com/onflow/flow-go-sdk"
3233
)
@@ -300,10 +301,23 @@ type SubscribeOption func(*SubscribeConfig)
300301

301302
type SubscribeConfig struct {
302303
HeartbeatInterval uint64
304+
GrpcOpts []grpc.CallOption
303305
}
304306

305307
func WithHeartbeatInterval(interval uint64) SubscribeOption {
306308
return func(config *SubscribeConfig) {
307309
config.HeartbeatInterval = interval
308310
}
309311
}
312+
313+
func WithGRPCOptions(grpcOpts ...grpc.CallOption) SubscribeOption {
314+
return func(config *SubscribeConfig) {
315+
config.GrpcOpts = grpcOpts
316+
}
317+
}
318+
319+
func DefaultSubscribeConfig() *SubscribeConfig {
320+
return &SubscribeConfig{
321+
HeartbeatInterval: 100,
322+
}
323+
}

access/grpc/client.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,7 @@ func (c *Client) SubscribeEventsByBlockID(
315315
filter flow.EventFilter,
316316
opts ...access.SubscribeOption,
317317
) (<-chan flow.BlockEvents, <-chan error, error) {
318-
conf := convertSubscribeOptions(opts...)
319-
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter, WithHeartbeatInterval(conf.heartbeatInterval))
318+
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter, opts...)
320319
}
321320

322321
func (c *Client) SubscribeEventsByBlockHeight(
@@ -325,8 +324,7 @@ func (c *Client) SubscribeEventsByBlockHeight(
325324
filter flow.EventFilter,
326325
opts ...access.SubscribeOption,
327326
) (<-chan flow.BlockEvents, <-chan error, error) {
328-
conf := convertSubscribeOptions(opts...)
329-
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
327+
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...)
330328
}
331329

332330
func (c *Client) SubscribeBlockDigestsFromStartBlockID(
@@ -402,18 +400,6 @@ func (c *Client) Close() error {
402400
return c.grpc.Close()
403401
}
404402

405-
// convertSubscribeOptions creates the default subscribe config and applies all the provided options
406-
func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
407-
subsConf := DefaultSubscribeConfig()
408-
conf := &access.SubscribeConfig{
409-
HeartbeatInterval: subsConf.heartbeatInterval,
410-
}
411-
for _, opt := range opts {
412-
opt(conf)
413-
}
414-
return subsConf
415-
}
416-
417403
func (c *Client) SubscribeAccountStatusesFromStartHeight(
418404
ctx context.Context,
419405
startBlockHeight uint64,

access/grpc/grpc.go

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/onflow/flow/protobuf/go/flow/executiondata"
3838

3939
"github.com/onflow/flow-go-sdk"
40+
base "github.com/onflow/flow-go-sdk/access"
4041
"github.com/onflow/flow-go-sdk/access/grpc/convert"
4142
)
4243

@@ -50,31 +51,6 @@ type ExecutionDataRPCClient interface {
5051
executiondata.ExecutionDataAPIClient
5152
}
5253

53-
type SubscribeOption func(*SubscribeConfig)
54-
55-
type SubscribeConfig struct {
56-
heartbeatInterval uint64
57-
grpcOpts []grpc.CallOption
58-
}
59-
60-
func DefaultSubscribeConfig() *SubscribeConfig {
61-
return &SubscribeConfig{
62-
heartbeatInterval: 100,
63-
}
64-
}
65-
66-
func WithHeartbeatInterval(interval uint64) SubscribeOption {
67-
return func(config *SubscribeConfig) {
68-
config.heartbeatInterval = interval
69-
}
70-
}
71-
72-
func WithGRPCOptions(grpcOpts ...grpc.CallOption) SubscribeOption {
73-
return func(config *SubscribeConfig) {
74-
config.grpcOpts = grpcOpts
75-
}
76-
}
77-
7854
// BaseClient is a gRPC client for the Flow Access API exposing all grpc specific methods.
7955
//
8056
// Use this client if you need advance access to the HTTP API. If you
@@ -89,7 +65,7 @@ type BaseClient struct {
8965

9066
// NewBaseClient creates a new gRPC handler for network communication.
9167
func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) {
92-
conn, err := grpc.Dial(url, opts...)
68+
conn, err := grpc.NewClient(url, opts...)
9369
if err != nil {
9470
return nil, err
9571
}
@@ -1078,7 +1054,7 @@ func (c *BaseClient) SubscribeEventsByBlockID(
10781054
ctx context.Context,
10791055
startBlockID flow.Identifier,
10801056
filter flow.EventFilter,
1081-
opts ...SubscribeOption,
1057+
opts ...base.SubscribeOption,
10821058
) (<-chan flow.BlockEvents, <-chan error, error) {
10831059
req := executiondata.SubscribeEventsRequest{
10841060
StartBlockId: startBlockID[:],
@@ -1091,7 +1067,7 @@ func (c *BaseClient) SubscribeEventsByBlockHeight(
10911067
ctx context.Context,
10921068
startHeight uint64,
10931069
filter flow.EventFilter,
1094-
opts ...SubscribeOption,
1070+
opts ...base.SubscribeOption,
10951071
) (<-chan flow.BlockEvents, <-chan error, error) {
10961072
req := executiondata.SubscribeEventsRequest{
10971073
StartBlockHeight: startHeight,
@@ -1104,9 +1080,9 @@ func (c *BaseClient) subscribeEvents(
11041080
ctx context.Context,
11051081
req *executiondata.SubscribeEventsRequest,
11061082
filter flow.EventFilter,
1107-
opts ...SubscribeOption,
1083+
opts ...base.SubscribeOption,
11081084
) (<-chan flow.BlockEvents, <-chan error, error) {
1109-
conf := DefaultSubscribeConfig()
1085+
conf := base.DefaultSubscribeConfig()
11101086
for _, apply := range opts {
11111087
apply(conf)
11121088
}
@@ -1116,9 +1092,9 @@ func (c *BaseClient) subscribeEvents(
11161092
Address: filter.Addresses,
11171093
Contract: filter.Contracts,
11181094
}
1119-
req.HeartbeatInterval = conf.heartbeatInterval
1095+
req.HeartbeatInterval = conf.HeartbeatInterval
11201096

1121-
stream, err := c.executionDataClient.SubscribeEvents(ctx, req, conf.grpcOpts...)
1097+
stream, err := c.executionDataClient.SubscribeEvents(ctx, req, conf.GrpcOpts...)
11221098
if err != nil {
11231099
return nil, nil, err
11241100
}

0 commit comments

Comments
 (0)