@@ -10,10 +10,23 @@ const base64 = (data: string): string =>
1010 // eslint-disable-next-line no-restricted-properties
1111 Buffer . from ( data ) . toString ( 'base64' ) . replace ( / \+ / g, '-' ) . replace ( / \/ / g, '_' ) . split ( '=' , 1 ) [ 0 ] ;
1212
13+ /**
14+ * Convert reserved WebSocket close codes to valid codes that can be sent in a close frame.
15+ *
16+ * Per RFC 6455, certain codes cannot be set by applications:
17+ * - 1004: Reserved
18+ * - 1005: No Status Received (MUST NOT be set in close frame)
19+ * - 1006: Abnormal Closure (MUST NOT be set in close frame, indicates no close frame received)
20+ *
21+ * The `ws` module enforces this and throws if you try to close with these codes.
22+ *
23+ * We convert these to 1011 (Internal Error) which is semantically appropriate:
24+ * "The server is terminating the connection because it encountered an unexpected condition"
25+ */
1326const liftErrorCode = ( code : number ) => {
1427 if ( code === 1004 || code === 1005 || code === 1006 ) {
15- // ws module forbid those error codes usage, lift to "application level" (4xxx)
16- return 3000 + code ;
28+ // Use 1011 (Internal Error) as a valid substitute for reserved codes
29+ return 1011 ;
1730 }
1831 return code ;
1932} ;
@@ -43,8 +56,17 @@ type ConnectionMetrics = {
4356 lastResourceVersion ?: string ;
4457} ;
4558
59+ // Connection tracking includes metrics plus socket references for proper cleanup
60+ type TrackedConnection = {
61+ metrics : ConnectionMetrics ;
62+ source : WebSocket ;
63+ target : WebSocket ;
64+ kubeUri : string ;
65+ heartbeatInterval : NodeJS . Timeout | null ;
66+ } ;
67+
4668// Global connection tracking for monitoring and cleanup
47- const activeConnections = new Map < string , ConnectionMetrics > ( ) ;
69+ const activeConnections = new Map < string , TrackedConnection > ( ) ;
4870
4971// Constants for connection management (exported for testing)
5072export const CONNECTION_TIMEOUT_MS = 10000 ; // 10 seconds to establish connection
@@ -56,7 +78,8 @@ const cleanupStaleConnections = (fastify: KubeFastifyInstance) => {
5678 const now = Date . now ( ) ;
5779 let cleanedCount = 0 ;
5880
59- for ( const [ connectionId , metrics ] of activeConnections . entries ( ) ) {
81+ for ( const [ connectionId , tracked ] of activeConnections . entries ( ) ) {
82+ const { metrics, source, target, kubeUri, heartbeatInterval } = tracked ;
6083 const inactivityDuration = now - Math . max ( metrics . lastMessageReceived , metrics . lastMessageSent ) ;
6184
6285 if ( inactivityDuration > STALE_CONNECTION_MS ) {
@@ -66,9 +89,21 @@ const cleanupStaleConnections = (fastify: KubeFastifyInstance) => {
6689 inactivityDuration,
6790 messagesReceived : metrics . messagesReceived ,
6891 messagesSent : metrics . messagesSent ,
92+ duration : now - metrics . created ,
6993 } ,
70- `Removing stale connection from tracking : ${ connectionId } ` ,
94+ `Closing stale websocket connection : ${ kubeUri } ` ,
7195 ) ;
96+
97+ // Clear heartbeat interval if it exists
98+ if ( heartbeatInterval ) {
99+ clearInterval ( heartbeatInterval ) ;
100+ }
101+
102+ // Close both websockets so the client receives the close event and can reconnect
103+ // Use code 1001 (going away) to indicate the server is closing due to inactivity
104+ closeWebSocket ( source , 1001 , 'Connection stale due to inactivity' ) ;
105+ closeWebSocket ( target , 1001 , 'Connection stale due to inactivity' ) ;
106+
72107 activeConnections . delete ( connectionId ) ;
73108 cleanedCount ++ ;
74109 }
@@ -121,7 +156,6 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
121156 messagesReceived : 0 ,
122157 messagesSent : 0 ,
123158 } ;
124- activeConnections . set ( connectionId , metrics ) ;
125159
126160 fastify . log . info (
127161 {
@@ -147,6 +181,16 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
147181 ca : https . globalAgent . options . ca as WebSocket . CertMeta ,
148182 } ) ;
149183
184+ // Track connection with socket references for proper cleanup
185+ const tracked : TrackedConnection = {
186+ metrics,
187+ source,
188+ target,
189+ kubeUri,
190+ heartbeatInterval : null ,
191+ } ;
192+ activeConnections . set ( connectionId , tracked ) ;
193+
150194 // Close both connections and log diagnostics
151195 const close = ( code : number , reason : string | Buffer ) => {
152196 // Make idempotent - only run once per connection
@@ -169,6 +213,12 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
169213 `Closing websocket connection: ${ kubeUri } ` ,
170214 ) ;
171215
216+ // Clear heartbeat interval if it exists
217+ if ( tracked . heartbeatInterval ) {
218+ clearInterval ( tracked . heartbeatInterval ) ;
219+ tracked . heartbeatInterval = null ;
220+ }
221+
172222 closeWebSocket ( source , code , reason ) ;
173223 closeWebSocket ( target , code , reason ) ;
174224 activeConnections . delete ( connectionId ) ;
@@ -189,9 +239,6 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
189239 }
190240 } , CONNECTION_TIMEOUT_MS ) ;
191241
192- // Heartbeat monitoring
193- let heartbeatInterval : NodeJS . Timeout | null = null ;
194-
195242 const onUnexpectedResponse = ( _ : ClientRequest , response : IncomingMessage ) => {
196243 const statusCode = response . statusCode || 'unknown' ;
197244 const statusMessage = response . statusMessage || 'unknown' ;
@@ -219,7 +266,7 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
219266 ) ;
220267
221268 // Start heartbeat monitoring
222- heartbeatInterval = setInterval ( ( ) => {
269+ tracked . heartbeatInterval = setInterval ( ( ) => {
223270 // Send ping to keep connection alive
224271 if ( target . readyState === WebSocket . OPEN ) {
225272 try {
@@ -311,8 +358,9 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
311358
312359 // Handle K8s API connection close
313360 target . on ( 'close' , ( code , reason ) => {
314- if ( heartbeatInterval ) {
315- clearInterval ( heartbeatInterval ) ;
361+ if ( tracked . heartbeatInterval ) {
362+ clearInterval ( tracked . heartbeatInterval ) ;
363+ tracked . heartbeatInterval = null ;
316364 }
317365
318366 const reasonStr = String ( reason ) ;
@@ -378,8 +426,9 @@ export default async (fastify: KubeFastifyInstance): Promise<void> => {
378426
379427 // Handle client connection close
380428 source . on ( 'close' , ( code , reason ) => {
381- if ( heartbeatInterval ) {
382- clearInterval ( heartbeatInterval ) ;
429+ if ( tracked . heartbeatInterval ) {
430+ clearInterval ( tracked . heartbeatInterval ) ;
431+ tracked . heartbeatInterval = null ;
383432 }
384433 clearTimeout ( connectionTimeout ) ;
385434
0 commit comments