1
- // Copyright © 2022 Kaleido, Inc.
1
+ // Copyright © 2024 Kaleido, Inc.
2
2
//
3
3
// SPDX-License-Identifier: Apache-2.0
4
4
//
@@ -20,9 +20,9 @@ import { AxiosRequestConfig } from 'axios';
20
20
import { lastValueFrom } from 'rxjs' ;
21
21
import WebSocket from 'ws' ;
22
22
import { FFRequestIDHeader } from '../request-context/constants' ;
23
- import { Context } from '../request-context/request-context.decorator' ;
23
+ import { Context , newContext } from '../request-context/request-context.decorator' ;
24
24
import { IAbiMethod } from '../tokens/tokens.interfaces' ;
25
- import { getHttpRequestOptions , getWebsocketOptions } from '../utils' ;
25
+ import { eventStreamName , getHttpRequestOptions , getWebsocketOptions } from '../utils' ;
26
26
import {
27
27
Event ,
28
28
EventBatch ,
@@ -46,6 +46,7 @@ export class EventStreamSocket {
46
46
constructor (
47
47
private url : string ,
48
48
private topic : string ,
49
+ private namespace : string ,
49
50
private username : string ,
50
51
private password : string ,
51
52
private handleEvents : ( events : EventBatch ) => void ,
@@ -67,7 +68,7 @@ export class EventStreamSocket {
67
68
} else {
68
69
this . logger . log ( 'Event stream websocket connected' ) ;
69
70
}
70
- this . produce ( { type : 'listen' , topic : this . topic } ) ;
71
+ this . produce ( { type : 'listen' , topic : eventStreamName ( this . topic , this . namespace ) } ) ;
71
72
this . produce ( { type : 'listenreplies' } ) ;
72
73
this . ping ( ) ;
73
74
} )
@@ -83,6 +84,7 @@ export class EventStreamSocket {
83
84
}
84
85
} )
85
86
. on ( 'message' , ( message : string ) => {
87
+ this . logger . verbose ( `WS => ${ message } ` ) ;
86
88
this . handleMessage ( JSON . parse ( message ) ) ;
87
89
} )
88
90
. on ( 'pong' , ( ) => {
@@ -109,7 +111,11 @@ export class EventStreamSocket {
109
111
}
110
112
111
113
ack ( batchNumber : number | undefined ) {
112
- this . produce ( { type : 'ack' , topic : this . topic , batchNumber } ) ;
114
+ this . produce ( { type : 'ack' , topic : eventStreamName ( this . topic , this . namespace ) , batchNumber } ) ;
115
+ }
116
+
117
+ nack ( batchNumber : number | undefined ) {
118
+ this . produce ( { type : 'nack' , topic : eventStreamName ( this . topic , this . namespace ) , batchNumber } ) ;
113
119
}
114
120
115
121
close ( ) {
@@ -193,13 +199,27 @@ export class EventStreamService {
193
199
batchSize : 50 ,
194
200
batchTimeoutMS : 500 ,
195
201
type : 'websocket' ,
196
- websocket : { topic } ,
202
+ websocket : { topic : name } ,
197
203
blockedReryDelaySec : 30 , // intentional due to spelling error in ethconnect
198
204
inputs : true ,
199
205
timestamps : true ,
200
206
} ;
201
207
202
208
const existingStreams = await this . getStreams ( ctx ) ;
209
+
210
+ // Check to see if there is a deprecated stream that we should remove
211
+ this . logger . debug ( `Checking for deprecated event steam with topic '${ topic } '` ) ;
212
+ const deprecatedStream = existingStreams . find ( s => s . name === topic ) ;
213
+ if ( deprecatedStream ) {
214
+ this . logger . log ( `Purging deprecated eventstream '${ deprecatedStream . id } '` ) ;
215
+ await lastValueFrom (
216
+ this . http . delete (
217
+ new URL ( `/eventstreams/${ deprecatedStream . id } ` , this . baseUrl ) . href ,
218
+ this . requestOptions ( ctx ) ,
219
+ ) ,
220
+ ) ;
221
+ }
222
+
203
223
const stream = existingStreams . find ( s => s . name === streamDetails . name ) ;
204
224
if ( stream ) {
205
225
const patchedStreamRes = await lastValueFrom (
@@ -331,15 +351,20 @@ export class EventStreamService {
331
351
return true ;
332
352
}
333
353
334
- connect (
354
+ async connect (
335
355
url : string ,
336
356
topic : string ,
357
+ namespace : string ,
337
358
handleEvents : ( events : EventBatch ) => void ,
338
359
handleReceipt : ( receipt : EventStreamReply ) => void ,
339
360
) {
361
+ const name = eventStreamName ( topic , namespace ) ;
362
+ await this . createOrUpdateStream ( newContext ( ) , name , topic ) ;
363
+
340
364
return new EventStreamSocket (
341
365
url ,
342
366
topic ,
367
+ namespace ,
343
368
this . username ,
344
369
this . password ,
345
370
handleEvents ,
0 commit comments