@@ -39,6 +39,8 @@ import { setGlobalDispatcher, Agent } from 'undici';
3939import { PythPriceFeedSubscriber } from '../pythPriceFeedSubscriber' ;
4040import { SwiftMaker } from './swift/makerExample' ;
4141import { SwiftTaker } from './swift/takerExample' ;
42+ import * as net from 'net' ;
43+ import { PythLazerClient } from '@pythnetwork/pyth-lazer-sdk' ;
4244
4345setGlobalDispatcher (
4446 new Agent ( {
@@ -307,6 +309,19 @@ const runBot = async () => {
307309 throw new Error ( 'fillerMultithreaded bot config not found' ) ;
308310 }
309311
312+ let pythLazerClient : PythLazerClient | undefined ;
313+ if ( config . global . driftEnv ! === 'devnet' ) {
314+ if ( ! config . global . lazerEndpoint || ! config . global . lazerToken ) {
315+ throw new Error (
316+ 'Must set environment variables LAZER_ENDPOINT and LAZER_TOKEN'
317+ ) ;
318+ }
319+ pythLazerClient = new PythLazerClient (
320+ config . global . lazerEndpoint ,
321+ config . global . lazerToken
322+ ) ;
323+ }
324+
310325 // Ensure that there are no duplicate market indexes in the Array<number[]> marketIndexes config
311326 const marketIndexes = new Set < number > ( ) ;
312327 for ( const marketIndexList of config . botConfigs . fillerMultithreaded
@@ -335,6 +350,7 @@ const runBot = async () => {
335350 } ,
336351 bundleSender ,
337352 pythPriceSubscriber ,
353+ pythLazerClient ,
338354 [ ]
339355 ) ;
340356 bots . push ( fillerMultithreaded ) ;
@@ -423,46 +439,50 @@ const runBot = async () => {
423439
424440 // start http server listening to /health endpoint using http package
425441 const startupTime = Date . now ( ) ;
426- http
427- . createServer ( async ( req , res ) => {
428- if ( req . url === '/health' ) {
429- if ( config . global . testLiveness ) {
430- if ( Date . now ( ) > startupTime + 60 * 1000 ) {
431- res . writeHead ( 500 ) ;
432- res . end ( 'Testing liveness test fail' ) ;
433- return ;
434- }
435- }
436-
437- /* @ts -ignore */
438- if ( ! driftClient . connection . _rpcWebSocketConnected ) {
439- logger . error ( `Connection rpc websocket disconnected` ) ;
442+ const createServerCallback = async ( req : any , res : any ) => {
443+ if ( req . url === '/health' ) {
444+ if ( config . global . testLiveness ) {
445+ if ( Date . now ( ) > startupTime + 60 * 1000 ) {
440446 res . writeHead ( 500 ) ;
441- res . end ( `Connection rpc websocket disconnected` ) ;
447+ res . end ( 'Testing liveness test fail' ) ;
442448 return ;
443449 }
450+ }
444451
445- // check all bots if they're live
446- for ( const bot of bots ) {
447- const healthCheck = await promiseTimeout ( bot . healthCheck ( ) , 1000 ) ;
448- if ( ! healthCheck ) {
449- logger . error ( `Health check failed for bot` ) ;
450- res . writeHead ( 503 ) ;
451- res . end ( `Bot is not healthy` ) ;
452- return ;
453- }
454- }
452+ /* @ts -ignore */
453+ if ( ! driftClient . connection . _rpcWebSocketConnected ) {
454+ logger . error ( `Connection rpc websocket disconnected` ) ;
455+ res . writeHead ( 500 ) ;
456+ res . end ( `Connection rpc websocket disconnected` ) ;
457+ return ;
458+ }
455459
456- // liveness check passed
457- res . writeHead ( 200 ) ;
458- res . end ( 'OK' ) ;
459- } else {
460- res . writeHead ( 404 ) ;
461- res . end ( 'Not found' ) ;
460+ // check all bots if they're live
461+ for ( const bot of bots ) {
462+ const healthCheck = await promiseTimeout ( bot . healthCheck ( ) , 1000 ) ;
463+ if ( ! healthCheck ) {
464+ logger . error ( `Health check failed for bot` ) ;
465+ res . writeHead ( 503 ) ;
466+ res . end ( `Bot is not healthy` ) ;
467+ return ;
468+ }
462469 }
463- } )
464- . listen ( healthCheckPort ) ;
465- logger . info ( `Health check server listening on port ${ healthCheckPort } ` ) ;
470+
471+ // liveness check passed
472+ res . writeHead ( 200 ) ;
473+ res . end ( 'OK' ) ;
474+ } else {
475+ res . writeHead ( 404 ) ;
476+ res . end ( 'Not found' ) ;
477+ }
478+ } ;
479+
480+ let healthCheckPortToUse = Number ( healthCheckPort ) ;
481+ while ( await isPortInUse ( healthCheckPortToUse ) ) {
482+ healthCheckPortToUse ++ ;
483+ }
484+ http . createServer ( createServerCallback ) . listen ( healthCheckPortToUse ) ;
485+ logger . info ( `Server listening on port ${ healthCheckPortToUse } ` ) ;
466486} ;
467487
468488recursiveTryCatch ( ( ) => runBot ( ) ) ;
@@ -476,3 +496,26 @@ async function recursiveTryCatch(f: () => void) {
476496 await recursiveTryCatch ( f ) ;
477497 }
478498}
499+
500+ function isPortInUse ( port : number , host = '127.0.0.1' ) : Promise < boolean > {
501+ return new Promise ( ( resolve ) => {
502+ const server = net . createServer ( ) ;
503+
504+ server . once ( 'error' , ( err ) => {
505+ if (
506+ err . name ?. includes ( 'EADDRINUSE' ) ||
507+ err . message ?. includes ( 'EADDRINUSE' )
508+ ) {
509+ resolve ( true ) ;
510+ } else {
511+ resolve ( false ) ;
512+ }
513+ } ) ;
514+
515+ server . once ( 'listening' , ( ) => {
516+ server . close ( ( ) => resolve ( false ) ) ;
517+ } ) ;
518+
519+ server . listen ( port , host ) ;
520+ } ) ;
521+ }
0 commit comments