diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index d549451e..8b2c6793 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -30,6 +30,15 @@ export interface WalStreamOptions {
   connections: PgManager;
   storage: storage.SyncRulesBucketStorage;
   abort_signal: AbortSignal;
+
+  /**
+   * Override snapshot chunk size, for testing.
+   *
+   * Defaults to 10_000.
+   *
+   * Note that queries are streamed, so we don't actually keep that much data in memory.
+   */
+  snapshotChunkSize?: number;
 }
 
 interface InitResult {
@@ -63,12 +72,15 @@ export class WalStream {
 
   private startedStreaming = false;
 
+  private snapshotChunkSize: number;
+
   constructor(options: WalStreamOptions) {
     this.storage = options.storage;
     this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA });
     this.group_id = options.storage.group_id;
     this.slot_name = options.storage.slot_name;
     this.connections = options.connections;
+    this.snapshotChunkSize = options.snapshotChunkSize ?? 10_000;
 
     this.abort_signal = options.abort_signal;
     this.abort_signal.addEventListener(
@@ -441,11 +453,11 @@ WHERE oid = $1::regclass`,
       // Single primary key - we can use the primary key for chunking
       const orderByKey = table.replicaIdColumns[0];
       logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`);
-      q = new ChunkedSnapshotQuery(db, table, 1000);
+      q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkSize);
     } else {
       // Fallback case - query the entire table
       logger.info(`Snapshot ${table.qualifiedName} without chunking`);
-      q = new SimpleSnapshotQuery(db, table, 10_000);
+      q = new SimpleSnapshotQuery(db, table, this.snapshotChunkSize);
     }
     await q.initialize();
 
diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts
index 576327a3..98f11d4f 100644
--- a/modules/module-postgres/test/src/large_batch.test.ts
+++ b/modules/module-postgres/test/src/large_batch.test.ts
@@ -369,7 +369,7 @@ function defineBatchTests(factory: StorageFactory) {
     expect(data.length).toEqual(11002 + deletedRowOps.length);
   }
 
-  test.only('chunked snapshot edge case', async () => {
+  test('chunked snapshot edge case', async () => {
     // 1. Start with 10k rows, one row with id = 10000, and a large TOAST value in another column.
     // 2. Replicate one batch of rows (id < 10000).
     // 3. `UPDATE table SET id = 0 WHERE id = 10000`
@@ -377,58 +377,66 @@ function defineBatchTests(factory: StorageFactory) {
     // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
     // 6. We end up with a row that has a missing TOAST column.
 
-    try {
-      await using context = await WalStreamTestContext.open(factory);
+    await using context = await WalStreamTestContext.open(factory, {
+      // We need to use a smaller chunk size here, so that we can run a query in between chunks
+      walStreamOptions: { snapshotChunkSize: 100 }
+    });
 
-      await context.updateSyncRules(`bucket_definitions:
+    await context.updateSyncRules(`bucket_definitions:
   global:
     data:
       - SELECT * FROM test_data`);
-      const { pool } = context;
+    const { pool } = context;
 
-      await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`);
+    await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`);
 
-      await pool.query({
-        statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i`
-      });
+    // 1. Start with 10k rows, one row with id = 10000...
+    await pool.query({
+      statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i`
+    });
 
-      // Toast value, must be > 8kb after compression
-      const largeDescription = crypto.randomBytes(20_000).toString('hex');
-      await pool.query({
-        statement: 'UPDATE test_data SET description = $1 WHERE id = 10000',
-        params: [{ type: 'varchar', value: largeDescription }]
-      });
+    // ...and a large TOAST value in another column.
+    // Toast value, must be > 8kb after compression
+    const largeDescription = crypto.randomBytes(20_000).toString('hex');
+    await pool.query({
+      statement: 'UPDATE test_data SET description = $1 WHERE id = 10000',
+      params: [{ type: 'varchar', value: largeDescription }]
+    });
 
-      const p = context.replicateSnapshot();
+    // 2. Replicate one batch of rows (id < 10000).
+    // Our "stopping point" here is not quite deterministic.
+    const p = context.replicateSnapshot();
 
-      const stopAfter = 1_000;
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const stopAfter = 1_000;
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
 
-      while (true) {
-        const count =
-          ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) -
-          startRowCount;
+    while (true) {
+      const count =
+        ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount;
 
-        if (count >= stopAfter) {
-          break;
-        }
-        await timers.setTimeout(1);
+      if (count >= stopAfter) {
+        break;
       }
-      await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000');
-      await p;
-
-      context.startStreaming();
-      const data = await context.getBucketData('global[]', undefined, {});
-      const reduced = reduceBucket(data);
-      expect(reduced.length).toEqual(10_001);
-
-      const movedRow = reduced.find((row) => row.object_id === '0');
-      expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`);
-    } catch (e) {
-      console.error(e);
-      throw e;
+      await timers.setTimeout(1);
     }
+
+    // 3. `UPDATE table SET id = 0 WHERE id = 10000`
+    await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000');
+
+    // 4. Replicate the rest of the table.
+    await p;
+
+    // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
+    context.startStreaming();
+
+    // 6. If all went well, the "resnapshot" process would take care of this.
+    const data = await context.getBucketData('global[]', undefined, {});
+    const reduced = reduceBucket(data);
+
+    const movedRow = reduced.find((row) => row.object_id === '0');
+    expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`);
+
+    expect(reduced.length).toEqual(10_001);
   });
 
   function printMemoryUsage() {
diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts
index 3cde14cf..53d5c6b4 100644
--- a/modules/module-postgres/test/src/wal_stream_utils.ts
+++ b/modules/module-postgres/test/src/wal_stream_utils.ts
@@ -21,7 +21,7 @@ export class WalStreamTestContext implements AsyncDisposable {
    */
   static async open(
     factory: (options: StorageOptions) => Promise<BucketStorageFactory>,
-    options?: { doNotClear?: boolean }
+    options?: { doNotClear?: boolean; walStreamOptions?: Partial<WalStreamOptions> }
   ) {
     const f = await factory({ doNotClear: options?.doNotClear });
     const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {});
@@ -30,12 +30,13 @@ export class WalStreamTestContext implements AsyncDisposable {
       await clearTestDb(connectionManager.pool);
     }
 
-    return new WalStreamTestContext(f, connectionManager);
+    return new WalStreamTestContext(f, connectionManager, options?.walStreamOptions);
   }
 
   constructor(
     public factory: BucketStorageFactory,
-    public connectionManager: PgManager
+    public connectionManager: PgManager,
+    private walStreamOptions?: Partial<WalStreamOptions>
  ) {}
 
   async [Symbol.asyncDispose]() {
@@ -97,7 +98,8 @@ export class WalStreamTestContext implements AsyncDisposable {
     const options: WalStreamOptions = {
       storage: this.storage,
       connections: this.connectionManager,
-      abort_signal: this.abortController.signal
+      abort_signal: this.abortController.signal,
+      ...this.walStreamOptions
     };
     this._walStream = new WalStream(options);
     return this._walStream!;
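Below is a minimal, self-contained sketch (not part of the patch) of the pass-through pattern the diff introduces: the chunk size comes from the stream options with a 10_000 default and is handed to whichever snapshot query is chosen. StreamSketch, StreamOptions, FakeChunkedQuery and FakeSimpleQuery are hypothetical stand-ins for WalStream, WalStreamOptions, ChunkedSnapshotQuery and SimpleSnapshotQuery; only the 10_000 default and the 100 test override come from the diff above.

interface SnapshotQuery {
  initialize(): Promise<void>;
}

// Hypothetical stand-ins for ChunkedSnapshotQuery / SimpleSnapshotQuery; the real
// classes also take a database connection and a table handle.
class FakeChunkedQuery implements SnapshotQuery {
  constructor(readonly chunkSize: number) {}
  async initialize(): Promise<void> {}
}

class FakeSimpleQuery implements SnapshotQuery {
  constructor(readonly chunkSize: number) {}
  async initialize(): Promise<void> {}
}

interface StreamOptions {
  /** Override snapshot chunk size, for testing. Defaults to 10_000. */
  snapshotChunkSize?: number;
}

class StreamSketch {
  private snapshotChunkSize: number;

  constructor(options: StreamOptions) {
    // Same default the diff applies in the WalStream constructor.
    this.snapshotChunkSize = options.snapshotChunkSize ?? 10_000;
  }

  // Both the chunked path (single primary key) and the fallback path now share
  // the same configurable chunk size instead of the hard-coded 1000 / 10_000.
  selectQuery(hasSinglePrimaryKey: boolean): SnapshotQuery {
    return hasSinglePrimaryKey
      ? new FakeChunkedQuery(this.snapshotChunkSize)
      : new FakeSimpleQuery(this.snapshotChunkSize);
  }
}

// A test can shrink the chunk size (the edge-case test uses 100) so it can run
// queries in between snapshot chunks.
const sketch = new StreamSketch({ snapshotChunkSize: 100 });
console.log(sketch.selectQuery(true));

On the test-context side, spreading ...this.walStreamOptions into the WalStreamOptions object keeps the override optional: tests that pass nothing get the production default, while the edge-case test forces many small chunks.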