Skip to content

Commit 50290f6

Browse files
Merge pull request #20 from kaleido-io/eventstreams-fix
Fix event stream topic selection
2 parents c1bd3e8 + dca4589 commit 50290f6

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

internal/events/eventstream.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ var esDefaults struct {
5959
blockedRetryDelay fftypes.FFDuration
6060
webhookRequestTimeout fftypes.FFDuration
6161
websocketDistributionMode apitypes.DistributionMode
62-
topic string
6362
retry *retry.Retry
6463
}
6564

@@ -164,7 +163,7 @@ func (es *eventStream) initAction(startedState *startedStreamState) {
164163
case apitypes.EventStreamTypeWebhook:
165164
startedState.action = newWebhookAction(ctx, es.spec.Webhook).attemptBatch
166165
case apitypes.EventStreamTypeWebSocket:
167-
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
166+
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.WebSocket.Topic).attemptBatch
168167
default:
169168
// mergeValidateEsConfig always be called previous to this
170169
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
@@ -237,7 +236,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda
237236
changed = apitypes.CheckUpdateEnum(changed, &merged.Type, base.Type, updates.Type, apitypes.EventStreamTypeWebSocket)
238237
switch *merged.Type {
239238
case apitypes.EventStreamTypeWebSocket:
240-
if merged.WebSocket, changed, err = mergeValidateWsConfig(ctx, changed, base.WebSocket, updates.WebSocket); err != nil {
239+
if merged.WebSocket, changed, err = mergeValidateWsConfig(ctx, changed, *merged.Name, base.WebSocket, updates.WebSocket); err != nil {
241240
return nil, false, err
242241
}
243242
case apitypes.EventStreamTypeWebhook:

internal/events/eventstream_test.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ func TestConfigNewDefaultsUpdate(t *testing.T) {
135135
InitDefaults()
136136

137137
es := testESConf(t, `{
138-
"name": "test1"
138+
"name": "test1",
139+
"websocket": {
140+
"topic": "test1"
141+
}
139142
}`)
140143
es, changed, err := mergeValidateEsConfig(context.Background(), nil, es)
141144
assert.NoError(t, err)
@@ -157,7 +160,7 @@ func TestConfigNewDefaultsUpdate(t *testing.T) {
157160
"type":"websocket",
158161
"websocket": {
159162
"distributionMode":"load_balance",
160-
"topic":""
163+
"topic":"test1"
161164
}
162165
}`, string(b))
163166

@@ -297,7 +300,10 @@ func TestInitActionBadAction(t *testing.T) {
297300
func TestWebSocketEventStreamsE2EMigrationThenStart(t *testing.T) {
298301

299302
es := newTestEventStream(t, `{
300-
"name": "ut_stream"
303+
"name": "ut_stream",
304+
"websocket": {
305+
"topic": "ut_stream"
306+
}
301307
}`)
302308

303309
addr := "0x12345"
@@ -1198,7 +1204,8 @@ func TestWebSocketBroadcastActionCloseDuringCheckpoint(t *testing.T) {
11981204
es := newTestEventStream(t, `{
11991205
"name": "ut_stream",
12001206
"websocket": {
1201-
"distributionMode": "broadcast"
1207+
"distributionMode": "broadcast",
1208+
"topic": "ut_stream"
12021209
}
12031210
}`)
12041211

internal/events/websockets.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
2727
)
2828

29-
func mergeValidateWsConfig(ctx context.Context, changed bool, base *apitypes.WebSocketConfig, updates *apitypes.WebSocketConfig) (*apitypes.WebSocketConfig, bool, error) {
29+
func mergeValidateWsConfig(ctx context.Context, changed bool, esName string, base *apitypes.WebSocketConfig, updates *apitypes.WebSocketConfig) (*apitypes.WebSocketConfig, bool, error) {
3030

3131
if base == nil {
3232
base = &apitypes.WebSocketConfig{}
@@ -48,7 +48,7 @@ func mergeValidateWsConfig(ctx context.Context, changed bool, base *apitypes.Web
4848
}
4949

5050
// Topic
51-
changed = apitypes.CheckUpdateString(changed, &merged.Topic, base.Topic, updates.Topic, esDefaults.topic)
51+
changed = apitypes.CheckUpdateString(changed, &merged.Topic, base.Topic, updates.Topic, esName /* default to the ES name */)
5252

5353
return merged, changed, nil
5454
}

0 commit comments

Comments
 (0)