diff --git a/backend/src/db/initializeDb.ts b/backend/src/db/initializeDb.ts index 9277d3d..8fb765a 100644 --- a/backend/src/db/initializeDb.ts +++ b/backend/src/db/initializeDb.ts @@ -1,7 +1,7 @@ import { Log } from '../logging/Log'; import { TimeInStageCache } from '../services/cache/TimeInStageCache'; -import { xcmMessageCounters, dmpMetricsCache } from './schema'; +import { xcmMessageCounters } from './schema'; import { db } from './index'; @@ -25,7 +25,6 @@ export async function initializeDb() { await db.insert(xcmMessageCounters).values({ sourceChain: 'relay-chain', destinationChain: 'asset-hub', - messagesSent: 0, messagesProcessed: 0, messagesFailed: 0, lastUpdated: new Date(), @@ -42,30 +41,12 @@ export async function initializeDb() { await db.insert(xcmMessageCounters).values({ sourceChain: 'asset-hub', destinationChain: 'relay-chain', - messagesSent: 0, messagesProcessed: 0, messagesFailed: 0, lastUpdated: new Date(), }); } - // Initialize DMP metrics cache if it doesn't exist - const existingDmpCache = await db.query.dmpMetricsCache.findFirst(); - if (!existingDmpCache) { - Log.service({ - service: 'Database Initialization', - action: 'Initializing DMP metrics cache', - }); - await db.insert(dmpMetricsCache).values({ - currentQueueSize: 0, - currentQueueSizeBytes: 0, - averageLatencyMs: 0, - averageThroughput: 0, - averageThroughputBytes: 0, - lastUpdated: new Date(), - }); - } - // Initialize TimeInStageCache Log.service({ service: 'Database Initialization', diff --git a/backend/src/db/schema.ts b/backend/src/db/schema.ts index d99e4a4..25c9675 100644 --- a/backend/src/db/schema.ts +++ b/backend/src/db/schema.ts @@ -31,7 +31,6 @@ export const xcmMessageCounters = sqliteTable('xcm_message_counters', { id: integer('id').primaryKey({ autoIncrement: true }), sourceChain: text('source_chain').notNull(), destinationChain: text('destination_chain').notNull(), - messagesSent: integer('messages_sent').notNull().default(0), messagesProcessed: integer('messages_processed').notNull().default(0), messagesFailed: integer('messages_failed').notNull().default(0), lastUpdated: integer('last_updated', { mode: 'timestamp' }) @@ -84,66 +83,8 @@ export const umpQueueEvents = sqliteTable('ump_queue_events', { timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`), }); -// Message Processing Events (from Asset Hub) -export const messageProcessingEventsRC = sqliteTable('message_processing_events_rc', { - id: integer('id').primaryKey({ autoIncrement: true }), - timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`), -}); - -// Message Processing Events (from Asset Hub) -export const messageProcessingEventsAH = sqliteTable('message_processing_events', { - id: integer('id').primaryKey({ autoIncrement: true }), - timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`), -}); - -// Upward Message Sent Events (from Asset Hub) -export const upwardMessageSentEvents = sqliteTable('upward_message_sent_events', { - id: integer('id').primaryKey({ autoIncrement: true }), - timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`), -}); - -// Queue-Processing Correlation -export const queueProcessingCorrelation = sqliteTable('queue_processing_correlation', { - id: integer('id').primaryKey({ autoIncrement: true }), - queueEventId: integer('queue_event_id') - .notNull() - .references(() => dmpQueueEvents.id), - processingEventId: integer('processing_event_id') - .notNull() - .references(() => messageProcessingEventsAH.id), - latencyMs: integer('latency_ms').notNull(), // Time between queue drain and processing - throughput: integer('throughput').notNull(), // Messages per second - throughputBytes: integer('throughput_bytes').notNull(), // Bytes per second - timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`), -}); - -// Cached DMP Metrics for quick frontend queries -export const dmpMetricsCache = sqliteTable('dmp_metrics_cache', { - id: integer('id').primaryKey({ autoIncrement: true }), - currentQueueSize: integer('current_queue_size').notNull().default(0), - currentQueueSizeBytes: integer('current_queue_size_bytes').notNull().default(0), - averageLatencyMs: integer('average_latency_ms').notNull().default(0), - averageThroughput: integer('average_throughput').notNull().default(0), - averageThroughputBytes: integer('average_throughput_bytes').notNull().default(0), - lastUpdated: integer('last_updated', { mode: 'timestamp' }) - .notNull() - .$defaultFn(() => new Date()), -}); - export type DmpQueueEvent = typeof dmpQueueEvents.$inferSelect; export type NewDmpQueueEvent = typeof dmpQueueEvents.$inferInsert; export type UmpQueueEvent = typeof umpQueueEvents.$inferSelect; export type NewUmpQueueEvent = typeof umpQueueEvents.$inferInsert; - -export type MessageProcessingEventAH = typeof messageProcessingEventsAH.$inferSelect; -export type NewMessageProcessingEvent = typeof messageProcessingEventsAH.$inferInsert; - -export type QueueProcessingCorrelation = typeof queueProcessingCorrelation.$inferSelect; -export type NewQueueProcessingCorrelation = typeof queueProcessingCorrelation.$inferInsert; - -export type DmpMetricsCache = typeof dmpMetricsCache.$inferSelect; -export type NewDmpMetricsCache = typeof dmpMetricsCache.$inferInsert; - -export type UpwardMessageSentEvent = typeof upwardMessageSentEvents.$inferSelect; -export type NewUpwardMessageSentEvent = typeof upwardMessageSentEvents.$inferInsert; diff --git a/backend/src/routes/updates.ts b/backend/src/routes/updates.ts index ad44b21..d1b9ca8 100644 --- a/backend/src/routes/updates.ts +++ b/backend/src/routes/updates.ts @@ -4,10 +4,8 @@ import { Request, Response, RequestHandler } from 'express'; import { db } from '../db'; import { migrationStages, xcmMessageCounters } from '../db/schema'; import { Log } from '../logging/Log'; -import { DmpMetricsCache } from '../services/cache/DmpMetricsCache'; import { PalletMigrationCache } from '../services/cache/PalletMigrationCache'; import { TimeInStageCache } from '../services/cache/TimeInStageCache'; -import { UmpMetricsCache } from '../services/cache/UmpMetricsCache'; import { eventService } from '../services/eventService'; import { getPalletFromStage } from '../util/stageToPalletMapping'; @@ -77,7 +75,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response sendEvent('rcXcmMessageCounter', { sourceChain: rcCounter.sourceChain, destinationChain: rcCounter.destinationChain, - messagesSent: rcCounter.messagesSent, messagesProcessed: rcCounter.messagesProcessed, messagesFailed: rcCounter.messagesFailed, lastUpdated: rcCounter.lastUpdated, @@ -93,7 +90,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response sendEvent('ahXcmMessageCounter', { sourceChain: ahCounter.sourceChain, destinationChain: ahCounter.destinationChain, - messagesSent: ahCounter.messagesSent, messagesProcessed: ahCounter.messagesProcessed, messagesFailed: ahCounter.messagesFailed, lastUpdated: ahCounter.lastUpdated, @@ -145,34 +141,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response } } - // Handle DMP metrics initial state - if (requestedEvents.includes('dmpMetrics')) { - const dmpMetricsCacheInstance = DmpMetricsCache.getInstance(); - const currentMetrics = dmpMetricsCacheInstance.getMetrics(); - sendEvent('dmpMetrics', { - averageLatencyMs: currentMetrics.averageLatencyMs, - totalSizeBytes: currentMetrics.totalSizeBytes, - lastUpdated: currentMetrics.lastUpdated, - latencyCount: currentMetrics.latencyCount, - sizeCount: currentMetrics.sizeCount, - timestamp: new Date().toISOString(), - }); - } - - // Handle UMP metrics initial state - if (requestedEvents.includes('umpMetrics')) { - const umpMetricsCacheInstance = UmpMetricsCache.getInstance(); - const currentMetrics = umpMetricsCacheInstance.getMetrics(); - sendEvent('umpMetrics', { - averageLatencyMs: currentMetrics.averageLatencyMs, - totalSizeBytes: currentMetrics.totalSizeBytes, - lastUpdated: currentMetrics.lastUpdated, - latencyCount: currentMetrics.latencyCount, - sizeCount: currentMetrics.sizeCount, - timestamp: new Date().toISOString(), - }); - } - // Handle pallet migration summary initial state if (requestedEvents.includes('palletMigrationSummary')) { const palletMigrationCacheInstance = PalletMigrationCache.getInstance(); diff --git a/backend/src/services/BlockProcessor.ts b/backend/src/services/BlockProcessor.ts index feee625..7d2fd57 100644 --- a/backend/src/services/BlockProcessor.ts +++ b/backend/src/services/BlockProcessor.ts @@ -6,17 +6,15 @@ import type { FrameSystemEventRecord, PolkadotCorePrimitivesInboundDownwardMessage, } from '@polkadot/types/lookup'; -import type { ITuple } from '@polkadot/types/types'; import { eq, desc, and } from 'drizzle-orm'; +import { sql } from 'drizzle-orm'; + import { db } from '../db'; import { - messageProcessingEventsRC, migrationStages, palletMigrationCounters, - messageProcessingEventsAH, - upwardMessageSentEvents, xcmMessageCounters, dmpQueueEvents, umpQueueEvents, @@ -25,10 +23,8 @@ import { Log } from '../logging/Log'; import { getCurrentStageForPallet, getPalletFromStage } from '../util/stageToPalletMapping'; import { AbstractApi } from './abstractApi'; -import { DmpLatencyProcessor } from './cache/DmpLatencyProcessor'; import { PalletMigrationCache } from './cache/PalletMigrationCache'; import { TimeInStageCache } from './cache/TimeInStageCache'; -import { UmpLatencyProcessor } from './cache/UmpLatencyProcessor'; import { eventService } from './eventService'; // TODO: Ensure we handle migration stages correctly. @@ -647,22 +643,39 @@ export class BlockProcessor { private async handleAssetHubMessageQueueProcessed(event: Event, item: QueueItem): Promise { try { - const dmpLatencyProcessor = DmpLatencyProcessor.getInstance(); - dmpLatencyProcessor.addMessageQueueProcessed(new Date(item.timestamp!)); + // Simple increment - DMP messages processed on Asset Hub + await db.update(xcmMessageCounters) + .set({ + messagesProcessed: sql`messagesProcessed + 1`, + lastUpdated: new Date(item.timestamp!) + }) + .where(eq(xcmMessageCounters.destinationChain, 'asset-hub')); - await db.insert(messageProcessingEventsAH).values({ - timestamp: new Date(), + // Get updated counter and emit + const counter = await db.query.xcmMessageCounters.findFirst({ + where: (counters, { eq }) => eq(counters.destinationChain, 'asset-hub'), }); + + if (counter) { + eventService.emit('dmpMessageCounter', { + sourceChain: counter.sourceChain, + destinationChain: counter.destinationChain, + messagesProcessed: counter.messagesProcessed, + messagesFailed: counter.messagesFailed, + lastUpdated: counter.lastUpdated, + }); + } + Log.chainEvent({ chain: 'asset-hub', eventType: 'MessageQueue.Processed', blockNumber: item.blockNumber, - details: { eventData: event.toJSON() }, + details: { totalProcessed: counter?.messagesProcessed }, }); } catch (error) { Log.chainEvent({ chain: 'asset-hub', - eventType: 'MessageQueue.Processed database error', + eventType: 'MessageQueue.Processed error', error: error as Error, }); } @@ -670,27 +683,15 @@ export class BlockProcessor { private async handleAssetHubUpwardMessageSent(event: Event, item: QueueItem): Promise { try { - // Add to latency processor - const umpLatencyProcessor = UmpLatencyProcessor.getInstance(); - umpLatencyProcessor.addUpwardMessageSent(new Date(item.timestamp!)); - - await db.insert(upwardMessageSentEvents).values({ - timestamp: new Date(item.timestamp!), - }); - - eventService.emit('upwardMessageSent', { - timestamp: new Date(item.timestamp!).toISOString(), - }); Log.chainEvent({ chain: 'asset-hub', eventType: 'UpwardMessageSent', blockNumber: item.blockNumber, - details: { eventData: event.toJSON() }, }); } catch (error) { Log.chainEvent({ chain: 'asset-hub', - eventType: 'UpwardMessageSent database error', + eventType: 'UpwardMessageSent error', error: error as Error, }); } @@ -704,24 +705,39 @@ export class BlockProcessor { item: QueueItem ): Promise { try { - const umpLatencyProcessor = UmpLatencyProcessor.getInstance(); + // Simple increment - UMP messages processed on Relay Chain + await db.update(xcmMessageCounters) + .set({ + messagesProcessed: sql`messagesProcessed + 1`, + lastUpdated: new Date(item.timestamp!) + }) + .where(eq(xcmMessageCounters.destinationChain, 'relay-chain')); - await db.insert(messageProcessingEventsRC).values({ - timestamp: new Date(item.timestamp!), + // Get updated counter and emit + const counter = await db.query.xcmMessageCounters.findFirst({ + where: (counters, { eq }) => eq(counters.destinationChain, 'relay-chain'), }); - umpLatencyProcessor.addMessageQueueProcessed(new Date(item.timestamp!)); + if (counter) { + eventService.emit('umpMessageCounter', { + sourceChain: counter.sourceChain, + destinationChain: counter.destinationChain, + messagesProcessed: counter.messagesProcessed, + messagesFailed: counter.messagesFailed, + lastUpdated: counter.lastUpdated, + }); + } Log.chainEvent({ chain: 'relay-chain', eventType: 'MessageQueue.Processed', blockNumber: item.blockNumber, - details: { eventData: event.toJSON() }, + details: { totalProcessed: counter?.messagesProcessed }, }); } catch (error) { Log.chainEvent({ chain: 'relay-chain', - eventType: 'MessageQueue.Processed database error', + eventType: 'MessageQueue.Processed error', error: error as Error, }); } @@ -735,12 +751,10 @@ export class BlockProcessor { item: QueueItem ): Promise { try { - const [dmpDataMessageCounts, pendingUpwardMessages] = await Promise.all([ - apiAt.query.ahMigrator.dmpDataMessageCounts>(), + const [pendingUpwardMessages] = await Promise.all([ apiAt.query.parachainSystem.pendingUpwardMessages>(), ]); - await this.handleAhDmpDataMessageCounts(dmpDataMessageCounts, item); await this.handleAhPendingUpwardMessages(pendingUpwardMessages, item); // TODO: Query Asset Hub specific storage: // - ahMigrator.ahMigrationStage (Do we actually need this?) @@ -768,9 +782,8 @@ export class BlockProcessor { item: QueueItem ): Promise { try { - const [migrationStage, dmpMessageCount, dmpMessageQueue] = await Promise.all([ + const [migrationStage, dmpMessageQueue] = await Promise.all([ apiAt.query.rcMigrator.rcMigrationStage(), - apiAt.query.rcMigrator.dmpDataMessageCounts>(), apiAt.query.dmp.downwardMessageQueues>( 1000 ), @@ -778,7 +791,6 @@ export class BlockProcessor { // TODO: Is there specific ordering to this or can we put it in a Promise.all? await this.handleRcMigrationStage(migrationStage, item); - await this.handleRcDmpDataMessageCounts(dmpMessageCount, item); await this.handleRcDownwardMessageQueues(dmpMessageQueue, item); Log.service({ @@ -796,56 +808,6 @@ export class BlockProcessor { } } - private async handleAhDmpDataMessageCounts( - dmpDataMessageCounts: ITuple<[u32, u32]>, - item: QueueItem - ) { - try { - const [, erroredOnAh] = dmpDataMessageCounts; - await db - .update(xcmMessageCounters) - .set({ - messagesFailed: erroredOnAh.toNumber(), - lastUpdated: new Date(item.timestamp!), - }) - .where(eq(xcmMessageCounters.sourceChain, 'asset-hub')); - - const counterAh = await db.query.xcmMessageCounters.findFirst({ - where: (counters, { eq }) => eq(counters.sourceChain, 'asset-hub'), - }); - - if (counterAh) { - const eventData = { - sourceChain: counterAh.sourceChain, - destinationChain: counterAh.destinationChain, - messagesSent: counterAh.messagesSent, - messagesProcessed: counterAh.messagesProcessed, - messagesFailed: counterAh.messagesFailed, - lastUpdated: counterAh.lastUpdated, - }; - - Log.service({ - service: 'XCM Message Counter', - action: 'Emitting ahXcmMessageCounter event via storage', - details: eventData, - }); - eventService.emit('ahXcmMessageCounter', eventData); - } else { - Log.service({ - service: 'XCM Message Counter', - action: 'No counter found after storage update', - details: { sourceChain: 'asset-hub' }, - }); - } - } catch (error) { - Log.chainEvent({ - chain: 'asset-hub', - eventType: 'XCM message processing error', - error: error as Error, - }); - } - } - private async handleAhPendingUpwardMessages(pendingUpwardMessages: Vec, item: QueueItem) { try { let totalSizeBytes = 0; @@ -884,7 +846,6 @@ export class BlockProcessor { dmpMessageQueue: Vec, item: QueueItem ) { - const dmpLatencyProcessor = DmpLatencyProcessor.getInstance(); try { const currentQueueSize = dmpMessageQueue.length; // Calculate exact total size in bytes by summing encoded lengths @@ -911,11 +872,6 @@ export class BlockProcessor { timestamp, }); - // Add fill events to latency processor - if (eventType === 'fill') { - dmpLatencyProcessor.addFillMessageSent(timestamp); - } - // Emit event for frontend eventService.emit('dmpQueueEvent', { queueSize: currentQueueSize, @@ -945,98 +901,6 @@ export class BlockProcessor { } } - private async handleRcDmpDataMessageCounts(dmpMessageCount: ITuple<[u32, u32]>, item: QueueItem) { - try { - const [sentToAh, processedOnAh] = dmpMessageCount; - - await db - .update(xcmMessageCounters) - .set({ - messagesSent: sentToAh.toNumber(), - lastUpdated: new Date(item.timestamp!), - }) - .where(eq(xcmMessageCounters.sourceChain, 'relay-chain')); - - await db - .update(xcmMessageCounters) - .set({ - messagesProcessed: processedOnAh.toNumber(), - lastUpdated: new Date(item.timestamp!), - }) - .where(eq(xcmMessageCounters.sourceChain, 'asset-hub')); - - // Get the updated counter - const counterRc = await db.query.xcmMessageCounters.findFirst({ - where: (counters, { eq }) => eq(counters.sourceChain, 'relay-chain'), - }); - - // Get the updated counter - const counterAh = await db.query.xcmMessageCounters.findFirst({ - where: (counters, { eq }) => eq(counters.sourceChain, 'asset-hub'), - }); - - if (counterRc) { - const eventData = { - sourceChain: counterRc.sourceChain, - destinationChain: counterRc.destinationChain, - messagesSent: counterRc.messagesSent, - messagesProcessed: counterRc.messagesProcessed, - messagesFailed: counterRc.messagesFailed, - lastUpdated: counterRc.lastUpdated, - }; - - Log.service({ - service: 'XCM Message Counter', - action: 'Emitting rcXcmMessageCounter event', - details: eventData, - }); - eventService.emit('rcXcmMessageCounter', eventData); - } else { - Log.service({ - service: 'XCM Message Counter', - action: 'No RC counter found after update', - details: { sourceChain: 'relay-chain' }, - }); - } - - if (counterAh) { - const eventData = { - sourceChain: counterAh.sourceChain, - destinationChain: counterAh.destinationChain, - messagesSent: counterAh.messagesSent, - messagesProcessed: counterAh.messagesProcessed, - messagesFailed: counterAh.messagesFailed, - lastUpdated: counterAh.lastUpdated, - }; - - Log.service({ - service: 'XCM Message Counter', - action: 'Emitting ahXcmMessageCounter event', - details: eventData, - }); - eventService.emit('ahXcmMessageCounter', eventData); - } else { - Log.service({ - service: 'XCM Message Counter', - action: 'No AH counter found after update', - details: { sourceChain: 'asset-hub' }, - }); - } - - Log.service({ - service: 'XCM Message Counter', - action: 'Updated counters', - details: { sentToAh, processedOnAh }, - }); - } catch (error) { - Log.chainEvent({ - chain: 'relay-chain', - eventType: 'XCM message processing error', - error: error as Error, - }); - } - } - private async handleRcMigrationStage( migrationStage: PalletRcMigratorMigrationStage, item: QueueItem diff --git a/backend/src/services/cache/DmpLatencyProcessor.ts b/backend/src/services/cache/DmpLatencyProcessor.ts deleted file mode 100644 index 0b51730..0000000 --- a/backend/src/services/cache/DmpLatencyProcessor.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { Log } from '../../logging/Log'; -import { eventService } from '../eventService'; - -import { DmpMetricsCache } from './DmpMetricsCache'; - -interface DmpEvent { - timestamp: Date; -} - -export class DmpLatencyProcessor { - private static instance: DmpLatencyProcessor; - private fillMessageStack: DmpEvent[] = []; - private messageQueueStack: DmpEvent[] = []; - private readonly maxStackSize = 10; // Keep last 10 events in each stack - private readonly defaultLatencyMs = 200; // 0.2 seconds default for race conditions - - private constructor() {} - - public static getInstance(): DmpLatencyProcessor { - if (!DmpLatencyProcessor.instance) { - DmpLatencyProcessor.instance = new DmpLatencyProcessor(); - } - return DmpLatencyProcessor.instance; - } - - // Add fill message sent event - public addFillMessageSent(timestamp: Date): void { - this.fillMessageStack.push({ timestamp }); - // Keep only the last maxStackSize events - if (this.fillMessageStack.length > this.maxStackSize) { - this.fillMessageStack.shift(); - } - - // Clean up old fill events when new ones arrive - this.cleanupOldFillEvents(); - - // Try to process any pending message queue events - this.processPendingEvents(); - } - - // Add message queue processed event - public addMessageQueueProcessed(timestamp: Date): void { - this.messageQueueStack.push({ timestamp }); - - // Keep only the last maxStackSize events - if (this.messageQueueStack.length > this.maxStackSize) { - this.messageQueueStack.shift(); - } - - // Try to process any pending fill message events - this.processPendingEvents(); - } - - private processPendingEvents(): void { - // Wait until both stacks have at least 1 value - if (this.fillMessageStack.length < 1 || this.messageQueueStack.length < 1) { - return; - } - - const firstFillEvent = this.fillMessageStack[0]; - const firstQueueEvent = this.messageQueueStack[0]; - - // Calculate latency - const latencyMs = firstQueueEvent.timestamp.getTime() - firstFillEvent.timestamp.getTime(); - - // Handle negative latency (queue event happened before fill event) - if (latencyMs < 0) { - Log.service({ - service: 'DMP Latency Processor', - action: 'Negative latency detected, clearing messageQueue stack', - details: { - latencyMs, - fillEventTimestamp: firstFillEvent.timestamp.toISOString(), - queueEventTimestamp: firstQueueEvent.timestamp.toISOString(), - }, - }); - // Clear the messageQueue stack and wait for the next messageQueue processed event - this.messageQueueStack.shift(); - return; - } else { - this.emitLatency(latencyMs, firstQueueEvent.timestamp); - } - - // Remove the matched events - this.fillMessageStack.shift(); - this.messageQueueStack.shift(); - } - - // Clean up old fill events when new ones arrive - // This handles the case where multiple fills happen before any processing - private cleanupOldFillEvents(): void { - // If we have multiple fill events, keep only the latest one - // This is because a single messageQueue processed event might consume multiple fill events - if (this.fillMessageStack.length > 1) { - const oldCount = this.fillMessageStack.length - 1; - const latestFillEvent = this.fillMessageStack[this.fillMessageStack.length - 1]; - this.fillMessageStack = [latestFillEvent]; - - Log.service({ - service: 'DMP Latency Processor', - action: 'Cleaned up old fill events', - details: { - oldCount, - latestTimestamp: latestFillEvent.timestamp.toISOString(), - }, - }); - } - } - - private emitLatency(latencyMs: number, timestamp: Date): void { - const dmpMetricsCacheInstance = DmpMetricsCache.getInstance(); - dmpMetricsCacheInstance.updateAverageLatency(latencyMs); - - eventService.emit('dmpLatency', { - latencyMs, - averageLatencyMs: dmpMetricsCacheInstance.getAverageLatencyMs(), - timestamp: timestamp.toISOString(), - }); - } - - // Get current stack sizes for debugging - public getStackSizes(): { fill: number; queue: number } { - return { - fill: this.fillMessageStack.length, - queue: this.messageQueueStack.length, - }; - } - - // Clear stacks (useful for testing or reset) - public clearStacks(): void { - this.fillMessageStack = []; - this.messageQueueStack = []; - } -} diff --git a/backend/src/services/cache/DmpMetricsCache.ts b/backend/src/services/cache/DmpMetricsCache.ts deleted file mode 100644 index 379d05f..0000000 --- a/backend/src/services/cache/DmpMetricsCache.ts +++ /dev/null @@ -1,66 +0,0 @@ -export class DmpMetricsCache { - private static instance: DmpMetricsCache; - private averageLatencyMs: number = 0; - private totalSizeBytes: number = 0; - private lastUpdated: Date = new Date(); - - // Track counters and running sums for averaging - private latencyCount: number = 0; - private latencySum: number = 0; - private sizeCount: number = 0; - private sizeSum: number = 0; - - private constructor() {} - - // Singleton pattern - get the single instance - public static getInstance(): DmpMetricsCache { - if (!DmpMetricsCache.instance) { - DmpMetricsCache.instance = new DmpMetricsCache(); - } - return DmpMetricsCache.instance; - } - - // Get current values - getAverageLatencyMs(): number { - return this.averageLatencyMs; - } - - getTotalSizeBytes(): number { - return this.totalSizeBytes; - } - - getLastUpdated(): Date { - return this.lastUpdated; - } - - // Update average latency with new entry - updateAverageLatency(newLatencyMs: number): void { - this.latencySum += newLatencyMs; - this.latencyCount++; - - // Calculate new average - this.averageLatencyMs = this.latencySum / this.latencyCount; - this.lastUpdated = new Date(); - } - - // Update average total size with new entry - updateTotalSize(newTotalSizeBytes: number): void { - this.sizeSum += newTotalSizeBytes; - this.sizeCount++; - - // Calculate new average - this.totalSizeBytes = this.sizeSum / this.sizeCount; - this.lastUpdated = new Date(); - } - - // Get all metrics as an object - getMetrics() { - return { - averageLatencyMs: this.averageLatencyMs, - totalSizeBytes: this.totalSizeBytes, - lastUpdated: this.lastUpdated, - latencyCount: this.latencyCount, - sizeCount: this.sizeCount, - }; - } -} diff --git a/backend/src/services/cache/UmpLatencyProcessor.ts b/backend/src/services/cache/UmpLatencyProcessor.ts deleted file mode 100644 index 755d061..0000000 --- a/backend/src/services/cache/UmpLatencyProcessor.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { eventService } from '../eventService'; - -import { UmpMetricsCache } from './UmpMetricsCache'; - -interface UmpEvent { - timestamp: Date; -} - -export class UmpLatencyProcessor { - private static instance: UmpLatencyProcessor; - private upwardMessageStack: UmpEvent[] = []; - private messageQueueStack: UmpEvent[] = []; - private readonly maxStackSize = 10; // Keep last 10 events in each stack - private readonly defaultLatencyMs = 200; // 0.2 seconds default for race conditions - - private constructor() {} - - public static getInstance(): UmpLatencyProcessor { - if (!UmpLatencyProcessor.instance) { - UmpLatencyProcessor.instance = new UmpLatencyProcessor(); - } - return UmpLatencyProcessor.instance; - } - - // Add upward message sent event - public addUpwardMessageSent(timestamp: Date): void { - this.upwardMessageStack.push({ timestamp }); - // Keep only the last maxStackSize events - if (this.upwardMessageStack.length > this.maxStackSize) { - this.upwardMessageStack.shift(); - } - - // Try to process any pending message queue events - this.processPendingEvents(); - } - - // Add message queue processed event - public addMessageQueueProcessed(timestamp: Date): void { - this.messageQueueStack.push({ timestamp }); - - // Keep only the last maxStackSize events - if (this.messageQueueStack.length > this.maxStackSize) { - this.messageQueueStack.shift(); - } - - // Try to process any pending upward message events - this.processPendingEvents(); - } - - private processPendingEvents(): void { - // Wait until both stacks have at least 2 values - if (this.upwardMessageStack.length < 2 || this.messageQueueStack.length < 2) { - return; - } - - // Get the first upward event - const firstUpwardEvent = this.upwardMessageStack[0]; - - // Check if any message processed event has the same timestamp as the first upward event - let matchedIndex = -1; - for (let i = 0; i < this.messageQueueStack.length; i++) { - if (this.messageQueueStack[i].timestamp.getTime() === firstUpwardEvent.timestamp.getTime()) { - matchedIndex = i; - break; - } - } - - if (matchedIndex !== -1) { - // Found a timestamp match, calculate latency - const matchedQueueEvent = this.messageQueueStack[matchedIndex]; - const latencyMs = - matchedQueueEvent.timestamp.getTime() - firstUpwardEvent.timestamp.getTime(); - this.emitLatency(latencyMs, matchedQueueEvent.timestamp); - - // Remove the matched events - this.upwardMessageStack.shift(); - this.messageQueueStack.splice(matchedIndex, 1); - } else { - // No exact timestamp match found, use default latency for the first queue event - const firstQueueEvent = this.messageQueueStack[0]; - this.emitLatency(this.defaultLatencyMs, firstQueueEvent.timestamp); - - // Remove both the first queue event and the first upward message since no match was found - this.messageQueueStack.shift(); - this.upwardMessageStack.shift(); - } - } - - private emitLatency(latencyMs: number, timestamp: Date): void { - const umpMetricsCacheInstance = UmpMetricsCache.getInstance(); - umpMetricsCacheInstance.updateAverageLatency(latencyMs); - - eventService.emit('umpLatency', { - latencyMs, - averageLatencyMs: umpMetricsCacheInstance.getAverageLatencyMs(), - timestamp: timestamp.toISOString(), - }); - } - - // Get current stack sizes for debugging - public getStackSizes(): { upward: number; queue: number } { - return { - upward: this.upwardMessageStack.length, - queue: this.messageQueueStack.length, - }; - } - - // Clear stacks (useful for testing or reset) - public clearStacks(): void { - this.upwardMessageStack = []; - this.messageQueueStack = []; - } -} diff --git a/backend/src/services/cache/UmpMetricsCache.ts b/backend/src/services/cache/UmpMetricsCache.ts deleted file mode 100644 index f8d7bb5..0000000 --- a/backend/src/services/cache/UmpMetricsCache.ts +++ /dev/null @@ -1,66 +0,0 @@ -export class UmpMetricsCache { - private static instance: UmpMetricsCache; - private averageLatencyMs: number = 0; - private totalSizeBytes: number = 0; - private lastUpdated: Date = new Date(); - - // Track counters and running sums for averaging - private latencyCount: number = 0; - private latencySum: number = 0; - private sizeCount: number = 0; - private sizeSum: number = 0; - - private constructor() {} - - // Singleton pattern - get the single instance - public static getInstance(): UmpMetricsCache { - if (!UmpMetricsCache.instance) { - UmpMetricsCache.instance = new UmpMetricsCache(); - } - return UmpMetricsCache.instance; - } - - // Get current values - getAverageLatencyMs(): number { - return this.averageLatencyMs; - } - - getTotalSizeBytes(): number { - return this.totalSizeBytes; - } - - getLastUpdated(): Date { - return this.lastUpdated; - } - - // Update average latency with new entry - updateAverageLatency(newLatencyMs: number): void { - this.latencySum += newLatencyMs; - this.latencyCount++; - - // Calculate new average - this.averageLatencyMs = this.latencySum / this.latencyCount; - this.lastUpdated = new Date(); - } - - // Update average total size with new entry - updateTotalSize(newTotalSizeBytes: number): void { - this.sizeSum += newTotalSizeBytes; - this.sizeCount++; - - // Calculate new average - this.totalSizeBytes = this.sizeSum / this.sizeCount; - this.lastUpdated = new Date(); - } - - // Get all metrics as an object - getMetrics() { - return { - averageLatencyMs: this.averageLatencyMs, - totalSizeBytes: this.totalSizeBytes, - lastUpdated: this.lastUpdated, - latencyCount: this.latencyCount, - sizeCount: this.sizeCount, - }; - } -}