Commit

Override chunk size for testing.
rkistner committed Dec 12, 2024
1 parent 50ec4db commit 3cb3248
Showing 3 changed files with 68 additions and 46 deletions.
16 changes: 14 additions & 2 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -30,6 +30,15 @@ export interface WalStreamOptions {
connections: PgManager;
storage: storage.SyncRulesBucketStorage;
abort_signal: AbortSignal;

/**
* Override snapshot chunk size, for testing.
*
* Defaults to 10_000.
*
* Note that queries are streamed, so we don't actually keep that much data in memory.
*/
snapshotChunkSize?: number;
}

interface InitResult {
@@ -63,12 +72,15 @@ export class WalStream {

private startedStreaming = false;

private snapshotChunkSize: number;

constructor(options: WalStreamOptions) {
this.storage = options.storage;
this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA });
this.group_id = options.storage.group_id;
this.slot_name = options.storage.slot_name;
this.connections = options.connections;
this.snapshotChunkSize = options.snapshotChunkSize ?? 10_000;

this.abort_signal = options.abort_signal;
this.abort_signal.addEventListener(
@@ -441,11 +453,11 @@ WHERE oid = $1::regclass`,
// Single primary key - we can use the primary key for chunking
const orderByKey = table.replicaIdColumns[0];
logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`);
q = new ChunkedSnapshotQuery(db, table, 1000);
q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkSize);
} else {
// Fallback case - query the entire table
logger.info(`Snapshot ${table.qualifiedName} without chunking`);
q = new SimpleSnapshotQuery(db, table, 10_000);
q = new SimpleSnapshotQuery(db, table, this.snapshotChunkSize);
}
await q.initialize();

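For context, a minimal sketch (not part of this commit) of how a caller could pass the new option when constructing the stream. The import path and the pgManager, bucketStorage and controller values are assumptions for illustration, not code from this repository:

import { WalStream } from './replication/WalStream'; // path assumed

const stream = new WalStream({
  connections: pgManager,            // an existing PgManager
  storage: bucketStorage,            // a storage.SyncRulesBucketStorage
  abort_signal: controller.signal,   // AbortSignal from an AbortController
  // Optional override; leaving it out keeps the default of 10_000 rows per snapshot chunk.
  snapshotChunkSize: 100
});
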
88 changes: 48 additions & 40 deletions modules/module-postgres/test/src/large_batch.test.ts
@@ -369,66 +369,74 @@ function defineBatchTests(factory: StorageFactory) {
expect(data.length).toEqual(11002 + deletedRowOps.length);
}

test.only('chunked snapshot edge case', async () => {
test('chunked snapshot edge case', async () => {
// 1. Start with 10k rows, one row with id = 10000, and a large TOAST value in another column.
// 2. Replicate one batch of rows (id < 10000).
// 3. `UPDATE table SET id = 0 WHERE id = 10000`
// 4. Replicate the rest of the table.
// 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
// 6. We end up with a row that has a missing TOAST column.

try {
await using context = await WalStreamTestContext.open(factory);
await using context = await WalStreamTestContext.open(factory, {
// We need to use a smaller chunk size here, so that we can run a query in between chunks
walStreamOptions: { snapshotChunkSize: 100 }
});

await context.updateSyncRules(`bucket_definitions:
await context.updateSyncRules(`bucket_definitions:
global:
data:
- SELECT * FROM test_data`);
const { pool } = context;
const { pool } = context;

await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`);
await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`);

await pool.query({
statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i`
});
// 1. Start with 10k rows, one row with id = 10000...
await pool.query({
statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i`
});

// Toast value, must be > 8kb after compression
const largeDescription = crypto.randomBytes(20_000).toString('hex');
await pool.query({
statement: 'UPDATE test_data SET description = $1 WHERE id = 10000',
params: [{ type: 'varchar', value: largeDescription }]
});
// ...and a large TOAST value in another column.
// Toast value, must be > 8kb after compression
const largeDescription = crypto.randomBytes(20_000).toString('hex');
await pool.query({
statement: 'UPDATE test_data SET description = $1 WHERE id = 10000',
params: [{ type: 'varchar', value: largeDescription }]
});

const p = context.replicateSnapshot();
// 2. Replicate one batch of rows (id < 10000).
// Our "stopping point" here is not quite deterministic.
const p = context.replicateSnapshot();

const stopAfter = 1_000;
const startRowCount =
(await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
const stopAfter = 1_000;
const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;

while (true) {
const count =
((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) -
startRowCount;
while (true) {
const count =
((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount;

if (count >= stopAfter) {
break;
}
await timers.setTimeout(1);
if (count >= stopAfter) {
break;
}
await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000');
await p;

context.startStreaming();
const data = await context.getBucketData('global[]', undefined, {});
const reduced = reduceBucket(data);
expect(reduced.length).toEqual(10_001);

const movedRow = reduced.find((row) => row.object_id === '0');
expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`);
} catch (e) {
console.error(e);
throw e;
await timers.setTimeout(1);
}

// 3. `UPDATE table SET id = 0 WHERE id = 10000`
await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000');

// 4. Replicate the rest of the table.
await p;

// 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
context.startStreaming();

// 6. If all went well, the "resnapshot" process would take care of this.
const data = await context.getBucketData('global[]', undefined, {});
const reduced = reduceBucket(data);

const movedRow = reduced.find((row) => row.object_id === '0');
expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`);

expect(reduced.length).toEqual(10_001);
});

function printMemoryUsage() {
10 changes: 6 additions & 4 deletions modules/module-postgres/test/src/wal_stream_utils.ts
@@ -21,7 +21,7 @@ export class WalStreamTestContext implements AsyncDisposable {
*/
static async open(
factory: (options: StorageOptions) => Promise<BucketStorageFactory>,
options?: { doNotClear?: boolean }
options?: { doNotClear?: boolean; walStreamOptions?: Partial<WalStreamOptions> }
) {
const f = await factory({ doNotClear: options?.doNotClear });
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {});
@@ -30,12 +30,13 @@
await clearTestDb(connectionManager.pool);
}

return new WalStreamTestContext(f, connectionManager);
return new WalStreamTestContext(f, connectionManager, options?.walStreamOptions);
}

constructor(
public factory: BucketStorageFactory,
public connectionManager: PgManager
public connectionManager: PgManager,
private walStreamOptions?: Partial<WalStreamOptions>
) {}

async [Symbol.asyncDispose]() {
@@ -97,7 +98,8 @@ export class WalStreamTestContext implements AsyncDisposable {
const options: WalStreamOptions = {
storage: this.storage,
connections: this.connectionManager,
abort_signal: this.abortController.signal
abort_signal: this.abortController.signal,
...this.walStreamOptions
};
this._walStream = new WalStream(options);
return this._walStream!;
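Because ...this.walStreamOptions is spread after the values the context fills in itself, any field a test supplies takes precedence when the context constructs its WalStream. A short usage sketch mirroring the test above; factory is whatever storage factory the surrounding suite already provides:

// Inside an async test body; `factory` comes from the surrounding test suite.
await using context = await WalStreamTestContext.open(factory, {
  // Overrides the snapshot chunk size (default 10_000) for this context only.
  walStreamOptions: { snapshotChunkSize: 100 }
});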
