Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions backend/src/db/initializeDb.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Log } from '../logging/Log';
import { TimeInStageCache } from '../services/cache/TimeInStageCache';

import { xcmMessageCounters, dmpMetricsCache } from './schema';
import { xcmMessageCounters } from './schema';

import { db } from './index';

Expand All @@ -25,7 +25,6 @@ export async function initializeDb() {
await db.insert(xcmMessageCounters).values({
sourceChain: 'relay-chain',
destinationChain: 'asset-hub',
messagesSent: 0,
messagesProcessed: 0,
messagesFailed: 0,
lastUpdated: new Date(),
Expand All @@ -42,30 +41,12 @@ export async function initializeDb() {
await db.insert(xcmMessageCounters).values({
sourceChain: 'asset-hub',
destinationChain: 'relay-chain',
messagesSent: 0,
messagesProcessed: 0,
messagesFailed: 0,
lastUpdated: new Date(),
});
}

// Initialize DMP metrics cache if it doesn't exist
const existingDmpCache = await db.query.dmpMetricsCache.findFirst();
if (!existingDmpCache) {
Log.service({
service: 'Database Initialization',
action: 'Initializing DMP metrics cache',
});
await db.insert(dmpMetricsCache).values({
currentQueueSize: 0,
currentQueueSizeBytes: 0,
averageLatencyMs: 0,
averageThroughput: 0,
averageThroughputBytes: 0,
lastUpdated: new Date(),
});
}

// Initialize TimeInStageCache
Log.service({
service: 'Database Initialization',
Expand Down
59 changes: 0 additions & 59 deletions backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export const xcmMessageCounters = sqliteTable('xcm_message_counters', {
id: integer('id').primaryKey({ autoIncrement: true }),
sourceChain: text('source_chain').notNull(),
destinationChain: text('destination_chain').notNull(),
messagesSent: integer('messages_sent').notNull().default(0),
messagesProcessed: integer('messages_processed').notNull().default(0),
messagesFailed: integer('messages_failed').notNull().default(0),
lastUpdated: integer('last_updated', { mode: 'timestamp' })
Expand Down Expand Up @@ -84,66 +83,8 @@ export const umpQueueEvents = sqliteTable('ump_queue_events', {
timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`),
});

// Message Processing Events (from Asset Hub)
export const messageProcessingEventsRC = sqliteTable('message_processing_events_rc', {
id: integer('id').primaryKey({ autoIncrement: true }),
timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`),
});

// Message Processing Events (from Asset Hub)
export const messageProcessingEventsAH = sqliteTable('message_processing_events', {
id: integer('id').primaryKey({ autoIncrement: true }),
timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`),
});

// Upward Message Sent Events (from Asset Hub)
export const upwardMessageSentEvents = sqliteTable('upward_message_sent_events', {
id: integer('id').primaryKey({ autoIncrement: true }),
timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`),
});

// Queue-Processing Correlation
export const queueProcessingCorrelation = sqliteTable('queue_processing_correlation', {
id: integer('id').primaryKey({ autoIncrement: true }),
queueEventId: integer('queue_event_id')
.notNull()
.references(() => dmpQueueEvents.id),
processingEventId: integer('processing_event_id')
.notNull()
.references(() => messageProcessingEventsAH.id),
latencyMs: integer('latency_ms').notNull(), // Time between queue drain and processing
throughput: integer('throughput').notNull(), // Messages per second
throughputBytes: integer('throughput_bytes').notNull(), // Bytes per second
timestamp: integer('timestamp', { mode: 'timestamp' }).default(sql`CURRENT_TIMESTAMP`),
});

// Cached DMP Metrics for quick frontend queries
export const dmpMetricsCache = sqliteTable('dmp_metrics_cache', {
id: integer('id').primaryKey({ autoIncrement: true }),
currentQueueSize: integer('current_queue_size').notNull().default(0),
currentQueueSizeBytes: integer('current_queue_size_bytes').notNull().default(0),
averageLatencyMs: integer('average_latency_ms').notNull().default(0),
averageThroughput: integer('average_throughput').notNull().default(0),
averageThroughputBytes: integer('average_throughput_bytes').notNull().default(0),
lastUpdated: integer('last_updated', { mode: 'timestamp' })
.notNull()
.$defaultFn(() => new Date()),
});

export type DmpQueueEvent = typeof dmpQueueEvents.$inferSelect;
export type NewDmpQueueEvent = typeof dmpQueueEvents.$inferInsert;

export type UmpQueueEvent = typeof umpQueueEvents.$inferSelect;
export type NewUmpQueueEvent = typeof umpQueueEvents.$inferInsert;

export type MessageProcessingEventAH = typeof messageProcessingEventsAH.$inferSelect;
export type NewMessageProcessingEvent = typeof messageProcessingEventsAH.$inferInsert;

export type QueueProcessingCorrelation = typeof queueProcessingCorrelation.$inferSelect;
export type NewQueueProcessingCorrelation = typeof queueProcessingCorrelation.$inferInsert;

export type DmpMetricsCache = typeof dmpMetricsCache.$inferSelect;
export type NewDmpMetricsCache = typeof dmpMetricsCache.$inferInsert;

export type UpwardMessageSentEvent = typeof upwardMessageSentEvents.$inferSelect;
export type NewUpwardMessageSentEvent = typeof upwardMessageSentEvents.$inferInsert;
32 changes: 0 additions & 32 deletions backend/src/routes/updates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import { Request, Response, RequestHandler } from 'express';
import { db } from '../db';
import { migrationStages, xcmMessageCounters } from '../db/schema';
import { Log } from '../logging/Log';
import { DmpMetricsCache } from '../services/cache/DmpMetricsCache';
import { PalletMigrationCache } from '../services/cache/PalletMigrationCache';
import { TimeInStageCache } from '../services/cache/TimeInStageCache';
import { UmpMetricsCache } from '../services/cache/UmpMetricsCache';
import { eventService } from '../services/eventService';
import { getPalletFromStage } from '../util/stageToPalletMapping';

Expand Down Expand Up @@ -77,7 +75,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response
sendEvent('rcXcmMessageCounter', {
sourceChain: rcCounter.sourceChain,
destinationChain: rcCounter.destinationChain,
messagesSent: rcCounter.messagesSent,
messagesProcessed: rcCounter.messagesProcessed,
messagesFailed: rcCounter.messagesFailed,
lastUpdated: rcCounter.lastUpdated,
Expand All @@ -93,7 +90,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response
sendEvent('ahXcmMessageCounter', {
sourceChain: ahCounter.sourceChain,
destinationChain: ahCounter.destinationChain,
messagesSent: ahCounter.messagesSent,
messagesProcessed: ahCounter.messagesProcessed,
messagesFailed: ahCounter.messagesFailed,
lastUpdated: ahCounter.lastUpdated,
Expand Down Expand Up @@ -145,34 +141,6 @@ export const updatesHandler: RequestHandler = async (req: Request, res: Response
}
}

// Handle DMP metrics initial state
if (requestedEvents.includes('dmpMetrics')) {
const dmpMetricsCacheInstance = DmpMetricsCache.getInstance();
const currentMetrics = dmpMetricsCacheInstance.getMetrics();
sendEvent('dmpMetrics', {
averageLatencyMs: currentMetrics.averageLatencyMs,
totalSizeBytes: currentMetrics.totalSizeBytes,
lastUpdated: currentMetrics.lastUpdated,
latencyCount: currentMetrics.latencyCount,
sizeCount: currentMetrics.sizeCount,
timestamp: new Date().toISOString(),
});
}

// Handle UMP metrics initial state
if (requestedEvents.includes('umpMetrics')) {
const umpMetricsCacheInstance = UmpMetricsCache.getInstance();
const currentMetrics = umpMetricsCacheInstance.getMetrics();
sendEvent('umpMetrics', {
averageLatencyMs: currentMetrics.averageLatencyMs,
totalSizeBytes: currentMetrics.totalSizeBytes,
lastUpdated: currentMetrics.lastUpdated,
latencyCount: currentMetrics.latencyCount,
sizeCount: currentMetrics.sizeCount,
timestamp: new Date().toISOString(),
});
}

// Handle pallet migration summary initial state
if (requestedEvents.includes('palletMigrationSummary')) {
const palletMigrationCacheInstance = PalletMigrationCache.getInstance();
Expand Down
Loading