@@ -32,6 +32,7 @@ import {
32
32
WebSocketMessageBatchData ,
33
33
WebSocketMessageWithId ,
34
34
} from './eventstream-proxy.interfaces' ;
35
+ import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor' ;
35
36
36
37
/**
37
38
* Base class for a websocket gateway that listens for and proxies event stream messages.
@@ -50,13 +51,15 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
50
51
private currentClient : WebSocketEx | undefined ;
51
52
private subscriptionNames = new Map < string , string > ( ) ;
52
53
private queue = Promise . resolve ( ) ;
54
+ private mostRecentBatchTimestamp = new Date ( ) ;
53
55
54
56
constructor (
55
57
protected readonly logger : Logger ,
56
58
protected eventstream : EventStreamService ,
57
59
requireAuth = false ,
60
+ protected metrics : LoggingAndMetricsInterceptor ,
58
61
) {
59
- super ( logger , requireAuth ) ;
62
+ super ( logger , requireAuth , metrics ) ;
60
63
}
61
64
62
65
configure ( url ?: string , topic ?: string ) {
@@ -126,6 +129,21 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
126
129
}
127
130
128
131
private async processEvents ( batch : EventBatch ) {
132
+ this . logger . log ( 'Recording batch size metric of ' + batch . events . length ) ;
133
+
134
+ // Record metrics
135
+ this . metrics . setEventBatchSize ( batch . events . length ) ;
136
+ let timestamp = new Date ( ) ;
137
+ this . logger . log (
138
+ 'Recording batch interval of ' +
139
+ ( timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ) +
140
+ ' milliseconds' ,
141
+ ) ;
142
+ this . metrics . observeBatchInterval (
143
+ timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ,
144
+ ) ;
145
+ this . mostRecentBatchTimestamp = timestamp ;
146
+
129
147
const messages : WebSocketMessage [ ] = [ ] ;
130
148
for ( const event of batch . events ) {
131
149
this . logger . log ( `Proxying event: ${ JSON . stringify ( event ) } ` ) ;
0 commit comments