1- import type { PalletRcMigratorMigrationStage } from '../types/pjs' ;
1+ import type { PalletRcMigratorMigrationStage , PalletRcMigratorQueuePriority } from '../types/pjs' ;
22import type { ApiDecoration } from '@polkadot/api/types' ;
33import type { Vec , Bytes } from '@polkadot/types' ;
44import type { Event } from '@polkadot/types/interfaces' ;
@@ -51,6 +51,8 @@ export class BlockProcessor {
5151 private previousDmpQueueSize : number = 0 ;
5252 private currentMode : ProcessingMode = ProcessingMode . DETECTION ;
5353 private migrationStartBlockNumber ?: number ;
54+ private rcPriorityConfigQueried : boolean = false ;
55+ private ahPriorityConfigQueried : boolean = false ;
5456
5557 private constructor ( ) {
5658 // Initialize queues for both chains
@@ -62,6 +64,8 @@ export class BlockProcessor {
6264 this . lastBlockNumber . set ( 'relay-chain' , 0 ) ;
6365 this . lastBlockNumber . set ( 'asset-hub' , 0 ) ;
6466 this . currentMode = ProcessingMode . DETECTION ;
67+ this . rcPriorityConfigQueried = false ;
68+ this . ahPriorityConfigQueried = false ;
6569 }
6670
6771 /**
@@ -429,6 +433,11 @@ export class BlockProcessor {
429433 if ( event . section === 'parachainSystem' && event . method === 'UpwardMessageSent' ) {
430434 await this . handleAssetHubUpwardMessageSent ( event , item ) ;
431435 }
436+
437+ // Handle ahMigrator.DmpQueuePrioritySet event
438+ if ( event . section === 'ahMigrator' && event . method === 'DmpQueuePriorityConfigSet' ) {
439+ await this . handleDmpQueuePriorityChangedEvent ( event , apiAt , item ) ;
440+ }
432441 }
433442
434443 // Emit pallet migration events after processing all events in this block
@@ -473,6 +482,11 @@ export class BlockProcessor {
473482 if ( event . section === 'messageQueue' && event . method === 'Processed' ) {
474483 await this . handleRelayChainMessageQueueProcessed ( event , item ) ;
475484 }
485+
486+ // Handle rcMigrator.AhUmpQueuePriorityConfigSet event
487+ if ( event . section === 'rcMigrator' && event . method === 'AhUmpQueuePriorityConfigSet' ) {
488+ await this . handleUmpQueuePriorityChangedEvent ( event , apiAt , item ) ;
489+ }
476490 }
477491 } catch ( error ) {
478492 Log . service ( {
@@ -640,7 +654,7 @@ export class BlockProcessor {
640654 * Relay Chain event handlers
641655 */
642656 private async handleRelayChainMessageQueueProcessed (
643- event : Event ,
657+ _ : Event ,
644658 item : QueueItem
645659 ) : Promise < void > {
646660 try {
@@ -686,11 +700,23 @@ export class BlockProcessor {
686700 item : QueueItem
687701 ) : Promise < void > {
688702 try {
689- const [ pendingUpwardMessages ] = await Promise . all ( [
690- apiAt . query . parachainSystem . pendingUpwardMessages < Vec < Bytes > > ( ) ,
691- ] ) ;
703+ // Query priority config only once on first full mode block
704+ if ( ! this . ahPriorityConfigQueried ) {
705+ const [ pendingUpwardMessages , dmpQueuePriority ] = await Promise . all ( [
706+ apiAt . query . parachainSystem . pendingUpwardMessages < Vec < Bytes > > ( ) ,
707+ apiAt . query . ahMigrator . dmpQueuePriorityConfig < PalletRcMigratorQueuePriority > ( ) ,
708+ ] ) ;
709+
710+ await this . handleAhPendingUpwardMessages ( pendingUpwardMessages , item ) ;
711+ await this . handleDmpQueuePriority ( dmpQueuePriority , item ) ;
712+ this . ahPriorityConfigQueried = true ;
713+ } else {
714+ const [ pendingUpwardMessages ] = await Promise . all ( [
715+ apiAt . query . parachainSystem . pendingUpwardMessages < Vec < Bytes > > ( ) ,
716+ ] ) ;
692717
693- await this . handleAhPendingUpwardMessages ( pendingUpwardMessages , item ) ;
718+ await this . handleAhPendingUpwardMessages ( pendingUpwardMessages , item ) ;
719+ }
694720 // TODO: Query Asset Hub specific storage:
695721 // - ahMigrator.ahMigrationStage (Do we actually need this?)
696722
@@ -717,16 +743,32 @@ export class BlockProcessor {
717743 item : QueueItem
718744 ) : Promise < void > {
719745 try {
720- const [ migrationStage , dmpMessageQueue ] = await Promise . all ( [
721- apiAt . query . rcMigrator . rcMigrationStage < PalletRcMigratorMigrationStage > ( ) ,
722- apiAt . query . dmp . downwardMessageQueues < Vec < PolkadotCorePrimitivesInboundDownwardMessage > > (
723- 1000
724- ) ,
725- ] ) ;
726-
727- // TODO: Is there specific ordering to this or can we put it in a Promise.all?
728- await this . handleRcMigrationStage ( migrationStage , item ) ;
729- await this . handleRcDownwardMessageQueues ( dmpMessageQueue , item ) ;
746+ // Query priority config only once on first full mode block
747+ if ( ! this . rcPriorityConfigQueried ) {
748+ const [ migrationStage , dmpMessageQueue , umpQueuePriority ] = await Promise . all ( [
749+ apiAt . query . rcMigrator . rcMigrationStage < PalletRcMigratorMigrationStage > ( ) ,
750+ apiAt . query . dmp . downwardMessageQueues < Vec < PolkadotCorePrimitivesInboundDownwardMessage > > (
751+ 1000
752+ ) ,
753+ apiAt . query . rcMigrator . ahUmpQueuePriorityConfig < PalletRcMigratorQueuePriority > ( ) ,
754+ ] ) ;
755+
756+ await this . handleRcMigrationStage ( migrationStage , item ) ;
757+ await this . handleRcDownwardMessageQueues ( dmpMessageQueue , item ) ;
758+ await this . handleUmpQueuePriority ( umpQueuePriority , item ) ;
759+
760+ this . rcPriorityConfigQueried = true ;
761+ } else {
762+ const [ migrationStage , dmpMessageQueue ] = await Promise . all ( [
763+ apiAt . query . rcMigrator . rcMigrationStage < PalletRcMigratorMigrationStage > ( ) ,
764+ apiAt . query . dmp . downwardMessageQueues < Vec < PolkadotCorePrimitivesInboundDownwardMessage > > (
765+ 1000
766+ ) ,
767+ ] ) ;
768+
769+ await this . handleRcMigrationStage ( migrationStage , item ) ;
770+ await this . handleRcDownwardMessageQueues ( dmpMessageQueue , item ) ;
771+ }
730772
731773 Log . service ( {
732774 service : 'Block Processor' ,
@@ -927,6 +969,104 @@ export class BlockProcessor {
927969 }
928970 }
929971
972+ private async handleUmpQueuePriority (
973+ queuePriority : PalletRcMigratorQueuePriority ,
974+ item : QueueItem
975+ ) : Promise < void > {
976+ try {
977+ const priorityType = queuePriority . type ;
978+
979+ eventService . emit ( 'umpQueuePriority' , {
980+ type : priorityType ,
981+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
982+ } ) ;
983+
984+ Log . service ( {
985+ service : 'UMP Queue Priority' ,
986+ action : 'UMP queue priority config retrieved' ,
987+ details : { type : priorityType } ,
988+ } ) ;
989+ } catch ( error ) {
990+ Log . service ( {
991+ service : 'UMP Queue Priority' ,
992+ action : 'Error processing UMP queue priority' ,
993+ error : error as Error ,
994+ } ) ;
995+ }
996+ }
997+
998+ private async handleDmpQueuePriority (
999+ queuePriority : any ,
1000+ item : QueueItem
1001+ ) : Promise < void > {
1002+ try {
1003+ const priorityType = queuePriority . type ;
1004+
1005+ eventService . emit ( 'dmpQueuePriority' , {
1006+ type : priorityType ,
1007+ timestamp : new Date ( item . timestamp ! ) . toISOString ( ) ,
1008+ } ) ;
1009+
1010+ Log . service ( {
1011+ service : 'DMP Queue Priority' ,
1012+ action : 'DMP queue priority config retrieved' ,
1013+ details : { type : priorityType } ,
1014+ } ) ;
1015+ } catch ( error ) {
1016+ Log . service ( {
1017+ service : 'DMP Queue Priority' ,
1018+ action : 'Error processing DMP queue priority' ,
1019+ error : error as Error ,
1020+ } ) ;
1021+ }
1022+ }
1023+
1024+ private async handleUmpQueuePriorityChangedEvent (
1025+ _ : Event ,
1026+ apiAt : ApiDecoration < 'promise' > ,
1027+ item : QueueItem
1028+ ) : Promise < void > {
1029+ try {
1030+ Log . service ( {
1031+ service : 'UMP Queue Priority' ,
1032+ action : 'AhUmpQueuePrioritySet event detected - re-querying config' ,
1033+ details : { blockNumber : item . blockNumber } ,
1034+ } ) ;
1035+
1036+ const umpQueuePriority = await apiAt . query . rcMigrator . ahUmpQueuePriorityConfig < PalletRcMigratorQueuePriority > ( ) ;
1037+ await this . handleUmpQueuePriority ( umpQueuePriority , item ) ;
1038+ } catch ( error ) {
1039+ Log . service ( {
1040+ service : 'UMP Queue Priority' ,
1041+ action : 'Error handling UMP priority change event' ,
1042+ error : error as Error ,
1043+ } ) ;
1044+ }
1045+ }
1046+
1047+ private async handleDmpQueuePriorityChangedEvent (
1048+ _ : Event ,
1049+ apiAt : ApiDecoration < 'promise' > ,
1050+ item : QueueItem
1051+ ) : Promise < void > {
1052+ try {
1053+ Log . service ( {
1054+ service : 'DMP Queue Priority' ,
1055+ action : 'DmpQueuePrioritySet event detected - re-querying config' ,
1056+ details : { blockNumber : item . blockNumber } ,
1057+ } ) ;
1058+
1059+ const dmpQueuePriority = await apiAt . query . ahMigrator . dmpQueuePriorityConfig ( ) ;
1060+ await this . handleDmpQueuePriority ( dmpQueuePriority , item ) ;
1061+ } catch ( error ) {
1062+ Log . service ( {
1063+ service : 'DMP Queue Priority' ,
1064+ action : 'Error handling DMP priority change event' ,
1065+ error : error as Error ,
1066+ } ) ;
1067+ }
1068+ }
1069+
9301070 /**
9311071 * Lightweight detection mode - only looks for migration scheduling events on Relay Chain
9321072 */
0 commit comments