Skip to content

Commit a1f5b12

Browse files
rafaelcr and zone117x authored
feat: export events tsv directly to postgres instance (#2048) (#2058)
* fix: export events tsv directly to postgres instance
* fix: remove unused function
* chore: option to export events to either local file or client
* chore: try new docker path
* chore: divide remote and local paths
* fix: try relative path for mkdir
* ci: try chmod
* ci: run mkdir first
* ci: try with sudo
* fix: file paths

---------

Co-authored-by: Matthew Little <[email protected]>
1 parent f6e50f6 commit a1f5b12

8 files changed

+89 -64 lines changed

.github/workflows/ci.yml

+3
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ jobs:
201201
- name: Setup integration environment
202202
run: |
203203
sudo ufw disable
204+
mkdir -p src/tests-event-replay/.tmp/local/
205+
sudo chown 999:999 src/tests-event-replay/.tmp/local/
206+
sudo chmod -R 777 src/tests-event-replay/.tmp/local/
204207
docker compose -f docker/docker-compose.dev.postgres.yml up -d
205208
npm run devenv:logs -- --no-color &> docker-compose-logs.txt &
206209

docker/docker-compose.dev.bitcoind.yml

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
version: "3"
21
services:
32
bitcoind:
43
image: "blockstack/bitcoind:v0.20.99.0"
docker/docker-compose.dev.postgres.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
version: '3.7'
21
services:
32
postgres:
4-
image: "postgres:14"
3+
image: "postgres:15"
54
ports:
65
- "5490:5432"
76
environment:
87
POSTGRES_USER: postgres
98
POSTGRES_PASSWORD: postgres
109
POSTGRES_DB: stacks_blockchain_api
1110
POSTGRES_PORT: 5432
11+
volumes:
12+
- ../src/tests-event-replay/.tmp/local/:/root/

docker/docker-compose.dev.stacks-blockchain.yml

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
version: '3.7'
21
services:
32
stacks-blockchain:
43
image: 'hirosystems/stacks-api-e2e:stacks3.0-0a2c0e2'

src/event-replay/connection-legacy.ts

-22
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,3 @@ function getPgClientConfig<TGetPoolConfig extends boolean = false>({
150150
return clientConfig;
151151
}
152152
}
153-
154-
/**
155-
* Creates a postgres pool client connection. If the connection fails due to a transient error, it is retried until successful.
156-
* You'd expect that the pg lib to handle this, but it doesn't, see https://github.com/brianc/node-postgres/issues/1789
157-
*/
158-
export async function connectWithRetry(pool: Pool): Promise<PoolClient> {
159-
for (let retryAttempts = 1; ; retryAttempts++) {
160-
try {
161-
const client = await pool.connect();
162-
return client;
163-
} catch (error: any) {
164-
// Check for transient errors, and retry after 1 second
165-
const pgConnectionError = isPgConnectionError(error);
166-
if (pgConnectionError) {
167-
logger.warn(`${pgConnectionError}, will retry, attempt #${retryAttempts}`);
168-
await timeout(1000);
169-
} else {
170-
throw error;
171-
}
172-
}
173-
}
174-
}

src/event-replay/event-replay.ts

+16-8
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,24 @@ export async function exportEventsAsTsv(
3838
if (!filePath) {
3939
throw new Error(`A file path should be specified with the --file option`);
4040
}
41-
const resolvedFilePath = path.resolve(filePath);
42-
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
43-
throw new Error(
44-
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
45-
);
41+
const isLocal = filePath.startsWith('local:');
42+
if (isLocal) {
43+
filePath = filePath.replace(/^local:/, '');
44+
if (!path.isAbsolute(filePath)) {
45+
throw new Error(`The file path must be absolute`);
46+
}
47+
} else {
48+
const resolvedFilePath = path.resolve(filePath);
49+
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
50+
throw new Error(
51+
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
52+
);
53+
}
4654
}
47-
console.log(`Export event data to file: ${resolvedFilePath}`);
48-
const writeStream = fs.createWriteStream(resolvedFilePath);
55+
56+
console.log(`Exporting event data to ${filePath}`);
4957
console.log(`Export started...`);
50-
await exportRawEventRequests(writeStream);
58+
await exportRawEventRequests(filePath, isLocal);
5159
console.log('Export successful.');
5260
}
5361

src/event-replay/event-requests.ts

+30-22
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
1-
import { pipelineAsync } from '../helpers';
2-
import { Readable, Writable } from 'stream';
1+
import { pipeline } from 'node:stream/promises';
2+
import { Readable } from 'stream';
33
import { DbRawEventRequest } from '../datastore/common';
4-
import { PgServer } from '../datastore/connection';
5-
import { connectPgPool, connectWithRetry } from './connection-legacy';
4+
import { getConnectionArgs, getConnectionConfig, PgServer } from '../datastore/connection';
5+
import { connectPgPool } from './connection-legacy';
66
import * as pgCopyStreams from 'pg-copy-streams';
77
import * as PgCursor from 'pg-cursor';
8+
import { connectPostgres } from '@hirosystems/api-toolkit';
9+
import { createWriteStream } from 'node:fs';
810

9-
export async function exportRawEventRequests(targetStream: Writable): Promise<void> {
10-
const pool = await connectPgPool({
11-
usageName: 'export-raw-events',
12-
pgServer: PgServer.primary,
11+
export async function exportRawEventRequests(filePath: string, local: boolean): Promise<void> {
12+
const sql = await connectPostgres({
13+
usageName: `export-events`,
14+
connectionArgs: getConnectionArgs(PgServer.primary),
15+
connectionConfig: getConnectionConfig(PgServer.primary),
1316
});
14-
const client = await connectWithRetry(pool);
15-
try {
16-
const copyQuery = pgCopyStreams.to(
17-
`
18-
COPY (SELECT id, receive_timestamp, event_path, payload FROM event_observer_requests ORDER BY id ASC)
19-
TO STDOUT ENCODING 'UTF8'
20-
`
21-
);
22-
const queryStream = client.query(copyQuery);
23-
await pipelineAsync(queryStream, targetStream);
24-
} finally {
25-
client.release();
26-
await pool.end();
17+
const copyQuery = sql`
18+
COPY (
19+
SELECT id, receive_timestamp, event_path, payload
20+
FROM event_observer_requests
21+
ORDER BY id ASC
22+
)`;
23+
if (local) {
24+
await sql`${copyQuery}
25+
TO '${sql.unsafe(filePath)}'
26+
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
27+
`;
28+
} else {
29+
const readableStream = await sql`${copyQuery}
30+
TO STDOUT
31+
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
32+
`.readable();
33+
await pipeline(readableStream, createWriteStream(filePath));
2734
}
35+
await sql.end();
2836
}
2937

3038
export async function* getRawEventRequests(
@@ -61,7 +69,7 @@ export async function* getRawEventRequests(
6169
`);
6270
onStatusUpdate?.('Importing raw event requests into temporary table...');
6371
const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
64-
await pipelineAsync(readStream, importStream);
72+
await pipeline(readStream, importStream);
6573
onStatusUpdate?.('Removing any duplicate raw event requests...');
6674
await client.query(`
6775
INSERT INTO temp_event_observer_requests

src/tests-event-replay/import-export-tests.ts

+37-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ChainID } from '@stacks/transactions';
22
import * as fs from 'fs';
3+
import * as path from 'path';
34
import { getRawEventRequests } from '../event-replay/event-requests';
45
import { PgWriteStore } from '../datastore/pg-write-store';
56
import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay';
@@ -25,7 +26,7 @@ describe('import/export tests', () => {
2526
await db?.close();
2627
});
2728

28-
test('event import and export cycle', async () => {
29+
test('event import and export cycle - remote', async () => {
2930
// Import from mocknet TSV
3031
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
3132
const chainTip = await db.getChainTip(db.sql);
@@ -38,14 +39,42 @@ describe('import/export tests', () => {
3839
);
3940

4041
// Export into temp TSV
41-
const tmpDir = 'src/tests-event-replay/.tmp';
42+
const tmpDir = 'src/tests-event-replay/.tmp/remote';
43+
fs.mkdirSync(tmpDir, { recursive: true });
44+
await exportEventsAsTsv(`${tmpDir}/export.tsv`);
45+
46+
// Re-import with exported TSV and check that chain tip matches.
4247
try {
43-
fs.mkdirSync(tmpDir);
44-
} catch (error: any) {
45-
if (error.code != 'EEXIST') throw error;
48+
await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true);
49+
const newChainTip = await db.getChainTip(db.sql);
50+
expect(newChainTip.block_height).toBe(28);
51+
expect(newChainTip.index_block_hash).toBe(
52+
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
53+
);
54+
expect(newChainTip.block_hash).toBe(
55+
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
56+
);
57+
} finally {
58+
fs.rmSync(`${tmpDir}/export.tsv`);
4659
}
47-
const tmpTsvPath = `${tmpDir}/export.tsv`;
48-
await exportEventsAsTsv(tmpTsvPath, true);
60+
});
61+
62+
test('event import and export cycle - local', async () => {
63+
// Import from mocknet TSV
64+
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
65+
const chainTip = await db.getChainTip(db.sql);
66+
expect(chainTip.block_height).toBe(28);
67+
expect(chainTip.index_block_hash).toBe(
68+
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
69+
);
70+
expect(chainTip.block_hash).toBe(
71+
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
72+
);
73+
74+
// Export into temp TSV
75+
const tmpDir = 'src/tests-event-replay/.tmp/local';
76+
fs.mkdirSync(tmpDir, { recursive: true });
77+
await exportEventsAsTsv('local:/root/export.tsv');
4978

5079
// Re-import with exported TSV and check that chain tip matches.
5180
try {
@@ -59,7 +88,7 @@ describe('import/export tests', () => {
5988
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
6089
);
6190
} finally {
62-
fs.rmSync(tmpDir, { force: true, recursive: true });
91+
fs.rmSync(`${tmpDir}/export.tsv`);
6392
}
6493
});
6594

0 commit comments

Comments
 (0)