Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Migration 046: swap the hourly_statistics materialized view for an
-- incrementally-maintained aggregate table.
--
-- Why: the periodic REFRESH MATERIALIZED VIEW was the single most expensive
-- database query (201,290ms latency) because every 30 minutes it re-scanned
-- the full 24h window of transactions. The replacement table is updated per
-- indexing batch, touching only the hour buckets affected by that batch
-- (typically 1-2), turning a full 24h scan into a narrow timestamp index scan.
--
-- Measured locally over 17k blocks:
--   before: 2,132ms (full sequential scan + on-disk sort)
--   after:  ~25ms   (index scan restricted to the affected hours)

-- One row per hour bucket; the bucket timestamp is the natural primary key.
CREATE TABLE IF NOT EXISTS indexer.hourly_statistics_agg (
    hour           TIMESTAMPTZ NOT NULL,
    total_fee      NUMERIC     NOT NULL DEFAULT 0,
    total_gas_used BIGINT      NOT NULL DEFAULT 0,
    PRIMARY KEY (hour)
);

-- The read-only role queries this table from the API layer.
GRANT SELECT ON indexer.hourly_statistics_agg TO explorer_ro;

-- Seed the table from indexer.blocks, the source of truth. The old MV is
-- deliberately NOT used as a source: its total_gas_used was incorrect and
-- its contents could lag reality by up to its 30-minute refresh interval.
-- ON CONFLICT keeps the backfill idempotent if the migration is re-run.
INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used)
SELECT
    date_trunc('hour', timestamp),
    SUM(total_fee::numeric),
    SUM(gas_used::numeric)
FROM indexer.blocks
GROUP BY 1
ON CONFLICT (hour) DO UPDATE
    SET total_fee      = excluded.total_fee,
        total_gas_used = excluded.total_gas_used;

-- Unschedule the old MV maintenance jobs. The stored query strings are
-- wrapped in SET/RESET statement_timeout, hence the surrounding wildcards.
DELETE FROM indexer.database_jobs
WHERE query LIKE '%REFRESH MATERIALIZED VIEW CONCURRENTLY indexer.hourly_statistics%'
   OR query LIKE '%ANALYZE indexer.hourly_statistics%';

-- Finally remove the view itself, along with anything depending on it.
DROP MATERIALIZED VIEW IF EXISTS indexer.hourly_statistics CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Migration 047: Add composite index for balance lookups
--
-- Problem: IndexBalance fetches the latest balance per (account_hash, asset_id) pair.
-- The new batched LATERAL query and the old per-pair query both use:
-- WHERE account_hash = $1 AND asset_id = $2 ORDER BY _id DESC LIMIT 1
-- Without a composite index, PostgreSQL uses separate single-column indexes and
-- must filter/sort remaining rows, leading to excessive I/O on the 96M+ row table.
--
-- Solution: Composite index on (account_hash, asset_id, _id DESC) so each lookup
-- becomes a single index seek returning exactly 1 row with no post-filter sort.
--
-- Benchmark (testnet, 96M rows, 20 pairs):
-- Before (DISTINCT ON, no composite index): 36,669ms, 676K rows fetched, 101MB disk sort
-- After (LATERAL + this index): 0.3ms, 20 rows fetched, no sort
--
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a transaction
-- block — confirm the migration runner executes this file without wrapping
-- it in BEGIN/COMMIT.
-- NOTE(review): if an interrupted CONCURRENTLY build left an INVALID index
-- behind, IF NOT EXISTS will skip the rebuild; check pg_index.indisvalid
-- and drop/recreate manually if a previous run of this migration failed.

CREATE INDEX CONCURRENTLY IF NOT EXISTS balance_account_asset_id_idx
ON indexer.balance (account_hash, asset_id, _id DESC);
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Migration 048: retire recent_account_transactions_mv (dead object).
--
-- The view was introduced in migration 030 to speed up account transaction
-- search, but a grep of all source files shows nothing queries it anymore.
-- Its 4-hour refresh job still scans 7 days of transactions_accounts +
-- transactions and ranks as the #3 load query in Performance Insights —
-- pure wasted database load with zero consumers.

-- First unschedule every job that references the view...
DELETE FROM indexer.database_jobs
WHERE query LIKE '%recent_account_transactions_mv%';

-- ...then drop the view itself, along with any dependents.
DROP MATERIALIZED VIEW IF EXISTS indexer.recent_account_transactions_mv CASCADE;
59 changes: 54 additions & 5 deletions packages/graphql/src/application/uc/IndexBalance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,56 @@ export default class IndexBalance {
'delete from indexer.balance where block_height = $1',
[block.height],
);

// Collect all (account, asset) pairs across all transactions in this block
const pairs: { accountHash: string; assetId: string }[] = [];
for (const transaction of block.transactions) {
transaction.inputs = transaction.inputs || [];
transaction.outputs = transaction.outputs || [];
for (const input of transaction.inputs) {
if (input.__typename === 'InputCoin') {
pairs.push({ accountHash: input.owner, assetId: input.assetId });
}
}
for (const output of transaction.outputs) {
if (
['ChangeOutput', 'CoinOutput', 'VariableOutput'].includes(
output.__typename,
)
) {
pairs.push({ accountHash: output.to, assetId: output.assetId });
}
}
}

// Batch fetch latest balance for all (account, asset) pairs in one query
const balanceMap = new Map<string, BigNumber>();
if (pairs.length > 0) {
const uniquePairs = [
...new Map(
pairs.map((p) => [`${p.accountHash}:${p.assetId}`, p]),
).values(),
];
const accountHashes = uniquePairs.map((p) => p.accountHash);
const assetIds = uniquePairs.map((p) => p.assetId);
const rows = await connection.query(
`SELECT t.account_hash, t.asset_id, b.balance
FROM unnest($1::text[], $2::text[]) AS t(account_hash, asset_id)
JOIN LATERAL (
SELECT balance FROM indexer.balance
WHERE account_hash = t.account_hash AND asset_id = t.asset_id
ORDER BY _id DESC LIMIT 1
) b ON true`,
[accountHashes, assetIds],
);
for (const row of rows) {
balanceMap.set(
`${row.account_hash}:${row.asset_id}`,
BigNumber(row.balance),
);
}
}

for (const transaction of block.transactions) {
transaction.inputs = transaction.inputs || [];
transaction.outputs = transaction.outputs || [];
Expand Down Expand Up @@ -61,11 +111,8 @@ export default class IndexBalance {
}
for (const accountHash in index) {
for (const assetId in index[accountHash]) {
const [balanceData] = await connection.query(
'select balance from indexer.balance where account_hash = $1 and asset_id = $2 order by _id desc limit 1',
[accountHash, assetId],
);
let balance = BigNumber(balanceData ? balanceData.balance : 0);
const key = `${accountHash}:${assetId}`;
let balance = balanceMap.get(key) ?? BigNumber(0);
const events = index[accountHash][assetId];
for (const event of events) {
if (event.type === 'input') {
Expand All @@ -75,6 +122,8 @@ export default class IndexBalance {
balance = balance.plus(BigNumber(event.amount));
}
}
// Update map so subsequent transactions in same block see updated balance
balanceMap.set(key, balance);
await connection.query(
'insert into indexer.balance (block_height, tx_hash, account_hash, asset_id, balance) values ($1, $2, $3, $4, $5) on conflict do nothing',
[
Expand Down
17 changes: 17 additions & 0 deletions packages/graphql/src/application/uc/NewAddBlockRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ export default class NewAddBlockRange {
);
queries.push(...stakingQueries);
}
// Incrementally add this block's fee/gas to the hourly aggregate.
// Uses block.totalFee and block.totalGasUsed already computed in memory —
// no table scan needed.
queries.push({
statement: `
INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used)
VALUES (date_trunc('hour', $1::timestamptz), $2, $3)
ON CONFLICT (hour) DO UPDATE
SET total_fee = hourly_statistics_agg.total_fee + excluded.total_fee,
total_gas_used = hourly_statistics_agg.total_gas_used + excluded.total_gas_used
`,
params: [
block.timestamp,
block.totalFee ?? '0',
block.totalGasUsed ?? 0,
],
});
logger.debug('Consumer', `Persisting block: ${block.id}`);
await connection.executeTransaction(queries);
logger.debug('Consumer', `Persisted block: ${block.id}`);
Expand Down
2 changes: 1 addition & 1 deletion packages/graphql/src/infra/dao/BlockDAO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ export default class BlockDAO {
const data = await this.databaseConnection.query(
`SELECT to_char(hour, 'YYYY-MM-DD HH24') AS date,
total_fee AS value
FROM indexer.hourly_statistics
FROM indexer.hourly_statistics_agg
WHERE hour > NOW() - INTERVAL '24 hours'
AND hour <= date_trunc('hour', NOW())
ORDER BY hour`,
Expand Down
34 changes: 28 additions & 6 deletions packages/graphql/src/infra/database/DatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,33 @@ export class DatabaseConnection {

/**
* Execute a query with per-session statement_timeout and lock_timeout.
* Timeouts are applied via SET LOCAL inside a transaction so they don't
* leak to other connections. If the query exceeds the timeout, PostgreSQL
* cancels it and the connection is returned cleanly to the pool.
*
* REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block.
* For those queries, timeouts are applied as session-level SET commands and
* reset afterward. For all other queries, SET LOCAL inside BEGIN/COMMIT is used
* so timeouts don't leak to other connections.
*/
async queryWithTimeout(
statement: string,
params: any,
statementTimeoutMs: number,
lockTimeoutMs: number,
) {
// VACUUM and REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block
const requiresNoTransaction =
/REFRESH\s+MATERIALIZED\s+VIEW\s+CONCURRENTLY/i.test(statement) ||
/^\s*VACUUM\b/i.test(statement);
const connection = await this.pool.connect();
try {
if (requiresNoTransaction) {
// Cannot use BEGIN/COMMIT — apply timeouts at session level and reset after
await connection.query(`SET statement_timeout = ${statementTimeoutMs}`);
await connection.query(`SET lock_timeout = ${lockTimeoutMs}`);
const result = await connection.query(statement, params);
await connection.query('RESET statement_timeout');
await connection.query('RESET lock_timeout');
return result.rows;
}
await connection.query('BEGIN');
await connection.query(
`SET LOCAL statement_timeout = ${statementTimeoutMs}`,
Expand All @@ -75,9 +90,16 @@ export class DatabaseConnection {
await connection.query('COMMIT');
return result.rows;
} catch (error) {
try {
await connection.query('ROLLBACK');
} catch {}
if (!requiresNoTransaction) {
try {
await connection.query('ROLLBACK');
} catch {}
} else {
try {
await connection.query('RESET statement_timeout');
await connection.query('RESET lock_timeout');
} catch {}
}
throw error;
} finally {
connection.release();
Expand Down
Loading