Skip to content

Commit a6cf1f1

Browse files
committed
fix: wrap mempool tx inserts in sql transactions, along with a few other queries
1 parent 0a9d690 commit a6cf1f1

File tree

8 files changed

+189
-139
lines changed

8 files changed

+189
-139
lines changed

src/datastore/common.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ export interface DataStore extends DataStoreEventEmitter {
316316
}): Promise<FoundOrNot<DbSmartContractEvent[]>>;
317317

318318
update(data: DataStoreUpdateData): Promise<void>;
319-
updateMempoolTx(args: { mempoolTx: DbMempoolTx }): Promise<void>;
319+
updateMempoolTxs(args: { mempoolTxs: DbMempoolTx[] }): Promise<void>;
320320

321321
updateBurnchainRewards(args: {
322322
burnchainBlockHash: string;

src/datastore/memory-store.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,11 @@ export class MemoryDataStore extends (EventEmitter as { new (): DataStoreEventEm
189189
return Promise.resolve();
190190
}
191191

192-
updateMempoolTx({ mempoolTx: tx }: { mempoolTx: DbMempoolTx }): Promise<void> {
193-
this.txMempool.set(tx.tx_id, tx);
194-
this.emit('txUpdate', tx);
192+
updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise<void> {
193+
txs.forEach(tx => {
194+
this.txMempool.set(tx.tx_id, tx);
195+
this.emit('txUpdate', tx);
196+
});
195197
return Promise.resolve();
196198
}
197199

src/datastore/postgres-store.ts

+169-122
Original file line numberDiff line numberDiff line change
@@ -1007,25 +1007,34 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
10071007
}
10081008

10091009
async getBlocks({ limit, offset }: { limit: number; offset: number }) {
1010-
const totalQuery = this.pool.query<{ count: number }>(`
1011-
SELECT COUNT(*)::integer
1012-
FROM blocks
1013-
WHERE canonical = true
1014-
`);
1015-
const resultQuery = this.pool.query<BlockQueryResult>(
1016-
`
1017-
SELECT ${BLOCK_COLUMNS}
1018-
FROM blocks
1019-
WHERE canonical = true
1020-
ORDER BY block_height DESC
1021-
LIMIT $1
1022-
OFFSET $2
1023-
`,
1024-
[limit, offset]
1025-
);
1026-
const [total, results] = await Promise.all([totalQuery, resultQuery]);
1027-
const parsed = results.rows.map(r => this.parseBlockQueryResult(r));
1028-
return { results: parsed, total: total.rows[0].count } as const;
1010+
const client = await this.pool.connect();
1011+
try {
1012+
await client.query('BEGIN');
1013+
const total = await client.query<{ count: number }>(`
1014+
SELECT COUNT(*)::integer
1015+
FROM blocks
1016+
WHERE canonical = true
1017+
`);
1018+
const results = await client.query<BlockQueryResult>(
1019+
`
1020+
SELECT ${BLOCK_COLUMNS}
1021+
FROM blocks
1022+
WHERE canonical = true
1023+
ORDER BY block_height DESC
1024+
LIMIT $1
1025+
OFFSET $2
1026+
`,
1027+
[limit, offset]
1028+
);
1029+
await client.query('COMMIT');
1030+
const parsed = results.rows.map(r => this.parseBlockQueryResult(r));
1031+
return { results: parsed, total: total.rows[0].count } as const;
1032+
} catch (e) {
1033+
await client.query('ROLLBACK');
1034+
throw e;
1035+
} finally {
1036+
client.release();
1037+
}
10291038
}
10301039

10311040
async getBlockTxs(indexBlockHash: string) {
@@ -1160,6 +1169,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
11601169
};
11611170
return parsed;
11621171
});
1172+
await client.query('COMMIT');
11631173
return results;
11641174
} catch (e) {
11651175
await client.query('ROLLBACK');
@@ -1185,6 +1195,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
11851195
`,
11861196
[burnchainRecipient]
11871197
);
1198+
await client.query('COMMIT');
11881199
const resultAmount = BigInt(queryResults.rows[0]?.amount ?? 0);
11891200
return { reward_recipient: burnchainRecipient, reward_amount: resultAmount };
11901201
} catch (e) {
@@ -1238,45 +1249,62 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
12381249
return result.rowCount;
12391250
}
12401251

1241-
async updateMempoolTx({ mempoolTx: tx }: { mempoolTx: DbMempoolTx }): Promise<void> {
1242-
const result = await this.pool.query(
1243-
`
1244-
INSERT INTO mempool_txs(
1245-
${MEMPOOL_TX_COLUMNS}
1246-
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
1247-
ON CONFLICT ON CONSTRAINT unique_tx_id
1248-
DO NOTHING
1249-
`,
1250-
[
1251-
tx.pruned,
1252-
hexToBuffer(tx.tx_id),
1253-
tx.raw_tx,
1254-
tx.type_id,
1255-
tx.status,
1256-
tx.receipt_time,
1257-
tx.post_conditions,
1258-
tx.fee_rate,
1259-
tx.sponsored,
1260-
tx.sponsor_address,
1261-
tx.sender_address,
1262-
tx.origin_hash_mode,
1263-
tx.token_transfer_recipient_address,
1264-
tx.token_transfer_amount,
1265-
tx.token_transfer_memo,
1266-
tx.smart_contract_contract_id,
1267-
tx.smart_contract_source_code,
1268-
tx.contract_call_contract_id,
1269-
tx.contract_call_function_name,
1270-
tx.contract_call_function_args,
1271-
tx.poison_microblock_header_1,
1272-
tx.poison_microblock_header_2,
1273-
tx.coinbase_payload,
1274-
]
1275-
);
1276-
if (result.rowCount !== 1) {
1277-
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
1278-
logger.error(errMsg);
1279-
} else {
1252+
async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise<void> {
1253+
const client = await this.pool.connect();
1254+
const updatedTxs: DbMempoolTx[] = [];
1255+
try {
1256+
await client.query('BEGIN');
1257+
for (const tx of txs) {
1258+
const result = await client.query(
1259+
`
1260+
INSERT INTO mempool_txs(
1261+
${MEMPOOL_TX_COLUMNS}
1262+
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
1263+
ON CONFLICT ON CONSTRAINT unique_tx_id
1264+
DO NOTHING
1265+
`,
1266+
[
1267+
tx.pruned,
1268+
hexToBuffer(tx.tx_id),
1269+
tx.raw_tx,
1270+
tx.type_id,
1271+
tx.status,
1272+
tx.receipt_time,
1273+
tx.post_conditions,
1274+
tx.fee_rate,
1275+
tx.sponsored,
1276+
tx.sponsor_address,
1277+
tx.sender_address,
1278+
tx.origin_hash_mode,
1279+
tx.token_transfer_recipient_address,
1280+
tx.token_transfer_amount,
1281+
tx.token_transfer_memo,
1282+
tx.smart_contract_contract_id,
1283+
tx.smart_contract_source_code,
1284+
tx.contract_call_contract_id,
1285+
tx.contract_call_function_name,
1286+
tx.contract_call_function_args,
1287+
tx.poison_microblock_header_1,
1288+
tx.poison_microblock_header_2,
1289+
tx.coinbase_payload,
1290+
]
1291+
);
1292+
if (result.rowCount !== 1) {
1293+
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
1294+
logger.error(errMsg);
1295+
} else {
1296+
updatedTxs.push(tx);
1297+
}
1298+
}
1299+
await client.query('COMMIT');
1300+
} catch (e) {
1301+
await client.query('ROLLBACK');
1302+
throw e;
1303+
} finally {
1304+
client.release();
1305+
}
1306+
1307+
for (const tx of updatedTxs) {
12801308
this.emit('txUpdate', tx);
12811309
}
12821310
}
@@ -1402,28 +1430,36 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
14021430
limit: number;
14031431
offset: number;
14041432
}): Promise<{ results: DbMempoolTx[]; total: number }> {
1405-
// TODO: wrap the following queries in a transaction
1406-
const totalQuery = await this.pool.query<{ count: number }>(
1407-
`
1408-
SELECT COUNT(*)::integer
1409-
FROM mempool_txs
1410-
WHERE pruned = false
1411-
`
1412-
);
1413-
const resultQuery = await this.pool.query<MempoolTxQueryResult>(
1414-
`
1415-
SELECT ${MEMPOOL_TX_COLUMNS}
1416-
FROM mempool_txs
1417-
WHERE pruned = false
1418-
ORDER BY receipt_time DESC
1419-
LIMIT $1
1420-
OFFSET $2
1421-
`,
1422-
[limit, offset]
1423-
);
1424-
1425-
const parsed = resultQuery.rows.map(r => this.parseMempoolTxQueryResult(r));
1426-
return { results: parsed, total: totalQuery.rows[0].count };
1433+
const client = await this.pool.connect();
1434+
try {
1435+
await client.query('BEGIN');
1436+
const totalQuery = await client.query<{ count: number }>(
1437+
`
1438+
SELECT COUNT(*)::integer
1439+
FROM mempool_txs
1440+
WHERE pruned = false
1441+
`
1442+
);
1443+
const resultQuery = await client.query<MempoolTxQueryResult>(
1444+
`
1445+
SELECT ${MEMPOOL_TX_COLUMNS}
1446+
FROM mempool_txs
1447+
WHERE pruned = false
1448+
ORDER BY receipt_time DESC
1449+
LIMIT $1
1450+
OFFSET $2
1451+
`,
1452+
[limit, offset]
1453+
);
1454+
await client.query('COMMIT');
1455+
const parsed = resultQuery.rows.map(r => this.parseMempoolTxQueryResult(r));
1456+
return { results: parsed, total: totalQuery.rows[0].count };
1457+
} catch (e) {
1458+
await client.query('ROLLBACK');
1459+
throw e;
1460+
} finally {
1461+
client.release();
1462+
}
14271463
}
14281464

14291465
async getMempoolTxIdList(): Promise<{ results: DbMempoolTxId[] }> {
@@ -1473,49 +1509,59 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
14731509
}) {
14741510
let totalQuery: QueryResult<{ count: number }>;
14751511
let resultQuery: QueryResult<TxQueryResult>;
1476-
if (txTypeFilter.length === 0) {
1477-
totalQuery = await this.pool.query<{ count: number }>(
1478-
`
1479-
SELECT COUNT(*)::integer
1480-
FROM txs
1481-
WHERE canonical = true
1482-
`
1483-
);
1484-
resultQuery = await this.pool.query<TxQueryResult>(
1485-
`
1486-
SELECT ${TX_COLUMNS}
1487-
FROM txs
1488-
WHERE canonical = true
1489-
ORDER BY block_height DESC, tx_index DESC
1490-
LIMIT $1
1491-
OFFSET $2
1492-
`,
1493-
[limit, offset]
1494-
);
1495-
} else {
1496-
const txTypeIds = txTypeFilter.map<number>(t => getTxTypeId(t));
1497-
totalQuery = await this.pool.query<{ count: number }>(
1498-
`
1499-
SELECT COUNT(*)::integer
1500-
FROM txs
1501-
WHERE canonical = true AND type_id = ANY($1)
1502-
`,
1503-
[txTypeIds]
1504-
);
1505-
resultQuery = await this.pool.query<TxQueryResult>(
1506-
`
1507-
SELECT ${TX_COLUMNS}
1508-
FROM txs
1509-
WHERE canonical = true AND type_id = ANY($1)
1510-
ORDER BY block_height DESC, tx_index DESC
1511-
LIMIT $2
1512-
OFFSET $3
1513-
`,
1514-
[txTypeIds, limit, offset]
1515-
);
1512+
const client = await this.pool.connect();
1513+
try {
1514+
await client.query('BEGIN');
1515+
if (txTypeFilter.length === 0) {
1516+
totalQuery = await client.query<{ count: number }>(
1517+
`
1518+
SELECT COUNT(*)::integer
1519+
FROM txs
1520+
WHERE canonical = true
1521+
`
1522+
);
1523+
resultQuery = await client.query<TxQueryResult>(
1524+
`
1525+
SELECT ${TX_COLUMNS}
1526+
FROM txs
1527+
WHERE canonical = true
1528+
ORDER BY block_height DESC, tx_index DESC
1529+
LIMIT $1
1530+
OFFSET $2
1531+
`,
1532+
[limit, offset]
1533+
);
1534+
} else {
1535+
const txTypeIds = txTypeFilter.map<number>(t => getTxTypeId(t));
1536+
totalQuery = await client.query<{ count: number }>(
1537+
`
1538+
SELECT COUNT(*)::integer
1539+
FROM txs
1540+
WHERE canonical = true AND type_id = ANY($1)
1541+
`,
1542+
[txTypeIds]
1543+
);
1544+
resultQuery = await client.query<TxQueryResult>(
1545+
`
1546+
SELECT ${TX_COLUMNS}
1547+
FROM txs
1548+
WHERE canonical = true AND type_id = ANY($1)
1549+
ORDER BY block_height DESC, tx_index DESC
1550+
LIMIT $2
1551+
OFFSET $3
1552+
`,
1553+
[txTypeIds, limit, offset]
1554+
);
1555+
}
1556+
await client.query('COMMIT');
1557+
const parsed = resultQuery.rows.map(r => this.parseTxQueryResult(r));
1558+
return { results: parsed, total: totalQuery.rows[0].count };
1559+
} catch (e) {
1560+
await client.query('ROLLBACK');
1561+
throw e;
1562+
} finally {
1563+
client.release();
15161564
}
1517-
const parsed = resultQuery.rows.map(r => this.parseTxQueryResult(r));
1518-
return { results: parsed, total: totalQuery.rows[0].count };
15191565
}
15201566

15211567
async getTxEvents(txId: string, indexBlockHash: string) {
@@ -1703,6 +1749,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
17031749
events[rowIndex++] = event;
17041750
}
17051751
events.sort((a, b) => a.event_index - b.event_index);
1752+
await client.query('COMMIT');
17061753
return { results: events };
17071754
} catch (e) {
17081755
await client.query('ROLLBACK');

src/event-stream/event-server.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ async function handleMempoolTxsMessage(rawTxs: string[], db: DataStore): Promise
8181
rawTx: buffer,
8282
};
8383
});
84-
for (const tx of decodedTxs) {
84+
const dbMempoolTxs = decodedTxs.map(tx => {
8585
logger.verbose(`Received mempool tx: ${tx.txId}`);
8686
const dbMempoolTx = createDbMempoolTxFromCoreMsg({
8787
txId: tx.txId,
@@ -91,8 +91,9 @@ async function handleMempoolTxsMessage(rawTxs: string[], db: DataStore): Promise
9191
rawTx: tx.rawTx,
9292
receiptDate: receiptDate,
9393
});
94-
await db.updateMempoolTx({ mempoolTx: dbMempoolTx });
95-
}
94+
return dbMempoolTx;
95+
});
96+
await db.updateMempoolTxs({ mempoolTxs: dbMempoolTxs });
9697
}
9798

9899
async function handleClientMessage(msg: CoreNodeBlockMessage, db: DataStore): Promise<void> {

0 commit comments

Comments
 (0)