@@ -61,6 +61,15 @@ export class HyperLiquidSubscriptionService {
6161 private blocklistMarkets : string [ ] ; // Market filtering (blocklist)
6262 private discoveredDexNames : string [ ] = [ ] ; // DEX order for mapping webData3 perpDexStates indices
6363
64+ // DEX discovery synchronization - allows subscriptions to wait for HIP-3 DEX discovery
65+ private dexDiscoveryPromise : Promise < void > | null = null ;
66+ private dexDiscoveryResolver : ( ( ) => void ) | null = null ;
67+
68+ // Track DEXs for synchronized position notifications
69+ // Ensures all DEXs send initial data before notifying subscribers
70+ private expectedDexs : Set < string > = new Set ( ) ;
71+ private initializedDexs : Set < string > = new Set ( ) ;
72+
6473 // Subscriber collections
6574 private readonly priceSubscribers = new Map <
6675 string ,
@@ -244,6 +253,99 @@ export class HyperLiquidSubscriptionService {
244253 return this . enabledDexs . includes ( dex ) ;
245254 }
246255
256+ /**
257+ * Populate DEX meta cache with pre-fetched meta data
258+ * Called by Provider after buildAssetMapping to share cached meta,
259+ * avoiding redundant metaAndAssetCtxs/meta API calls during subscription setup
260+ * @param dex - DEX key ('' for main DEX, 'xyz'/'flx'/etc for HIP-3)
261+ * @param meta - Meta response containing universe data
262+ */
263+ public setDexMetaCache (
264+ dex : string ,
265+ meta : {
266+ universe : {
267+ name : string ;
268+ szDecimals : number ;
269+ maxLeverage : number ;
270+ } [ ] ;
271+ } ,
272+ ) : void {
273+ this . dexMetaCache . set ( dex , meta ) ;
274+ DevLogger . log ( '[SubscriptionService] DEX meta cache populated' , {
275+ dex : dex || 'main' ,
276+ universeSize : meta . universe . length ,
277+ } ) ;
278+ }
279+
280+ /**
281+ * Cache asset contexts for a specific DEX from API response
282+ * This allows buildAssetMapping() to populate cache for getMarketDataWithPrices() to use
283+ * @param dex - DEX name ('' for main perps)
284+ * @param assetCtxs - Asset contexts from metaAndAssetCtxs response
285+ */
286+ public setDexAssetCtxsCache (
287+ dex : string ,
288+ assetCtxs : WsAssetCtxsEvent [ 'ctxs' ] ,
289+ ) : void {
290+ this . dexAssetCtxsCache . set ( dex , assetCtxs ) ;
291+ DevLogger . log ( '[SubscriptionService] DEX assetCtxs cache populated' , {
292+ dex : dex || 'main' ,
293+ ctxsCount : assetCtxs . length ,
294+ } ) ;
295+ }
296+
297+ /**
298+ * Get cached assetCtxs for a DEX
299+ * Returns the cached asset contexts from WebSocket subscription if available
300+ * @param dex - DEX key ('' for main DEX, 'xyz'/'flx'/etc for HIP-3)
301+ * @returns Array of asset contexts or undefined if not cached
302+ */
303+ public getDexAssetCtxsCache (
304+ dex : string ,
305+ ) : WsAssetCtxsEvent [ 'ctxs' ] | undefined {
306+ return this . dexAssetCtxsCache . get ( dex ) ;
307+ }
308+
309+ /**
310+ * Wait for DEX discovery to complete (with timeout)
311+ * Used when HIP-3 is enabled but enabledDexs hasn't been populated yet.
312+ * This allows subscriptions to wait for DEX discovery before creating per-DEX subscriptions.
313+ */
314+ private async waitForDexDiscovery ( timeoutMs : number = 5000 ) : Promise < void > {
315+ // Already have DEXs, no need to wait
316+ if ( this . enabledDexs . length > 0 ) {
317+ return ;
318+ }
319+
320+ // Create promise if not exists
321+ if ( ! this . dexDiscoveryPromise ) {
322+ this . dexDiscoveryPromise = new Promise < void > ( ( resolve ) => {
323+ this . dexDiscoveryResolver = resolve ;
324+ } ) ;
325+ }
326+
327+ // Wait with timeout
328+ let timeoutId : NodeJS . Timeout | undefined ;
329+ const timeoutPromise = new Promise < void > ( ( _ , reject ) => {
330+ timeoutId = setTimeout (
331+ ( ) => reject ( new Error ( 'DEX discovery timeout' ) ) ,
332+ timeoutMs ,
333+ ) ;
334+ } ) ;
335+
336+ try {
337+ await Promise . race ( [ this . dexDiscoveryPromise , timeoutPromise ] ) ;
338+ } catch {
339+ DevLogger . log (
340+ 'DEX discovery wait timed out, proceeding with main DEX only' ,
341+ ) ;
342+ } finally {
343+ if ( timeoutId ) {
344+ clearTimeout ( timeoutId ) ;
345+ }
346+ }
347+ }
348+
247349 /**
248350 * Update feature flags for HIP-3 support
249351 * Called when provider configuration changes at runtime
@@ -266,6 +368,13 @@ export class HyperLiquidSubscriptionService {
266368 this . blocklistMarkets = blocklistMarkets ;
267369 this . discoveredDexNames = enabledDexs ; // Store DEX order for webData3 index mapping
268370
371+ // Resolve any pending DEX discovery wait now that DEXs are available
372+ if ( this . dexDiscoveryResolver && enabledDexs . length > 0 ) {
373+ this . dexDiscoveryResolver ( ) ;
374+ this . dexDiscoveryPromise = null ;
375+ this . dexDiscoveryResolver = null ;
376+ }
377+
269378 DevLogger . log ( 'Feature flags updated:' , {
270379 previousHip3Enabled,
271380 hip3Enabled,
@@ -309,8 +418,47 @@ export class HyperLiquidSubscriptionService {
309418 ) ;
310419 }
311420
312- // Note: webData3 automatically includes all DEX data, so no separate
313- // subscription setup needed for positions/orders/account data
421+ // Establish clearinghouseState/openOrders subscriptions for new DEXs
422+ // (needed for positions, orders, and account data when using individual subscriptions)
423+ const hasUserDataSubscribers =
424+ this . positionSubscriberCount > 0 ||
425+ this . orderSubscriberCount > 0 ||
426+ this . accountSubscriberCount > 0 ;
427+
428+ if ( hasUserDataSubscribers && this . hip3Enabled ) {
429+ try {
430+ const userAddress =
431+ await this . walletService . getUserAddressWithDefault ( ) ;
432+
433+ await Promise . all (
434+ newDexs . map ( async ( dex ) => {
435+ try {
436+ await this . ensureClearinghouseStateSubscription (
437+ userAddress ,
438+ dex ,
439+ ) ;
440+ await this . ensureOpenOrdersSubscription ( userAddress , dex ) ;
441+ DevLogger . log (
442+ `Established user data subscriptions for new DEX: ${ dex } ` ,
443+ ) ;
444+ } catch ( error ) {
445+ Logger . error (
446+ ensureError ( error ) ,
447+ this . getErrorContext (
448+ 'updateFeatureFlags.ensureUserDataSubscription' ,
449+ { dex } ,
450+ ) ,
451+ ) ;
452+ }
453+ } ) ,
454+ ) ;
455+ } catch ( error ) {
456+ Logger . error (
457+ ensureError ( error ) ,
458+ this . getErrorContext ( 'updateFeatureFlags.getUserAddress' ) ,
459+ ) ;
460+ }
461+ }
314462 }
315463 }
316464
@@ -854,6 +1002,18 @@ export class HyperLiquidSubscriptionService {
8541002 return ;
8551003 }
8561004
1005+ // Wait for DEX discovery if HIP-3 is enabled but DEXs haven't been discovered yet
1006+ // This ensures HIP-3 subscriptions are created together with main DEX
1007+ if ( this . hip3Enabled && this . enabledDexs . length === 0 ) {
1008+ DevLogger . log (
1009+ 'Waiting for DEX discovery before creating subscriptions...' ,
1010+ ) ;
1011+ await this . waitForDexDiscovery ( ) ;
1012+ DevLogger . log ( 'DEX discovery complete, proceeding with subscriptions' , {
1013+ enabledDexs : this . enabledDexs ,
1014+ } ) ;
1015+ }
1016+
8571017 return new Promise < void > ( ( resolve , reject ) => {
8581018 // Choose channel based on HIP-3 master switch
8591019 if ( ! this . hip3Enabled ) {
@@ -979,6 +1139,11 @@ export class HyperLiquidSubscriptionService {
9791139 ...this . enabledDexs . filter ( ( d ) => this . isDexEnabled ( d ) ) ,
9801140 ] ;
9811141
1142+ // Track expected DEXs for synchronized notifications
1143+ // Clear previous tracking and set new expected DEXs
1144+ this . expectedDexs = new Set ( dexsToSubscribe ) ;
1145+ this . initializedDexs = new Set ( ) ;
1146+
9821147 // Set up individual subscriptions for each DEX
9831148 const subscriptionPromises : Promise < void > [ ] = [ ] ;
9841149
@@ -1165,6 +1330,9 @@ export class HyperLiquidSubscriptionService {
11651330 this . dexPositionsCache . set ( cacheKey , positionsWithTPSL ) ;
11661331 this . dexAccountCache . set ( cacheKey , accountState ) ;
11671332
1333+ // Mark this DEX as initialized (has sent first data)
1334+ this . initializedDexs . add ( cacheKey ) ;
1335+
11681336 // Trigger aggregation and notify subscribers
11691337 this . aggregateAndNotifySubscribers ( ) ;
11701338 }
@@ -1176,6 +1344,9 @@ export class HyperLiquidSubscriptionService {
11761344 `clearinghouseState subscription established for DEX: ${ dexName || 'main' } ` ,
11771345 ) ;
11781346 } catch ( error ) {
1347+ // Remove this DEX from expected set so it doesn't block notifications for other DEXs
1348+ this . expectedDexs . delete ( dexName ) ;
1349+
11791350 Logger . error (
11801351 ensureError ( error ) ,
11811352 this . getErrorContext ( 'ensureClearinghouseStateSubscription' , {
@@ -1239,6 +1410,9 @@ export class HyperLiquidSubscriptionService {
12391410 this . dexPositionsCache . set ( cacheKey , positionsWithTPSL ) ;
12401411 }
12411412
1413+ // Mark this DEX as initialized (has sent first data)
1414+ this . initializedDexs . add ( cacheKey ) ;
1415+
12421416 // Trigger aggregation and notify subscribers
12431417 this . aggregateAndNotifySubscribers ( ) ;
12441418 }
@@ -1250,6 +1424,9 @@ export class HyperLiquidSubscriptionService {
12501424 `openOrders subscription established for DEX: ${ dexName || 'main' } ` ,
12511425 ) ;
12521426 } catch ( error ) {
1427+ // Remove this DEX from expected set so it doesn't block notifications for other DEXs
1428+ this . expectedDexs . delete ( dexName ) ;
1429+
12531430 Logger . error (
12541431 ensureError ( error ) ,
12551432 this . getErrorContext ( 'ensureOpenOrdersSubscription' , {
@@ -1265,12 +1442,34 @@ export class HyperLiquidSubscriptionService {
12651442 * Used by both webData3 callback and fallback subscription callbacks
12661443 */
12671444 private aggregateAndNotifySubscribers ( ) : void {
1268- // Aggregate data from all DEX caches
1269- const aggregatedPositions = Array . from (
1270- this . dexPositionsCache . values ( ) ,
1271- ) . flat ( ) ;
1445+ // Wait for all expected DEXs to send initial data before notifying
1446+ // This ensures positions from all DEXs appear simultaneously
1447+ if ( this . expectedDexs . size > 0 ) {
1448+ const allDexsInitialized = Array . from ( this . expectedDexs ) . every ( ( dex ) =>
1449+ this . initializedDexs . has ( dex ) ,
1450+ ) ;
1451+ if ( ! allDexsInitialized ) {
1452+ DevLogger . log ( 'Waiting for all DEXs to send initial data' , {
1453+ expected : Array . from ( this . expectedDexs ) ,
1454+ initialized : Array . from ( this . initializedDexs ) ,
1455+ } ) ;
1456+ return ; // Don't notify yet - waiting for more DEXs
1457+ }
1458+ }
12721459
1273- const aggregatedOrders = Array . from ( this . dexOrdersCache . values ( ) ) . flat ( ) ;
1460+ // Aggregate data from all DEX caches
1461+ // Order: Main DEX (crypto perps) first, then HIP-3 DEXs
1462+ const mainDexPositions = this . dexPositionsCache . get ( '' ) || [ ] ;
1463+ const hip3DexPositions = Array . from ( this . dexPositionsCache . entries ( ) )
1464+ . filter ( ( [ key ] ) => key !== '' )
1465+ . flatMap ( ( [ , positions ] ) => positions ) ;
1466+ const aggregatedPositions = [ ...mainDexPositions , ...hip3DexPositions ] ;
1467+
1468+ const mainDexOrders = this . dexOrdersCache . get ( '' ) || [ ] ;
1469+ const hip3DexOrders = Array . from ( this . dexOrdersCache . entries ( ) )
1470+ . filter ( ( [ key ] ) => key !== '' )
1471+ . flatMap ( ( [ , orders ] ) => orders ) ;
1472+ const aggregatedOrders = [ ...mainDexOrders , ...hip3DexOrders ] ;
12741473
12751474 const aggregatedAccount = this . aggregateAccountStates ( ) ;
12761475
@@ -1389,6 +1588,10 @@ export class HyperLiquidSubscriptionService {
13891588 this . dexOrdersCache . clear ( ) ;
13901589 this . dexAccountCache . clear ( ) ;
13911590
1591+ // Clear DEX tracking for synchronized notifications
1592+ this . expectedDexs . clear ( ) ;
1593+ this . initializedDexs . clear ( ) ;
1594+
13921595 // Clear aggregated caches
13931596 this . cachedPositions = null ;
13941597 this . cachedOrders = null ;
0 commit comments