@@ -15,6 +15,9 @@ type AhEventEnum = AhSystemEvent['event'];
1515type AhMigratorEvents = Extract < AhEventEnum , { type : 'AhMigrator' } > [ 'value' ] ;
1616type BatchProcessedEvent = Extract < AhMigratorEvents , { type : 'BatchProcessed' } > [ 'value' ] ;
1717
18+ // Queue priority config types
19+ type QueuePriorityConfig = Awaited < ReturnType < RelayChainApi [ 'query' ] [ 'RcMigrator' ] [ 'AhUmpQueuePriorityConfig' ] [ 'getValue' ] > > ;
20+
1821import { eq , desc , and } from 'drizzle-orm' ;
1922
2023import { sql } from 'drizzle-orm' ;
@@ -27,6 +30,7 @@ import {
2730 dmpQueueEvents ,
2831 umpQueueEvents ,
2932 balanceVerification ,
33+ queuePriorityConfigs ,
3034} from '../db/schema' ;
3135import { Log } from '../logging/Log' ;
3236import { getCurrentStageForPallet , getPalletFromStage , normalizeEventPalletName } from '../util/stageToPalletMapping' ;
@@ -1022,22 +1026,56 @@ export class BlockProcessor {
10221026 }
10231027
10241028 private async handleUmpQueuePriority (
1025- queuePriority : any ,
1029+ queuePriority : QueuePriorityConfig ,
10261030 item : QueueItem
10271031 ) : Promise < void > {
10281032 try {
10291033 const priorityType = queuePriority . type ;
10301034
1031- eventService . emit ( 'umpQueuePriority' , {
1032- type : priorityType ,
1033- timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1035+ const existing = await db . query . queuePriorityConfigs . findFirst ( {
1036+ where : eq ( queuePriorityConfigs . queueType , 'ump' ) ,
10341037 } ) ;
10351038
1036- Log . service ( {
1037- service : 'UMP Queue Priority' ,
1038- action : 'UMP queue priority config retrieved' ,
1039- details : { type : priorityType } ,
1040- } ) ;
1039+ // Only update if this is the first time, or if the priority type has actually changed
1040+ if ( ! existing ) {
1041+ // First time - insert new record
1042+ await db . insert ( queuePriorityConfigs ) . values ( {
1043+ queueType : 'ump' ,
1044+ priorityType,
1045+ lastUpdated : new Date ( item . timestamp ! ) ,
1046+ } ) ;
1047+
1048+ eventService . emit ( 'umpQueuePriority' , {
1049+ type : priorityType ,
1050+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1051+ } ) ;
1052+
1053+ Log . service ( {
1054+ service : 'UMP Queue Priority' ,
1055+ action : 'UMP queue priority config initialized' ,
1056+ details : { type : priorityType } ,
1057+ } ) ;
1058+ } else if ( existing . priorityType !== priorityType ) {
1059+ // Priority type has changed - update existing record
1060+ await db . update ( queuePriorityConfigs )
1061+ . set ( {
1062+ priorityType,
1063+ lastUpdated : new Date ( item . timestamp ! ) ,
1064+ } )
1065+ . where ( eq ( queuePriorityConfigs . queueType , 'ump' ) ) ;
1066+
1067+ eventService . emit ( 'umpQueuePriority' , {
1068+ type : priorityType ,
1069+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1070+ } ) ;
1071+
1072+ Log . service ( {
1073+ service : 'UMP Queue Priority' ,
1074+ action : 'UMP queue priority config changed' ,
1075+ details : { type : priorityType } ,
1076+ } ) ;
1077+ }
1078+ // else: priority type hasn't changed, do nothing
10411079 } catch ( error ) {
10421080 Log . service ( {
10431081 service : 'UMP Queue Priority' ,
@@ -1048,22 +1086,56 @@ export class BlockProcessor {
10481086 }
10491087
10501088 private async handleDmpQueuePriority (
1051- queuePriority : any ,
1089+ queuePriority : QueuePriorityConfig ,
10521090 item : QueueItem
10531091 ) : Promise < void > {
10541092 try {
10551093 const priorityType = queuePriority . type ;
10561094
1057- eventService . emit ( 'dmpQueuePriority' , {
1058- type : priorityType ,
1059- timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1095+ const existing = await db . query . queuePriorityConfigs . findFirst ( {
1096+ where : eq ( queuePriorityConfigs . queueType , 'dmp' ) ,
10601097 } ) ;
10611098
1062- Log . service ( {
1063- service : 'DMP Queue Priority' ,
1064- action : 'DMP queue priority config retrieved' ,
1065- details : { type : priorityType } ,
1066- } ) ;
1099+ // Only update if this is the first time, or if the priority type has actually changed
1100+ if ( ! existing ) {
1101+ // First time - insert new record
1102+ await db . insert ( queuePriorityConfigs ) . values ( {
1103+ queueType : 'dmp' ,
1104+ priorityType,
1105+ lastUpdated : new Date ( item . timestamp ! ) ,
1106+ } ) ;
1107+
1108+ eventService . emit ( 'dmpQueuePriority' , {
1109+ type : priorityType ,
1110+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1111+ } ) ;
1112+
1113+ Log . service ( {
1114+ service : 'DMP Queue Priority' ,
1115+ action : 'DMP queue priority config initialized' ,
1116+ details : { type : priorityType } ,
1117+ } ) ;
1118+ } else if ( existing . priorityType !== priorityType ) {
1119+ // Priority type has changed - update existing record
1120+ await db . update ( queuePriorityConfigs )
1121+ . set ( {
1122+ priorityType,
1123+ lastUpdated : new Date ( item . timestamp ! ) ,
1124+ } )
1125+ . where ( eq ( queuePriorityConfigs . queueType , 'dmp' ) ) ;
1126+
1127+ eventService . emit ( 'dmpQueuePriority' , {
1128+ type : priorityType ,
1129+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1130+ } ) ;
1131+
1132+ Log . service ( {
1133+ service : 'DMP Queue Priority' ,
1134+ action : 'DMP queue priority config changed' ,
1135+ details : { type : priorityType } ,
1136+ } ) ;
1137+ }
1138+ // else: priority type hasn't changed, do nothing
10671139 } catch ( error ) {
10681140 Log . service ( {
10691141 service : 'DMP Queue Priority' ,
0 commit comments