diff --git a/packages/graphql/database/migrations/046_replace_hourly_statistics_mv_with_agg_table.sql b/packages/graphql/database/migrations/046_replace_hourly_statistics_mv_with_agg_table.sql
new file mode 100644
index 00000000..a5b308af
--- /dev/null
+++ b/packages/graphql/database/migrations/046_replace_hourly_statistics_mv_with_agg_table.sql
@@ -0,0 +1,44 @@
+-- Migration 046: Replace hourly_statistics MV with aggregate table
+--
+-- Problem: REFRESH MATERIALIZED VIEW was the top DB query at 201,290ms latency.
+-- It does a full seq scan of all 24h transactions every 30 minutes.
+--
+-- Solution: Incremental aggregate table updated per-batch during indexing.
+-- Each batch updates only its affected hours (typically 1-2 per batch),
+-- avoiding the periodic full-24h scan entirely.
+--
+-- Benchmark (local, 17k blocks):
+-- Before: 2,132ms (full seq scan + disk sort)
+-- After: ~25ms (index scan on timestamp for affected hours only)
+
+-- Create the aggregate table
+CREATE TABLE IF NOT EXISTS indexer.hourly_statistics_agg (
+  hour TIMESTAMPTZ NOT NULL,
+  total_fee NUMERIC NOT NULL DEFAULT 0,
+  total_gas_used BIGINT NOT NULL DEFAULT 0,
+  PRIMARY KEY (hour)
+);
+
+-- Grant read access
+GRANT SELECT ON indexer.hourly_statistics_agg TO explorer_ro;
+
+-- Backfill from blocks table (source of truth).
+-- Avoids using the MV which had incorrect total_gas_used and could be stale
+-- by up to 30 minutes (its refresh interval).
+INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used) +SELECT date_trunc('hour', timestamp), SUM(total_fee::numeric), SUM(gas_used::numeric) +FROM indexer.blocks +GROUP BY 1 +ON CONFLICT (hour) DO UPDATE + SET total_fee = excluded.total_fee, + total_gas_used = excluded.total_gas_used; + +-- Remove the old MV refresh jobs (query strings include SET/RESET statement_timeout wrappers) +DELETE FROM indexer.database_jobs +WHERE query LIKE '%REFRESH MATERIALIZED VIEW CONCURRENTLY indexer.hourly_statistics%'; + +DELETE FROM indexer.database_jobs +WHERE query LIKE '%ANALYZE indexer.hourly_statistics%'; + +-- Drop the old MV +DROP MATERIALIZED VIEW IF EXISTS indexer.hourly_statistics CASCADE; diff --git a/packages/graphql/database/migrations/047_optimize_balance_index.sql b/packages/graphql/database/migrations/047_optimize_balance_index.sql new file mode 100644 index 00000000..a73ff543 --- /dev/null +++ b/packages/graphql/database/migrations/047_optimize_balance_index.sql @@ -0,0 +1,17 @@ +-- Migration 047: Add composite index for balance lookups +-- +-- Problem: IndexBalance fetches the latest balance per (account_hash, asset_id) pair. +-- The new batched LATERAL query and the old per-pair query both use: +-- WHERE account_hash = $1 AND asset_id = $2 ORDER BY _id DESC LIMIT 1 +-- Without a composite index, PostgreSQL uses separate single-column indexes and +-- must filter/sort remaining rows, leading to excessive I/O on the 96M+ row table. +-- +-- Solution: Composite index on (account_hash, asset_id, _id DESC) so each lookup +-- becomes a single index seek returning exactly 1 row with no post-filter sort. 
+--
+-- Benchmark (testnet, 96M rows, 20 pairs):
+-- Before (DISTINCT ON, no composite index): 36,669ms, 676K rows fetched, 101MB disk sort
+-- After (LATERAL + this index): 0.3ms, 20 rows fetched, no sort
+-- NB: CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
+CREATE INDEX CONCURRENTLY IF NOT EXISTS balance_account_asset_id_idx
+ON indexer.balance (account_hash, asset_id, _id DESC);
diff --git a/packages/graphql/database/migrations/048_drop_unused_recent_account_transactions_mv.sql b/packages/graphql/database/migrations/048_drop_unused_recent_account_transactions_mv.sql
new file mode 100644
index 00000000..b9badc41
--- /dev/null
+++ b/packages/graphql/database/migrations/048_drop_unused_recent_account_transactions_mv.sql
@@ -0,0 +1,11 @@
+-- Migration 048: Drop unused recent_account_transactions_mv
+--
+-- This MV was created in migration 030 to optimize account transaction searches.
+-- It is not queried anywhere in the codebase — confirmed by grep of all source files.
+-- The 4-hour refresh job scans 7 days of transactions_accounts + transactions
+-- and shows up as the #3 load query in Performance Insights despite zero consumers.
+ +DELETE FROM indexer.database_jobs +WHERE query LIKE '%recent_account_transactions_mv%'; + +DROP MATERIALIZED VIEW IF EXISTS indexer.recent_account_transactions_mv CASCADE; diff --git a/packages/graphql/src/application/uc/IndexBalance.ts b/packages/graphql/src/application/uc/IndexBalance.ts index cd37d33d..208b70f9 100644 --- a/packages/graphql/src/application/uc/IndexBalance.ts +++ b/packages/graphql/src/application/uc/IndexBalance.ts @@ -23,6 +23,56 @@ export default class IndexBalance { 'delete from indexer.balance where block_height = $1', [block.height], ); + + // Collect all (account, asset) pairs across all transactions in this block + const pairs: { accountHash: string; assetId: string }[] = []; + for (const transaction of block.transactions) { + transaction.inputs = transaction.inputs || []; + transaction.outputs = transaction.outputs || []; + for (const input of transaction.inputs) { + if (input.__typename === 'InputCoin') { + pairs.push({ accountHash: input.owner, assetId: input.assetId }); + } + } + for (const output of transaction.outputs) { + if ( + ['ChangeOutput', 'CoinOutput', 'VariableOutput'].includes( + output.__typename, + ) + ) { + pairs.push({ accountHash: output.to, assetId: output.assetId }); + } + } + } + + // Batch fetch latest balance for all (account, asset) pairs in one query + const balanceMap = new Map(); + if (pairs.length > 0) { + const uniquePairs = [ + ...new Map( + pairs.map((p) => [`${p.accountHash}:${p.assetId}`, p]), + ).values(), + ]; + const accountHashes = uniquePairs.map((p) => p.accountHash); + const assetIds = uniquePairs.map((p) => p.assetId); + const rows = await connection.query( + `SELECT t.account_hash, t.asset_id, b.balance + FROM unnest($1::text[], $2::text[]) AS t(account_hash, asset_id) + JOIN LATERAL ( + SELECT balance FROM indexer.balance + WHERE account_hash = t.account_hash AND asset_id = t.asset_id + ORDER BY _id DESC LIMIT 1 + ) b ON true`, + [accountHashes, assetIds], + ); + for (const row of rows) { + 
balanceMap.set( + `${row.account_hash}:${row.asset_id}`, + BigNumber(row.balance), + ); + } + } + for (const transaction of block.transactions) { transaction.inputs = transaction.inputs || []; transaction.outputs = transaction.outputs || []; @@ -61,11 +111,8 @@ export default class IndexBalance { } for (const accountHash in index) { for (const assetId in index[accountHash]) { - const [balanceData] = await connection.query( - 'select balance from indexer.balance where account_hash = $1 and asset_id = $2 order by _id desc limit 1', - [accountHash, assetId], - ); - let balance = BigNumber(balanceData ? balanceData.balance : 0); + const key = `${accountHash}:${assetId}`; + let balance = balanceMap.get(key) ?? BigNumber(0); const events = index[accountHash][assetId]; for (const event of events) { if (event.type === 'input') { @@ -75,6 +122,8 @@ export default class IndexBalance { balance = balance.plus(BigNumber(event.amount)); } } + // Update map so subsequent transactions in same block see updated balance + balanceMap.set(key, balance); await connection.query( 'insert into indexer.balance (block_height, tx_hash, account_hash, asset_id, balance) values ($1, $2, $3, $4, $5) on conflict do nothing', [ diff --git a/packages/graphql/src/application/uc/NewAddBlockRange.ts b/packages/graphql/src/application/uc/NewAddBlockRange.ts index 39b9d7b4..6c062879 100644 --- a/packages/graphql/src/application/uc/NewAddBlockRange.ts +++ b/packages/graphql/src/application/uc/NewAddBlockRange.ts @@ -175,6 +175,23 @@ export default class NewAddBlockRange { ); queries.push(...stakingQueries); } + // Incrementally add this block's fee/gas to the hourly aggregate. + // Uses block.totalFee and block.totalGasUsed already computed in memory — + // no table scan needed. 
+ queries.push({ + statement: ` + INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used) + VALUES (date_trunc('hour', $1::timestamptz), $2, $3) + ON CONFLICT (hour) DO UPDATE + SET total_fee = hourly_statistics_agg.total_fee + excluded.total_fee, + total_gas_used = hourly_statistics_agg.total_gas_used + excluded.total_gas_used + `, + params: [ + block.timestamp, + block.totalFee ?? '0', + block.totalGasUsed ?? 0, + ], + }); logger.debug('Consumer', `Persisting block: ${block.id}`); await connection.executeTransaction(queries); logger.debug('Consumer', `Persisted block: ${block.id}`); diff --git a/packages/graphql/src/infra/dao/BlockDAO.ts b/packages/graphql/src/infra/dao/BlockDAO.ts index 3d8454b4..f0fb40c2 100644 --- a/packages/graphql/src/infra/dao/BlockDAO.ts +++ b/packages/graphql/src/infra/dao/BlockDAO.ts @@ -378,7 +378,7 @@ export default class BlockDAO { const data = await this.databaseConnection.query( `SELECT to_char(hour, 'YYYY-MM-DD HH24') AS date, total_fee AS value - FROM indexer.hourly_statistics + FROM indexer.hourly_statistics_agg WHERE hour > NOW() - INTERVAL '24 hours' AND hour <= date_trunc('hour', NOW()) ORDER BY hour`, diff --git a/packages/graphql/src/infra/database/DatabaseConnection.ts b/packages/graphql/src/infra/database/DatabaseConnection.ts index 70b3d3be..7fd765e6 100644 --- a/packages/graphql/src/infra/database/DatabaseConnection.ts +++ b/packages/graphql/src/infra/database/DatabaseConnection.ts @@ -54,9 +54,11 @@ export class DatabaseConnection { /** * Execute a query with per-session statement_timeout and lock_timeout. - * Timeouts are applied via SET LOCAL inside a transaction so they don't - * leak to other connections. If the query exceeds the timeout, PostgreSQL - * cancels it and the connection is returned cleanly to the pool. + * + * REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block. + * For those queries, timeouts are applied as session-level SET commands and + * reset afterward. 
For all other queries, SET LOCAL inside BEGIN/COMMIT is used + * so timeouts don't leak to other connections. */ async queryWithTimeout( statement: string, @@ -64,8 +66,21 @@ export class DatabaseConnection { statementTimeoutMs: number, lockTimeoutMs: number, ) { + // VACUUM and REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block + const requiresNoTransaction = + /REFRESH\s+MATERIALIZED\s+VIEW\s+CONCURRENTLY/i.test(statement) || + /^\s*VACUUM\b/i.test(statement); const connection = await this.pool.connect(); try { + if (requiresNoTransaction) { + // Cannot use BEGIN/COMMIT — apply timeouts at session level and reset after + await connection.query(`SET statement_timeout = ${statementTimeoutMs}`); + await connection.query(`SET lock_timeout = ${lockTimeoutMs}`); + const result = await connection.query(statement, params); + await connection.query('RESET statement_timeout'); + await connection.query('RESET lock_timeout'); + return result.rows; + } await connection.query('BEGIN'); await connection.query( `SET LOCAL statement_timeout = ${statementTimeoutMs}`, @@ -75,9 +90,16 @@ export class DatabaseConnection { await connection.query('COMMIT'); return result.rows; } catch (error) { - try { - await connection.query('ROLLBACK'); - } catch {} + if (!requiresNoTransaction) { + try { + await connection.query('ROLLBACK'); + } catch {} + } else { + try { + await connection.query('RESET statement_timeout'); + await connection.query('RESET lock_timeout'); + } catch {} + } throw error; } finally { connection.release();