Skip to content

Commit a44393e

Browse files
committed
fix: Eliminate top DB queries — replace hourly_statistics MV with aggregate table, batch balance lookups, fix VACUUM in transaction
- Replace hourly_statistics MV with incremental aggregate table (migration 046):
  - MV refresh was the top query at 201,290ms latency, running every 30min
  - Per-block upsert recomputes only the affected hour (~25ms vs 2,132ms locally)
  - 85x improvement; cost scales with 1 hour of data, not the full 24h history
- Add composite index (account_hash, asset_id, _id DESC) on balance table (migration 047):
  - Enables a single index seek for IndexBalance lookups instead of a multi-index scan
- Batch balance lookups in IndexBalance: 1 query per block instead of N+1 queries per (account, asset) pair
- Fix queryWithTimeout: VACUUM and REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block — detect these and use session-level SET instead of SET LOCAL
1 parent 1375f62 commit a44393e

File tree

6 files changed

+154
-12
lines changed

6 files changed

+154
-12
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
-- Migration 046: Replace the hourly_statistics MV with an incremental aggregate table.
--
-- Problem: REFRESH MATERIALIZED VIEW was the top DB query at 201,290ms latency.
-- It does a full seq scan of all 24h transactions every 30 minutes.
--
-- Solution: Incremental aggregate table updated per-batch during indexing.
-- Each batch recomputes only the affected hours (typically 1-2 hours),
-- reducing cost from a full 24h scan to a narrow timestamp index scan.
--
-- Benchmark (local, 17k blocks):
--   Before: 2,132ms (full seq scan + disk sort)
--   After:  ~25ms   (index scan on timestamp for affected hours only)

-- Aggregate table, keyed by hour bucket.
CREATE TABLE IF NOT EXISTS indexer.hourly_statistics_agg (
  hour TIMESTAMPTZ NOT NULL,
  total_fee NUMERIC NOT NULL DEFAULT 0,
  total_gas_used BIGINT NOT NULL DEFAULT 0,
  PRIMARY KEY (hour)
);

-- Grant read access to the read-only explorer role.
GRANT SELECT ON indexer.hourly_statistics_agg TO explorer_ro;

-- Backfill from the existing MV, but only if the MV still exists.
-- FIX: the previous unconditional INSERT ... FROM indexer.hourly_statistics made
-- this migration non-idempotent — a retry after a partial run (MV already dropped
-- below) would fail on the missing relation, even though every other statement
-- here is guarded (IF NOT EXISTS / IF EXISTS). Guard the backfill the same way.
DO $$
BEGIN
  IF EXISTS (
    SELECT 1
    FROM pg_matviews
    WHERE schemaname = 'indexer'
      AND matviewname = 'hourly_statistics'
  ) THEN
    INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used)
    SELECT hour, total_fee, total_gas_used
    FROM indexer.hourly_statistics
    ON CONFLICT (hour) DO UPDATE
      SET total_fee = excluded.total_fee,
          total_gas_used = excluded.total_gas_used;
  END IF;
END
$$;

-- Remove the old MV refresh jobs. The stored query strings include SET/RESET
-- statement_timeout wrappers around the actual statement, hence the LIKE patterns.
DELETE FROM indexer.database_jobs
WHERE query LIKE '%REFRESH MATERIALIZED VIEW CONCURRENTLY indexer.hourly_statistics%';

DELETE FROM indexer.database_jobs
WHERE query LIKE '%ANALYZE indexer.hourly_statistics%';

-- Drop the old MV (CASCADE removes dependent objects such as its unique index).
DROP MATERIALIZED VIEW IF EXISTS indexer.hourly_statistics CASCADE;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
-- Migration 047: Composite index to speed up balance lookups.
--
-- Problem: IndexBalance runs
--   SELECT balance FROM indexer.balance
--   WHERE account_hash = $1 AND asset_id = $2
--   ORDER BY _id DESC LIMIT 1
-- at 43.27 calls/sec (the top DB load query). With only single-column indexes
-- on account_hash and asset_id, PostgreSQL uses one index and must filter/sort
-- the remaining rows, leading to excessive I/O.
--
-- Solution: an index on (account_hash, asset_id, _id DESC) satisfies the whole
-- predicate and the ORDER BY with a single index seek and no post-filter sort.
--
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a transaction
-- block — confirm the migration runner executes this file outside a
-- transaction (autocommit), otherwise this statement will fail at deploy time.

CREATE INDEX CONCURRENTLY IF NOT EXISTS balance_account_asset_id_idx
  ON indexer.balance (account_hash, asset_id, _id DESC);

packages/graphql/src/application/uc/IndexBalance.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,55 @@ export default class IndexBalance {
2323
'delete from indexer.balance where block_height = $1',
2424
[block.height],
2525
);
26+
27+
// Collect all (account, asset) pairs across all transactions in this block
28+
const pairs: { accountHash: string; assetId: string }[] = [];
29+
for (const transaction of block.transactions) {
30+
transaction.inputs = transaction.inputs || [];
31+
transaction.outputs = transaction.outputs || [];
32+
for (const input of transaction.inputs) {
33+
if (input.__typename === 'InputCoin') {
34+
pairs.push({ accountHash: input.owner, assetId: input.assetId });
35+
}
36+
}
37+
for (const output of transaction.outputs) {
38+
if (
39+
['ChangeOutput', 'CoinOutput', 'VariableOutput'].includes(
40+
output.__typename,
41+
)
42+
) {
43+
pairs.push({ accountHash: output.to, assetId: output.assetId });
44+
}
45+
}
46+
}
47+
48+
// Batch fetch latest balance for all (account, asset) pairs in one query
49+
const balanceMap = new Map<string, BigNumber>();
50+
if (pairs.length > 0) {
51+
const uniquePairs = [
52+
...new Map(
53+
pairs.map((p) => [`${p.accountHash}:${p.assetId}`, p]),
54+
).values(),
55+
];
56+
const placeholders = uniquePairs
57+
.map((_, i) => `($${i * 2 + 1}, $${i * 2 + 2})`)
58+
.join(', ');
59+
const params = uniquePairs.flatMap((p) => [p.accountHash, p.assetId]);
60+
const rows = await connection.query(
61+
`SELECT DISTINCT ON (account_hash, asset_id) account_hash, asset_id, balance
62+
FROM indexer.balance
63+
WHERE (account_hash, asset_id) IN (${placeholders})
64+
ORDER BY account_hash, asset_id, _id DESC`,
65+
params,
66+
);
67+
for (const row of rows) {
68+
balanceMap.set(
69+
`${row.account_hash}:${row.asset_id}`,
70+
BigNumber(row.balance),
71+
);
72+
}
73+
}
74+
2675
for (const transaction of block.transactions) {
2776
transaction.inputs = transaction.inputs || [];
2877
transaction.outputs = transaction.outputs || [];
@@ -61,11 +110,8 @@ export default class IndexBalance {
61110
}
62111
for (const accountHash in index) {
63112
for (const assetId in index[accountHash]) {
64-
const [balanceData] = await connection.query(
65-
'select balance from indexer.balance where account_hash = $1 and asset_id = $2 order by _id desc limit 1',
66-
[accountHash, assetId],
67-
);
68-
let balance = BigNumber(balanceData ? balanceData.balance : 0);
113+
const key = `${accountHash}:${assetId}`;
114+
let balance = balanceMap.get(key) ?? BigNumber(0);
69115
const events = index[accountHash][assetId];
70116
for (const event of events) {
71117
if (event.type === 'input') {
@@ -75,6 +121,8 @@ export default class IndexBalance {
75121
balance = balance.plus(BigNumber(event.amount));
76122
}
77123
}
124+
// Update map so subsequent transactions in same block see updated balance
125+
balanceMap.set(key, balance);
78126
await connection.query(
79127
'insert into indexer.balance (block_height, tx_hash, account_hash, asset_id, balance) values ($1, $2, $3, $4, $5) on conflict do nothing',
80128
[

packages/graphql/src/application/uc/NewAddBlockRange.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,25 @@ export default class NewAddBlockRange {
6262
block.data.header.daHeight,
6363
],
6464
});
65+
// Incrementally update hourly_statistics_agg for this block's hour.
66+
// Using DELETE + INSERT pattern for idempotency: recomputes the entire
67+
// hour from indexer.transactions so re-processing a block never double-counts.
68+
queries.push({
69+
statement: `
70+
INSERT INTO indexer.hourly_statistics_agg (hour, total_fee, total_gas_used)
71+
SELECT
72+
date_trunc('hour', $1::timestamptz) AS hour,
73+
COALESCE(SUM((data->'status'->>'totalFee')::numeric), 0) AS total_fee,
74+
COALESCE(SUM((data->'status'->>'gasUsed')::bigint), 0) AS total_gas_used
75+
FROM indexer.transactions
76+
WHERE timestamp >= date_trunc('hour', $1::timestamptz)
77+
AND timestamp < date_trunc('hour', $1::timestamptz) + INTERVAL '1 hour'
78+
ON CONFLICT (hour) DO UPDATE
79+
SET total_fee = excluded.total_fee,
80+
total_gas_used = excluded.total_gas_used
81+
`,
82+
params: [block.timestamp],
83+
});
6584
for (const [index, transactionData] of blockData.transactions.entries()) {
6685
const transaction = new Transaction(
6786
transactionData,

packages/graphql/src/infra/dao/BlockDAO.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ export default class BlockDAO {
378378
const data = await this.databaseConnection.query(
379379
`SELECT to_char(hour, 'YYYY-MM-DD HH24') AS date,
380380
total_fee AS value
381-
FROM indexer.hourly_statistics
381+
FROM indexer.hourly_statistics_agg
382382
WHERE hour > NOW() - INTERVAL '24 hours'
383383
AND hour <= date_trunc('hour', NOW())
384384
ORDER BY hour`,

packages/graphql/src/infra/database/DatabaseConnection.ts

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,33 @@ export class DatabaseConnection {
5454

5555
/**
5656
* Execute a query with per-session statement_timeout and lock_timeout.
57-
* Timeouts are applied via SET LOCAL inside a transaction so they don't
58-
* leak to other connections. If the query exceeds the timeout, PostgreSQL
59-
* cancels it and the connection is returned cleanly to the pool.
57+
*
58+
* REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block.
59+
* For those queries, timeouts are applied as session-level SET commands and
60+
* reset afterward. For all other queries, SET LOCAL inside BEGIN/COMMIT is used
61+
* so timeouts don't leak to other connections.
6062
*/
6163
async queryWithTimeout(
6264
statement: string,
6365
params: any,
6466
statementTimeoutMs: number,
6567
lockTimeoutMs: number,
6668
) {
69+
// VACUUM and REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block
70+
const requiresNoTransaction =
71+
/REFRESH\s+MATERIALIZED\s+VIEW\s+CONCURRENTLY/i.test(statement) ||
72+
/^\s*VACUUM\b/i.test(statement);
6773
const connection = await this.pool.connect();
6874
try {
75+
if (requiresNoTransaction) {
76+
// Cannot use BEGIN/COMMIT — apply timeouts at session level and reset after
77+
await connection.query(`SET statement_timeout = ${statementTimeoutMs}`);
78+
await connection.query(`SET lock_timeout = ${lockTimeoutMs}`);
79+
const result = await connection.query(statement, params);
80+
await connection.query('RESET statement_timeout');
81+
await connection.query('RESET lock_timeout');
82+
return result.rows;
83+
}
6984
await connection.query('BEGIN');
7085
await connection.query(
7186
`SET LOCAL statement_timeout = ${statementTimeoutMs}`,
@@ -75,9 +90,16 @@ export class DatabaseConnection {
7590
await connection.query('COMMIT');
7691
return result.rows;
7792
} catch (error) {
78-
try {
79-
await connection.query('ROLLBACK');
80-
} catch {}
93+
if (!requiresNoTransaction) {
94+
try {
95+
await connection.query('ROLLBACK');
96+
} catch {}
97+
} else {
98+
try {
99+
await connection.query('RESET statement_timeout');
100+
await connection.query('RESET lock_timeout');
101+
} catch {}
102+
}
81103
throw error;
82104
} finally {
83105
connection.release();

0 commit comments

Comments
 (0)