@@ -12,6 +12,10 @@ import Logging from '../utils/logging';
1212
1313const console = new Console ( Logging . k8s . stream ) ;
1414
15+ function defined < T > ( input : T | undefined | null ) : input is T {
16+ return typeof input !== 'undefined' && input !== null ;
17+ }
18+
1519/**
1620 * ErrorSuppressingStdin wraps a socket such that when the 'data' event handler
1721 * throws, we can suppress the output so we do not get a dialog box, but rather
@@ -71,7 +75,7 @@ class ForwardingMap {
7175 * @param endpoint The endpoint in the namespace to forward to.
7276 * @param port The port to forward to on the endpoint.
7377 */
74- get ( namespace : string | undefined , endpoint : string , port : number ) {
78+ get ( namespace : string | undefined , endpoint : string , port : number | string ) {
7579 return this . map . get ( `${ namespace || 'default' } /${ endpoint } :${ port } ` ) ;
7680 }
7781
@@ -82,7 +86,7 @@ class ForwardingMap {
8286 * @param port The port to forward to on the endpoint.
8387 * @param server The value to set.
8488 */
85- set ( namespace : string | undefined , endpoint : string , port : number , server : net . Server ) {
89+ set ( namespace : string | undefined , endpoint : string , port : number | string , server : net . Server ) {
8690 return this . map . set ( `${ namespace || 'default' } /${ endpoint } :${ port } ` , server ) ;
8791 }
8892
@@ -92,7 +96,7 @@ class ForwardingMap {
9296 * @param endpoint The endpoint in the namespace to forward to.
9397 * @param port The port to forward to on the endpoint.
9498 */
95- delete ( namespace : string | undefined , endpoint : string , port : number ) {
99+ delete ( namespace : string | undefined , endpoint : string , port : number | string ) {
96100 return this . map . delete ( `${ namespace || 'default' } /${ endpoint } :${ port } ` ) ;
97101 }
98102
@@ -102,23 +106,24 @@ class ForwardingMap {
102106 * @param endpoint The endpoint in the namespace to forward to.
103107 * @param port The port to forward to on the endpoint.
104108 */
105- has ( namespace : string | undefined , endpoint : string , port : number ) {
109+ has ( namespace : string | undefined , endpoint : string , port : number | string ) {
106110 return this . map . has ( `${ namespace || 'default' } /${ endpoint } :${ port } ` ) ;
107111 }
108112
109113 /**
110114 * Iterate through the entries.
111115 */
112- * [ Symbol . iterator ] ( ) : IterableIterator < [ string , string , number , net . Server ] > {
116+ * [ Symbol . iterator ] ( ) : IterableIterator < [ string , string , number | string , net . Server ] > {
113117 const iter = this . map [ Symbol . iterator ] ( ) ;
114118
115119 for ( const [ key , server ] of iter ) {
116- const match = / ^ ( [ ^ / ] * ) \/ ( [ ^ : ] + ) : ( \d + ) $ / . exec ( key ) ;
120+ const match = / ^ ( [ ^ / ] * ) \/ ( [ ^ : ] + ) : ( . + ? ) $ / . exec ( key ) ;
117121
118122 if ( match ) {
119- const [ namespace , endpoint , port ] = match ;
123+ const [ namespace , endpoint , portString ] = match ;
124+ const port = / ^ \d + $ / . test ( portString ) ? parseInt ( portString ) : portString ;
120125
121- yield [ namespace , endpoint , parseInt ( port ) , server ] ;
126+ yield [ namespace , endpoint , port , server ] ;
122127 }
123128 }
124129 }
@@ -160,8 +165,8 @@ export type ServiceEntry = {
160165 name : string ;
161166 /** The name of the port within the service. */
162167 portName ?: string ;
163- /** The internal port number of the service. */
164- port ?:number ;
168+ /** The internal port number (or name) of the service. */
169+ port ?: number | string ;
165170 /** The forwarded port on localhost (on the host), if any. */
166171 listenPort ?:number ;
167172}
@@ -263,43 +268,72 @@ export class KubeClient extends events.EventEmitter {
263268 this . removeAllListeners ( 'service-changed' ) ;
264269 }
265270
266- /**
267- * Return a pod that is part of a given endpoint and ready to receive traffic.
268- * @param {string } namespace The namespace in which to look for resources.
269- * @param {string } endpointName the name of an endpoint that controls ready pods.
270- * @returns {Promise<k8s.V1Pod?> }
271- */
272- async getActivePod ( namespace : string , endpointName : string ) : Promise < k8s . V1Pod | null > {
273- console . log ( `Attempting to locate ${ endpointName } pod...` ) ;
274- // Loop fetching endpoints, until it matches at least one pod.
275- let target : k8s . V1ObjectReference | undefined ;
271+ protected async getEndpointSubsets ( namespace : string , endpointName : string ) : Promise < k8s . V1EndpointSubset [ ] | null > {
272+ console . log ( `Attempting to locate endpoint subsets ${ endpointName } ...` ) ;
273+ // Loop fetching endpoints, until it matches at least one subset.
274+ let target : k8s . V1EndpointSubset [ ] | undefined ;
276275
277276 // TODO: switch this to using watch.
278277 while ( ! this . shutdown ) {
279278 const endpoints = await this . coreV1API . listNamespacedEndpoints (
280279 namespace , undefined , undefined , undefined , undefined , undefined , undefined , undefined ,
281280 undefined , undefined , undefined , { headers : { name : endpointName } } ) ;
282281
283- target = endpoints ?. body ?. items
284- ?. flatMap ( item => item . subsets ) . filter ( x => x )
285- . flatMap ( subset => subset ?. addresses ) . filter ( x => x )
286- . flatMap ( address => address ?. targetRef )
287- . find ( ref => ref ) ;
288- if ( target || this . shutdown ) {
282+ const body = endpoints ?. body ;
283+ const items = ( body ?. items || [ ] ) . filter ( item => item . metadata ?. name === endpointName ) ;
284+
285+ target = items . flatMap ( item => item . subsets ) . filter ( defined ) ;
286+ if ( target . length > 0 || this . shutdown ) {
289287 break ;
290288 }
291- console . log ( `Could not find ${ endpointName } pod (${ endpoints ? 'did' : 'did not' } get endpoints), retrying...` ) ;
289+ console . log ( `Could not find ${ endpointName } endpoint (${ body ? 'did' : 'did not' } get endpoints), retrying...` ) ;
292290 await util . promisify ( setTimeout ) ( 1000 ) ;
293291 }
292+
293+ return target ?? null ;
294+ }
295+
296+ protected async getActivePodFromEndpointSubsets ( subsets : k8s . V1EndpointSubset [ ] ) {
297+ const addresses = subsets . flatMap ( subset => subset . addresses ) . filter ( defined ) ;
298+ const address = addresses . find ( address => address . targetRef ?. kind === 'Pod' ) ;
299+ const target = address ?. targetRef ;
300+
294301 if ( ! target || ! target . name || ! target . namespace ) {
295302 return null ;
296303 }
304+
297305 // Fetch the pod
298- const { body : pod } = await this . coreV1API . readNamespacedPod ( target . name , target . namespace ) ;
306+ const resp = await this . coreV1API . readNamespacedPod ( target . name , target . namespace ) ;
299307
300- console . log ( `Got ${ endpointName } pod: ${ pod ?. metadata ?. namespace } :${ pod ?. metadata ?. name } ` ) ;
308+ return resp ?. body ;
309+ }
301310
302- return pod ;
311+ /**
312+ * Return a pod that is part of a given endpoint and ready to receive traffic.
313+ * @param namespace The namespace in which to look for resources.
314+ * @param endpointName the name of an endpoint that controls ready pods.
315+ */
316+ async getActivePod ( namespace : string , endpointName : string ) : Promise < k8s . V1Pod | null > {
317+ console . log ( `Attempting to locate ${ endpointName } pod...` ) ;
318+ while ( ! this . shutdown ) {
319+ const subsets = await this . getEndpointSubsets ( namespace , endpointName ) ;
320+
321+ if ( ! subsets ) {
322+ await util . promisify ( setTimeout ) ( 1000 ) ;
323+ continue ;
324+ }
325+ const pod = await this . getActivePodFromEndpointSubsets ( subsets ) ;
326+
327+ if ( ! pod ) {
328+ await util . promisify ( setTimeout ) ( 1000 ) ;
329+ continue ;
330+ }
331+ console . log ( `Got ${ endpointName } pod: ${ pod . metadata ?. namespace } :${ pod . metadata ?. name } ` ) ;
332+
333+ return pod ;
334+ }
335+
336+ return null ;
303337 }
304338
305339 async isServiceReady ( namespace : string , service : string ) : Promise < boolean > {
@@ -316,7 +350,7 @@ export class KubeClient extends events.EventEmitter {
316350 * @param endpoint The endpoint in the namespace to forward to.
317351 * @param port The port to forward to on the endpoint.
318352 */
319- protected async createForwardingServer ( namespace : string , endpoint : string , port : number ) : Promise < void > {
353+ protected async createForwardingServer ( namespace : string , endpoint : string , port : number | string ) : Promise < void > {
320354 const targetName = `${ namespace } /${ endpoint } :${ port } ` ;
321355
322356 if ( this . servers . get ( namespace , endpoint , port ) ) {
@@ -343,9 +377,15 @@ export class KubeClient extends events.EventEmitter {
343377 }
344378 } ) ;
345379 // Find a working pod
346- const pod = await this . getActivePod ( namespace , endpoint ) ;
380+ const endpoints = await this . getEndpointSubsets ( namespace , endpoint ) ?? [ ] ;
381+ const pod = await this . getActivePodFromEndpointSubsets ( endpoints ) ;
382+
383+ if ( ! pod ) {
384+ socket . destroy ( new Error ( `Port forwarding to ${ targetName } failed; no active pod found` ) ) ;
347385
348- if ( ! pod || ! this . servers . has ( namespace , endpoint , port ) ) {
386+ return ;
387+ }
388+ if ( ! this . servers . has ( namespace , endpoint , port ) ) {
349389 socket . destroy ( new Error ( `Port forwarding to ${ targetName } was cancelled` ) ) ;
350390
351391 return ;
@@ -358,8 +398,20 @@ export class KubeClient extends events.EventEmitter {
358398 }
359399 const { metadata :{ namespace : podNamespace , name : podName } } = pod ;
360400 const stdin = new ErrorSuppressingStdin ( socket ) ;
401+ let portNumber : number ;
402+
403+ if ( typeof port === 'number' ) {
404+ portNumber = port ;
405+ } else {
406+ const ports = endpoints . flatMap ( endpoint => endpoint . ports ) . filter ( defined ) ;
361407
362- this . forwarder . portForward ( podNamespace || 'default' , podName , [ port ] , socket , null , stdin )
408+ portNumber = ports . find ( p => p . name === port ) ?. port ?? 0 ;
409+ if ( portNumber === 0 ) {
410+ throw new Error ( `Could not find port number for ${ targetName } ` ) ;
411+ }
412+ }
413+
414+ this . forwarder . portForward ( podNamespace || 'default' , podName , [ portNumber ] , socket , null , stdin )
363415 . catch ( ( e ) => {
364416 console . log ( `Failed to create web socket for forwarding to ${ targetName } : ${ e ?. error || e } ` ) ;
365417 socket . destroy ( e ) ;
@@ -405,7 +457,7 @@ export class KubeClient extends events.EventEmitter {
405457 * @param port The port to forward.
406458 * @return The port number for the port forward.
407459 */
408- async forwardPort ( namespace : string , endpoint : string , port : number ) : Promise < number | undefined > {
460+ async forwardPort ( namespace : string , endpoint : string , port : number | string ) : Promise < number | undefined > {
409461 const targetName = `${ namespace } /${ endpoint } :${ port } ` ;
410462
411463 await this . createForwardingServer ( namespace , endpoint , port ) ;
@@ -425,11 +477,11 @@ export class KubeClient extends events.EventEmitter {
425477
426478 /**
427479 * Ensure that a given port forwarding does not exist; if it did, close it.
428- * @param { string } namespace The namespace to forward to.
429- * @param { string } endpoint The endpoint in the namespace to forward to.
430- * @param { number } port The port to forward to on the endpoint.
480+ * @param namespace The namespace to forward to.
481+ * @param endpoint The endpoint in the namespace to forward to.
482+ * @param port The port to forward to on the endpoint.
431483 */
432- async cancelForwardPort ( namespace : string , endpoint : string , port : number ) {
484+ async cancelForwardPort ( namespace : string , endpoint : string , port : number | string ) {
433485 const server = this . servers . get ( namespace , endpoint , port ) ;
434486
435487 this . servers . delete ( namespace , endpoint , port ) ;
0 commit comments