Skip to content

Fix/event replay perf #1572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/docker-compose.dev.postgres.yml
Original file line number Diff line number Diff line change
@@ -9,3 +9,7 @@ services:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: stacks_blockchain_api
POSTGRES_PORT: 5432
command: >
-c work_mem=256MB
-c maintenance_work_mem=256MB
-c max_wal_size=1GB
98 changes: 14 additions & 84 deletions src/datastore/event-requests.ts
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import { connectPostgres, PgServer } from './connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
import * as readline from 'readline';

export async function exportRawEventRequests(targetStream: Writable): Promise<void> {
const pool = await connectPgPool({
@@ -28,94 +29,23 @@ export async function exportRawEventRequests(targetStream: Writable): Promise<vo
}

export async function* getRawEventRequests(
readStream: Readable,
onStatusUpdate?: (msg: string) => void
): AsyncGenerator<DbRawEventRequest[], void, unknown> {
// 1. Pipe input stream into a temp table
// 2. Use `pg-cursor` to async read rows from temp table (order by `id` ASC)
// 3. Drop temp table
// 4. Close db connection
const pool = await connectPgPool({
usageName: 'get-raw-events',
pgServer: PgServer.primary,
readStream: Readable
): AsyncGenerator<DbRawEventRequest, void, unknown> {
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity,
});
try {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(`
CREATE TEMPORARY TABLE temp_event_observer_requests(
id bigint PRIMARY KEY,
receive_timestamp timestamptz NOT NULL,
event_path text NOT NULL,
payload jsonb NOT NULL
) ON COMMIT DROP
`);
// Use a `temp_raw_tsv` table first to store the raw TSV data as it might come with duplicate
// rows which would trigger the `PRIMARY KEY` constraint in `temp_event_observer_requests`.
// We will "upsert" from the former to the latter before event ingestion.
await client.query(`
CREATE TEMPORARY TABLE temp_raw_tsv
(LIKE temp_event_observer_requests)
ON COMMIT DROP
`);
onStatusUpdate?.('Importing raw event requests into temporary table...');
const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
await pipelineAsync(readStream, importStream);
onStatusUpdate?.('Removing any duplicate raw event requests...');
await client.query(`
INSERT INTO temp_event_observer_requests
SELECT *
FROM temp_raw_tsv
ON CONFLICT DO NOTHING;
`);
const totallengthQuery = await client.query<{ count: string }>(
`SELECT COUNT(id) count FROM temp_event_observer_requests`
);
const totallength = parseInt(totallengthQuery.rows[0].count);
let lastStatusUpdatePercent = 0;
onStatusUpdate?.('Streaming raw event requests from temporary table...');
const cursor = new PgCursor<{ id: string; event_path: string; payload: string }>(
`
SELECT id, event_path, payload::text
FROM temp_event_observer_requests
ORDER BY id ASC
`
);
const cursorQuery = client.query(cursor);
const rowBatchSize = 100;
let rowsReadCount = 0;
let rows: DbRawEventRequest[] = [];
do {
rows = await new Promise<DbRawEventRequest[]>((resolve, reject) => {
cursorQuery.read(rowBatchSize, (error, rows) => {
if (error) {
reject(error);
} else {
rowsReadCount += rows.length;
if ((rowsReadCount / totallength) * 100 > lastStatusUpdatePercent + 1) {
lastStatusUpdatePercent = Math.floor((rowsReadCount / totallength) * 100);
onStatusUpdate?.(
`Raw event requests processed: ${lastStatusUpdatePercent}% (${rowsReadCount} / ${totallength})`
);
}
resolve(rows);
}
});
});
if (rows.length > 0) {
yield rows;
}
} while (rows.length > 0);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
for await (const line of rl) {
const columns = line.split('\t');
const rawRequest: DbRawEventRequest = {
event_path: columns[2],
payload: columns[3],
};
yield rawRequest;
}
} finally {
await pool.end();
rl.close();
}
}

48 changes: 27 additions & 21 deletions src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
@@ -134,38 +134,44 @@ export async function importEventsFromTsv(

// Import TSV chain data
const readStream = fs.createReadStream(resolvedFilePath);
const rawEventsIterator = getRawEventRequests(readStream, status => {
console.log(status);
});
const rawEventsIterator = getRawEventRequests(readStream);
// Set logger to only output for warnings/errors, otherwise the event replay will result
// in the equivalent of months/years of API log output.
logger.level = 'warn';
// The current import block height. Will be updated with every `/new_block` event.
let blockHeight = 0;
let lastStatusUpdatePercent = 0;
const responses = [];
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
if (eventImportMode === EventImportMode.pruned) {
if (blockHeight === prunedBlockHeight) {
console.log(`Resuming prunable event import...`);
}
for await (const rawEvent of rawEventsIterator) {
if (eventImportMode === EventImportMode.pruned) {
if (blockHeight === prunedBlockHeight) {
console.log(`Resuming prunable event import...`);
}
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (rawEvent.event_path === '/new_block') {
blockHeight = await getDbBlockHeight(db);
if (blockHeight && blockHeight % 1000 === 0) {
}
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (rawEvent.event_path === '/new_block') {
blockHeight = await getDbBlockHeight(db);
if (blockHeight) {
if (blockHeight % 1000 === 0) {
console.log(`Event file block height reached: ${blockHeight}`);
}
const percentProgress = (blockHeight / tsvBlockHeight) * 100;
if (percentProgress > lastStatusUpdatePercent + 1) {
lastStatusUpdatePercent = Math.floor(percentProgress);
console.log(
`Blocks processed: ${lastStatusUpdatePercent}% (${blockHeight} / ${tsvBlockHeight})`
);
}
}
responses.push(response);
}
responses.push(response);
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
68 changes: 32 additions & 36 deletions src/tests-event-replay/import-export-tests.ts
Original file line number Diff line number Diff line change
@@ -170,21 +170,19 @@ describe('IBD', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
}
for await (const rawEvent of rawEventsIterator) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
}
}
}
@@ -214,27 +212,25 @@ describe('IBD', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
const chainTip = await db.getChainTip(client, false);
const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string);
if (chainTip.blockHeight < ibdThreshold) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
} else {
expect(response.statusCode).toBe(200);
expect(response.response).not.toBe('IBD mode active.');
}
for await (const rawEvent of rawEventsIterator) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
const chainTip = await db.getChainTip(client, false);
const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string);
if (chainTip.blockHeight < ibdThreshold) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
} else {
expect(response.statusCode).toBe(200);
expect(response.response).not.toBe('IBD mode active.');
}
}
}
44 changes: 21 additions & 23 deletions src/tests-event-replay/raw-event-request-tests.ts
Original file line number Diff line number Diff line change
@@ -55,31 +55,29 @@ describe('Events table', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
try {
if (rawEvent.event_path === '/new_block') {
const payloadJson = JSON.parse(rawEvent.payload);
payloadJson.transactions = undefined;
rawEvent.payload = JSON.stringify(payloadJson);
}
} catch (error) {}
const rawEventRequestCountResultBefore = await getRawEventCount();
const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0];
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
for await (const rawEvent of rawEventsIterator) {
try {
if (rawEvent.event_path === '/new_block') {
expect(response.statusCode).toBe(500);
const rawEventRequestCountResultAfter = await getRawEventCount();
const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0];
expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter);
const payloadJson = JSON.parse(rawEvent.payload);
payloadJson.transactions = undefined;
rawEvent.payload = JSON.stringify(payloadJson);
}
} catch (error) {}
const rawEventRequestCountResultBefore = await getRawEventCount();
const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0];
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
if (rawEvent.event_path === '/new_block') {
expect(response.statusCode).toBe(500);
const rawEventRequestCountResultAfter = await getRawEventCount();
const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0];
expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter);
}
}
}
60 changes: 27 additions & 33 deletions src/tests/microblock-tests.ts
Original file line number Diff line number Diff line change
@@ -91,17 +91,15 @@ describe('microblock tests', () => {
return [apiServer, apiServer.terminate] as const;
},
async (_, rawEventsIterator, eventServer, api) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
for await (const rawEvent of rawEventsIterator) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
// test that the out-of-order microblocks were not stored
const mbHash1 = '0xb714e75a7dae26fee0e77788317a0c84e513d1d8647a376b21b1c864e55c135a';
@@ -153,17 +151,15 @@ describe('microblock tests', () => {
return [apiServer, apiServer.terminate] as const;
},
async (_, rawEventsIterator, eventServer, api) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
for await (const rawEvent of rawEventsIterator) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
const txResult2 = await supertest(api.server).get(`/extended/v1/tx/${lostTx}`);
const { body: txBody }: { body: Transaction } = txResult2;
@@ -219,17 +215,15 @@ describe('microblock tests', () => {
return [apiServer, apiServer.terminate] as const;
},
async (_, rawEventsIterator, eventServer, api) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
for await (const rawEvent of rawEventsIterator) {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
}
const txResult2 = await supertest(api.server).get(`/extended/v1/tx/${lostTx}`);
const { body: txBody }: { body: Transaction } = txResult2;