Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions backend/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Log } from './logging/Log';
import { updatesHandler } from './routes/updates';
import { BlockProcessor } from './services/BlockProcessor';
import { FinalizedService } from './services/finalizedService';
import { MockFinalizedService } from './services/MockFinalizedService';

const app = express();
const port = getConfig().port;
Expand All @@ -35,10 +36,17 @@ app.use(cors());
// Health check endpoint
app.get('/health', (_req: Request, res: Response) => {
const blockProcessor = BlockProcessor.getInstance();
const finalizedService = FinalizedService.getInstance();
const testMode = process.env.TEST_MODE;
const useMock = testMode !== undefined && testMode !== '';

const finalizedService = useMock
? MockFinalizedService.getInstance()
: FinalizedService.getInstance();

res.json({
status: 'ok',
mode: useMock ? 'mock' : 'real',
testMode: testMode || null,
migrationStatus: blockProcessor.getMigrationStatus(),
subscriptions: finalizedService.getStatus()
});
Expand All @@ -47,9 +55,15 @@ app.get('/health', (_req: Request, res: Response) => {
// Queue status endpoint to monitor BlockProcessor queue growth
app.get('/api/queue-status', (_req: Request, res: Response) => {
const blockProcessor = BlockProcessor.getInstance();
const finalizedService = FinalizedService.getInstance();
const testMode = process.env.TEST_MODE;
const useMock = testMode !== undefined && testMode !== '';

const finalizedService = useMock
? MockFinalizedService.getInstance()
: FinalizedService.getInstance();

res.json({
mode: useMock ? 'mock' : 'real',
queues: blockProcessor.getQueueStatus(),
subscriptions: finalizedService.getStatus(),
timestamp: new Date().toISOString(),
Expand All @@ -61,7 +75,23 @@ app.get('/api/updates', updatesHandler);

const main = async () => {
const blockProcessor = BlockProcessor.getInstance();
const finalizedService = FinalizedService.getInstance();

// Choose service based on TEST_MODE environment variable
const testMode = process.env.TEST_MODE;
const useMock = testMode !== undefined && testMode !== '';

const finalizedService = useMock
? MockFinalizedService.getInstance()
: FinalizedService.getInstance();

Log.service({
service: 'Application',
action: 'Finalized service mode',
details: {
mode: useMock ? 'MOCK' : 'REAL',
testMode: testMode || 'none'
},
});

// Initialize BlockProcessor (includes DB state check)
await blockProcessor.initialize();
Expand Down
13 changes: 4 additions & 9 deletions backend/src/services/BlockProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ export class BlockProcessor {
// Initialize last block numbers to 0
this.lastBlockNumber.set('relay-chain', 0);
this.lastBlockNumber.set('asset-hub', 0);
// CRISIS MODE: Force full processing mode immediately
this.currentMode = ProcessingMode.FULL;
this.currentMode = ProcessingMode.DETECTION;
}

/**
Expand Down Expand Up @@ -1000,17 +999,16 @@ export class BlockProcessor {

// Look for rcMigrator.StageTransition events
if (event.section === 'rcMigrator' && event.method === 'StageTransition') {

const [fromState, toState] = event.data.toJSON() as [unknown, unknown];

// Type guard for scheduled state
const isScheduledState = (state: unknown): state is { scheduled: { blockNumber: string } } => {
return !!state && typeof state === 'object' && !!(state as { scheduled?: { blockNumber?: unknown } }).scheduled?.blockNumber
const isScheduledState = (state: unknown): state is { scheduled: { start: string } } => {
return !!state && typeof state === 'object' && !!(state as { scheduled?: { start?: unknown } }).scheduled?.start
};

// Check if transitioning TO scheduled state
if (isScheduledState(toState)) {
const scheduledBlock = parseInt(toState.scheduled.blockNumber);
const scheduledBlock = parseInt(toState.scheduled.start);

Log.service({
service: 'Block Processor',
Expand All @@ -1027,9 +1025,6 @@ export class BlockProcessor {
// Set migration block number internally and persist to DB
await this.setMigrationBlockNumber(scheduledBlock);

// Switch to full processing mode
this.switchToFullMode();

return; // Found what we're looking for!
}

Expand Down
252 changes: 252 additions & 0 deletions backend/src/services/MockFinalizedService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import { Log } from '../logging/Log';
import { AbstractApi } from './abstractApi';
import { BlockProcessor } from './BlockProcessor';
import { eventService } from './eventService';

// Start blocks for different test networks
const TEST_START_BLOCKS: Record<string, { relayChain: number; assetHub: number }> = {
paseo: {
relayChain: 7926842 - 5,
assetHub: 2593897 - 5,
},
};

export class MockFinalizedService {
private static instance: MockFinalizedService;
private blockProcessor: BlockProcessor;
private rcIntervalId: NodeJS.Timeout | null = null;
private ahIntervalId: NodeJS.Timeout | null = null;
private rcCurrentBlock: number = 0;
private ahCurrentBlock: number = 0;
private readonly rcBlockInterval: number = 6000; // 6 seconds per block (Relay Chain)
private readonly ahBlockInterval: number = 12000; // 12 seconds per block (Asset Hub)

private constructor() {
this.blockProcessor = BlockProcessor.getInstance();
}

public static getInstance(): MockFinalizedService {
if (!MockFinalizedService.instance) {
MockFinalizedService.instance = new MockFinalizedService();
}
return MockFinalizedService.instance;
}

/**
* Start mock finalized head production
*/
public async start(): Promise<void> {
const testMode = process.env.TEST_MODE;

if (!testMode) {
throw new Error('TEST_MODE environment variable not set');
}

const startBlocks = TEST_START_BLOCKS[testMode];
if (!startBlocks) {
throw new Error(`Unknown TEST_MODE: ${testMode}. Available modes: ${Object.keys(TEST_START_BLOCKS).join(', ')}`);
}

this.rcCurrentBlock = startBlocks.relayChain;
this.ahCurrentBlock = startBlocks.assetHub;

Log.service({
service: 'Mock Finalized Service',
action: 'Starting mock finalized head production',
details: {
testMode,
rcStartBlock: this.rcCurrentBlock,
ahStartBlock: this.ahCurrentBlock,
rcBlockInterval: `${this.rcBlockInterval}ms`,
ahBlockInterval: `${this.ahBlockInterval}ms`,
},
});

try {
await Promise.all([this.startRelayChainProduction(), this.startAssetHubProduction()]);

Log.service({
service: 'Mock Finalized Service',
action: 'Mock finalized head production started successfully',
});
} catch (error) {
Log.service({
service: 'Mock Finalized Service',
action: 'Failed to start mock finalized head production',
error: error as Error,
});
throw error;
}
}

/**
* Start Relay Chain mock block production
*/
private async startRelayChainProduction(): Promise<void> {
const abstractApi = AbstractApi.getInstance();
const api = await abstractApi.getRelayChainApi();

this.rcIntervalId = setInterval(async () => {
const blockNumber = this.rcCurrentBlock;

try {
// Query actual block hash from the node
const blockHash = await api.rpc.chain.getBlockHash(blockNumber);
const blockHashHex = blockHash.toHex();

Log.chainEvent({
chain: 'relay-chain',
eventType: 'finalized_head (mock)',
blockNumber,
details: { blockHash: blockHashHex },
});

// Emit to frontend
eventService.emit('rcHead', {
blockNumber,
blockHash: blockHashHex,
timestamp: new Date().toISOString(),
});

// Submit to BlockProcessor
this.blockProcessor.addBlock('relay-chain', blockNumber, blockHashHex);

this.rcCurrentBlock++;
} catch (error) {
Log.chainEvent({
chain: 'relay-chain',
eventType: 'mock block hash query error',
blockNumber,
error: error as Error,
});
}
}, this.rcBlockInterval);

Log.service({
service: 'Mock Finalized Service',
action: 'Relay Chain mock block production started',
details: { startBlock: this.rcCurrentBlock },
});
}

/**
* Start Asset Hub mock block production
*/
private async startAssetHubProduction(): Promise<void> {
const abstractApi = AbstractApi.getInstance();
const api = await abstractApi.getAssetHubApi();

this.ahIntervalId = setInterval(async () => {
const blockNumber = this.ahCurrentBlock;

try {
// Query actual block hash from the node
const blockHash = await api.rpc.chain.getBlockHash(blockNumber);
const blockHashHex = blockHash.toHex();

Log.chainEvent({
chain: 'asset-hub',
eventType: 'finalized_head (mock)',
blockNumber,
details: { blockHash: blockHashHex },
});

// Emit to frontend
eventService.emit('ahHead', {
blockNumber,
blockHash: blockHashHex,
timestamp: new Date().toISOString(),
});

// Submit to BlockProcessor
this.blockProcessor.addBlock('asset-hub', blockNumber, blockHashHex);

this.ahCurrentBlock++;
} catch (error) {
Log.chainEvent({
chain: 'asset-hub',
eventType: 'mock block hash query error',
blockNumber,
error: error as Error,
});
}
}, this.ahBlockInterval);

Log.service({
service: 'Mock Finalized Service',
action: 'Asset Hub mock block production started',
details: { startBlock: this.ahCurrentBlock },
});
}

/**
* Stop all mock block production and cleanup
*/
public async stop(): Promise<void> {
Log.service({
service: 'Mock Finalized Service',
action: 'Stopping mock finalized head production',
});

try {
if (this.rcIntervalId) {
clearInterval(this.rcIntervalId);
this.rcIntervalId = null;
}

if (this.ahIntervalId) {
clearInterval(this.ahIntervalId);
this.ahIntervalId = null;
}

Log.service({
service: 'Mock Finalized Service',
action: 'Mock finalized head production stopped',
details: {
finalRcBlock: this.rcCurrentBlock - 1,
finalAhBlock: this.ahCurrentBlock - 1,
},
});
} catch (error) {
Log.service({
service: 'Mock Finalized Service',
action: 'Error stopping mock finalized head production',
error: error as Error,
});
throw error;
}
}

/**
* Get status of mock production
*/
public getStatus(): { rcActive: boolean; ahActive: boolean; rcCurrentBlock: number; ahCurrentBlock: number } {
return {
rcActive: this.rcIntervalId !== null,
ahActive: this.ahIntervalId !== null,
rcCurrentBlock: this.rcCurrentBlock,
ahCurrentBlock: this.ahCurrentBlock,
};
}

/**
* Manually advance to a specific block (for testing specific scenarios)
*/
public jumpToBlock(chain: 'relay-chain' | 'asset-hub', blockNumber: number): void {
if (chain === 'relay-chain') {
this.rcCurrentBlock = blockNumber;
Log.service({
service: 'Mock Finalized Service',
action: 'Jumped Relay Chain to block',
details: { blockNumber },
});
} else {
this.ahCurrentBlock = blockNumber;
Log.service({
service: 'Mock Finalized Service',
action: 'Jumped Asset Hub to block',
details: { blockNumber },
});
}
}
}