@@ -367,63 +367,149 @@ describe('hyperlane warp rebalancer e2e tests', async function () {
367367 } ) ( ) ;
368368
369369 return new Promise ( ( resolve , reject ) => {
370+ let settled = false ;
371+ const observedLines : string [ ] = [ ] ;
372+ let stdoutCarry = '' ;
373+ let stderrCarry = '' ;
374+
375+ const settleSuccess = ( ) => {
376+ if ( settled ) return ;
377+ settled = true ;
378+ clearTimeout ( timeoutId ) ;
379+ resolve ( void 0 ) ;
380+ } ;
381+
382+ const settleError = ( error : Error ) => {
383+ if ( settled ) return ;
384+ settled = true ;
385+ clearTimeout ( timeoutId ) ;
386+ reject ( error ) ;
387+ } ;
388+
389+ const consumeLines = ( lines : string [ ] ) => {
390+ for ( const line of lines ) {
391+ observedLines . push ( line ) ;
392+ if ( ! expectedLogs . length ) break ;
393+ try {
394+ const logJson = JSON . parse ( line ) as { msg ?: unknown } ;
395+ if (
396+ typeof logJson . msg === 'string' &&
397+ logJson . msg . includes ( expectedLogs [ 0 ] )
398+ ) {
399+ expectedLogs . shift ( ) ;
400+ }
401+ } catch ( _e ) {
402+ if ( line . includes ( expectedLogs [ 0 ] ) ) {
403+ expectedLogs . shift ( ) ;
404+ }
405+ }
406+ }
407+
408+ if ( ! expectedLogs . length ) {
409+ settleSuccess ( ) ;
410+ }
411+ } ;
412+
413+ const consumeChunk = (
414+ output : string ,
415+ stream : 'stdout' | 'stderr' ,
416+ flush = false ,
417+ ) => {
418+ const buffered =
419+ ( stream === 'stdout' ? stdoutCarry : stderrCarry ) + output ;
420+ const parts = buffered . split ( '\n' ) ;
421+ const carry = flush ? '' : ( parts . pop ( ) ?? '' ) ;
422+
423+ if ( stream === 'stdout' ) {
424+ stdoutCarry = carry ;
425+ } else {
426+ stderrCarry = carry ;
427+ }
428+
429+ consumeLines ( parts . filter ( Boolean ) ) ;
430+ } ;
431+
432+ const getErrorLines = ( error : unknown ) : string [ ] => {
433+ if ( typeof error !== 'object' || error === null ) return [ ] ;
434+ const maybeError = error as { lines ?: unknown } ;
435+ if ( typeof maybeError . lines !== 'function' ) return [ ] ;
436+
437+ const lines = maybeError . lines ( ) ;
438+ return Array . isArray ( lines )
439+ ? lines . filter ( ( line ) : line is string => typeof line === 'string' )
440+ : [ ] ;
441+ } ;
442+
443+ const getProcessErrorOutput = ( error : unknown ) : string => {
444+ const outputs : string [ ] = [ ] ;
445+
446+ outputs . push ( getErrorLines ( error ) . join ( '\n' ) ) ;
447+
448+ if ( typeof error === 'object' && error !== null ) {
449+ const maybeError = error as {
450+ stdout ?: unknown ;
451+ stderr ?: unknown ;
452+ message ?: unknown ;
453+ stack ?: unknown ;
454+ } ;
455+
456+ if ( typeof maybeError . stdout === 'string' ) {
457+ outputs . push ( maybeError . stdout ) ;
458+ }
459+ if ( typeof maybeError . stderr === 'string' ) {
460+ outputs . push ( maybeError . stderr ) ;
461+ }
462+ if ( typeof maybeError . message === 'string' ) {
463+ outputs . push ( maybeError . message ) ;
464+ }
465+ if ( typeof maybeError . stack === 'string' ) {
466+ outputs . push ( maybeError . stack ) ;
467+ }
468+ } else if ( typeof error === 'string' ) {
469+ outputs . push ( error ) ;
470+ }
471+
472+ return outputs . filter ( Boolean ) . join ( '\n' ) ;
473+ } ;
474+
370475 // Use a timeout to prevent waiting for a log that might never happen and fail faster
371476 timeoutId = setTimeout ( ( ) => {
372- reject ( new Error ( `Timeout waiting for log: "${ expectedLogs [ 0 ] } "` ) ) ;
477+ settleError ( new Error ( `Timeout waiting for log: "${ expectedLogs [ 0 ] } "` ) ) ;
373478 } , timeout ) ;
374479
375480 // Handle when the process exits due to an error that is not the expected log
376- rebalancer . catch ( ( e ) => {
377- const lines = typeof e . lines === 'function' ? e . lines ( ) : [ ] ;
378- const combined = Array . isArray ( lines ) ? lines . join ( '\n' ) : String ( e ) ;
379-
380- // Consume any expected logs that appear in the error output
381- while ( expectedLogs . length && combined . includes ( expectedLogs [ 0 ] ) ) {
382- expectedLogs . shift ( ) ;
383- }
481+ rebalancer . catch ( ( e : unknown ) => {
482+ consumeChunk ( '' , 'stdout' , true ) ;
483+ consumeChunk ( '' , 'stderr' , true ) ;
484+ const combined = getProcessErrorOutput ( e ) ;
485+ consumeLines ( combined . split ( '\n' ) . filter ( Boolean ) ) ;
384486
385- clearTimeout ( timeoutId ) ;
386487 if ( ! expectedLogs . length ) {
387- resolve ( void 0 ) ;
488+ settleSuccess ( ) ;
388489 } else {
389- const lastLine =
390- Array . isArray ( lines ) && lines . length
391- ? lines [ lines . length - 1 ]
392- : String ( e ) ;
393- reject (
490+ const lines = getErrorLines ( e ) ;
491+ const lastLine = lines . length ? lines [ lines . length - 1 ] : String ( e ) ;
492+ settleError (
394493 new Error (
395494 `Process failed before logging: "${ expectedLogs [ 0 ] } " with error: ${ lastLine } ` ,
396495 ) ,
397496 ) ;
398497 }
399498 } ) ;
400499 ( async ( ) => {
401- // Wait for the process to output the expected log.
402500 for await ( let chunk of rebalancer . stdout ) {
403501 chunk = typeof chunk === 'string' ? chunk : chunk . toString ( ) ;
404- const lines = chunk . split ( '\n' ) . filter ( Boolean ) ; // handle empty lines
405-
406- for ( const line of lines ) {
407- if ( ! expectedLogs . length ) break ;
408- try {
409- const logJson = JSON . parse ( line ) ;
410- if ( logJson . msg ?. includes ( expectedLogs [ 0 ] ) ) {
411- expectedLogs . shift ( ) ;
412- }
413- } catch ( _e ) {
414- // For non-json logs
415- if ( line . includes ( expectedLogs [ 0 ] ) ) {
416- expectedLogs . shift ( ) ;
417- }
418- }
419- }
420-
421- if ( ! expectedLogs . length ) {
422- resolve ( void 0 ) ;
423- break ;
424- }
502+ consumeChunk ( chunk , 'stdout' ) ;
503+ }
504+ consumeChunk ( '' , 'stdout' , true ) ;
505+ } ) ( ) . catch ( settleError ) ;
506+ ( async ( ) => {
507+ for await ( let chunk of rebalancer . stderr ) {
508+ chunk = typeof chunk === 'string' ? chunk : chunk . toString ( ) ;
509+ consumeChunk ( chunk , 'stderr' ) ;
425510 }
426- } ) ( ) . catch ( reject ) ;
511+ consumeChunk ( '' , 'stderr' , true ) ;
512+ } ) ( ) . catch ( settleError ) ;
427513 } ) . finally ( async ( ) => {
428514 // Perform a cleanup at the end
429515 clearTimeout ( timeoutId ) ;
0 commit comments