From a8afd4bcfb7443eb0c1d819c393f049c66c2bcb8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 11 Dec 2024 17:52:01 +0200 Subject: [PATCH 1/7] Reapply "Use DECLARE CURSOR / FETCH for table snapshots." This reverts commit 6f8d03cdb09b36bf862a8d2265f44cc66a610cb7. --- .../src/replication/WalStream.ts | 111 +++++++++++------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 34c168f9..c4e030d0 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -358,10 +358,18 @@ WHERE oid = $1::regclass`, logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`); continue; } - await this.snapshotTable(batch, db, table); + let tableLsnNotBefore: string; + await db.query('BEGIN'); + try { + await this.snapshotTable(batch, db, table); + + const rs = await db.query(`select pg_current_wal_lsn() as lsn`); + tableLsnNotBefore = rs.rows[0][0]; + } finally { + // Read-only transaction, commit does not actually do anything. + await db.query('COMMIT'); + } - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - const tableLsnNotBefore = rs.rows[0][0]; await batch.markSnapshotDone([table], tableLsnNotBefore); await touch(); } @@ -387,51 +395,70 @@ WHERE oid = $1::regclass`, const estimatedCount = await this.estimatedCount(db, table); let at = 0; let lastLogIndex = 0; - const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` }); - let columns: { i: number; name: string }[] = []; - // pgwire streams rows in chunks. - // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. - - for await (let chunk of cursor) { - if (chunk.tag == 'RowDescription') { - let i = 0; - columns = chunk.payload.map((c) => { - return { i: i++, name: c.name }; - }); - continue; - } - const rows = chunk.rows.map((row) => { - let q: DatabaseInputRow = {}; - for (let c of columns) { - q[c.name] = row[c.i]; - } - return q; + // We do streaming on two levels: + // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time. + // 2. Fine level: Stream chunks from each fetch call. + await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`); + + let columns: { i: number; name: string }[] = []; + let hasRemainingData = true; + while (hasRemainingData) { + // Fetch 10k at a time. + // The balance here is between latency overhead per FETCH call, + // and not spending too much time on each FETCH call. + // We aim for a couple of seconds on each FETCH call. + const cursor = db.stream({ + statement: `FETCH 10000 FROM powersync_cursor` }); - if (rows.length > 0 && at - lastLogIndex >= 5000) { - logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); - lastLogIndex = at; - } - if (this.abort_signal.aborted) { - throw new Error(`Aborted initial replication of ${this.slot_name}`); - } + hasRemainingData = false; + // pgwire streams rows in chunks. + // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. + // There are typically 100-200 rows per chunk. + for await (let chunk of cursor) { + if (chunk.tag == 'RowDescription') { + // We get a RowDescription for each FETCH call, but they should + // all be the same. 
+ let i = 0; + columns = chunk.payload.map((c) => { + return { i: i++, name: c.name }; + }); + continue; + } - for (const record of WalStream.getQueryData(rows)) { - // This auto-flushes when the batch reaches its size limit - await batch.save({ - tag: storage.SaveOperationTag.INSERT, - sourceTable: table, - before: undefined, - beforeReplicaId: undefined, - after: record, - afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + const rows = chunk.rows.map((row) => { + let q: DatabaseInputRow = {}; + for (let c of columns) { + q[c.name] = row[c.i]; + } + return q; }); - } + if (rows.length > 0 && at - lastLogIndex >= 5000) { + logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); + lastLogIndex = at; + hasRemainingData = true; + } + if (this.abort_signal.aborted) { + throw new Error(`Aborted initial replication of ${this.slot_name}`); + } + + for (const record of WalStream.getQueryData(rows)) { + // This auto-flushes when the batch reaches its size limit + await batch.save({ + tag: storage.SaveOperationTag.INSERT, + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: record, + afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + }); + } - at += rows.length; - Metrics.getInstance().rows_replicated_total.add(rows.length); + at += rows.length; + Metrics.getInstance().rows_replicated_total.add(rows.length); - await touch(); + await touch(); + } } await batch.flush(); From df8c8e04c8768d46d59b6053da074969c16877d8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 11 Dec 2024 18:36:02 +0200 Subject: [PATCH 2/7] Implement chunking by primary key. --- .../src/replication/SnapshotQuery.ts | 80 +++++++++++++++++++ .../src/replication/WalStream.ts | 32 ++++++-- .../src/storage/mongo/MongoBucketBatch.ts | 2 +- 3 files changed, 107 insertions(+), 7 deletions(-) create mode 100644 modules/module-postgres/src/replication/SnapshotQuery.ts diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts new file mode 100644 index 00000000..b625294c --- /dev/null +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -0,0 +1,80 @@ +import { ColumnDescriptor, SourceTable } from '@powersync/service-core'; +import { PgChunk, PgConnection, StatementParam } from '@powersync/service-jpgwire'; +import { escapeIdentifier } from '../utils/pgwire_utils.js'; +import { logger } from '@powersync/lib-services-framework'; +import { SqliteValue } from '@powersync/service-sync-rules'; + +export interface SnapshotQuery { + initialize(): Promise; + nextChunk(): AsyncIterableIterator; +} + +export class SimpleSnapshotQuery { + public constructor( + private readonly connection: PgConnection, + private readonly table: SourceTable, + private readonly chunkSize: number = 10_000 + ) {} + + public async initialize(): Promise { + await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.escapedIdentifier}`); + } + + public nextChunk(): AsyncIterableIterator { + return this.connection.stream(`FETCH ${this.chunkSize} FROM snapshot_cursor`); + } +} + +export class ChunkedSnapshotQuery { + private lastKey: SqliteValue | null = null; + + public constructor( + private readonly connection: PgConnection, + private readonly table: SourceTable, + private readonly key: ColumnDescriptor, + private readonly chunkSize: number = 10_000 + ) {} + + public async initialize(): Promise { + // No-op + } + + public async *nextChunk(): 
AsyncIterableIterator { + let stream: AsyncIterableIterator; + if (this.lastKey == null) { + stream = this.connection.stream( + `SELECT * FROM ${this.table.escapedIdentifier} ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}` + ); + } else { + if (this.key.typeId == null) { + throw new Error(`typeId required for primary key ${this.key.name}`); + } + let type: StatementParam['type'] = Number(this.key.typeId); + stream = this.connection.stream({ + statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(this.key.name)} > $1 ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}`, + params: [{ value: this.lastKey, type }] + }); + } + let primaryKeyIndex: number = -1; + + for await (let chunk of stream) { + if (chunk.tag == 'RowDescription') { + // We get a RowDescription for each FETCH call, but they should + // all be the same. + let i = 0; + const pk = chunk.payload.findIndex((c) => c.name == this.key.name); + if (pk < 0) { + throw new Error( + `Cannot find primary key column ${this.key} in results. Keys: ${chunk.payload.map((c) => c.name).join(', ')}` + ); + } + primaryKeyIndex = pk; + } + + if (chunk.rows.length > 0) { + this.lastKey = chunk.rows[chunk.rows.length - 1][primaryKeyIndex]; + yield chunk; + } + } + } +} diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c4e030d0..c369ab4c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,11 +1,19 @@ import { container, errors, logger } from '@powersync/lib-services-framework'; -import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; +import { + ColumnDescriptor, + escapeIdentifier, + getUuidReplicaIdentityBson, + Metrics, + SourceEntityDescriptor, + storage +} from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; import * as pg_utils from '../utils/pgwire_utils.js'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId } from './PgRelation.js'; import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js'; +import { ChunkedSnapshotQuery, SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js'; export const ZERO_LSN = '00000000/00000000'; export const PUBLICATION_NAME = 'powersync'; @@ -396,10 +404,22 @@ WHERE oid = $1::regclass`, let at = 0; let lastLogIndex = 0; + let orderByKey: ColumnDescriptor | null = null; + + let q: SnapshotQuery; // We do streaming on two levels: // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time. // 2. Fine level: Stream chunks from each fetch call. 
- await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`); + if (table.replicaIdColumns.length == 1) { + // Single primary key - we can use the primary key for chunking + orderByKey = table.replicaIdColumns[0]; + logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`); + q = new ChunkedSnapshotQuery(db, table, orderByKey, 10_000); + } else { + // Fallback case - query the entire table + q = new SimpleSnapshotQuery(db, table, 10_000); + } + await q.initialize(); let columns: { i: number; name: string }[] = []; let hasRemainingData = true; @@ -408,9 +428,7 @@ WHERE oid = $1::regclass`, // The balance here is between latency overhead per FETCH call, // and not spending too much time on each FETCH call. // We aim for a couple of seconds on each FETCH call. - const cursor = db.stream({ - statement: `FETCH 10000 FROM powersync_cursor` - }); + const cursor = q.nextChunk(); hasRemainingData = false; // pgwire streams rows in chunks. // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. @@ -433,10 +451,12 @@ WHERE oid = $1::regclass`, } return q; }); + if (rows.length > 0) { + hasRemainingData = true; + } if (rows.length > 0 && at - lastLogIndex >= 5000) { logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); lastLogIndex = at; - hasRemainingData = true; } if (this.abort_signal.aborted) { throw new Error(`Aborted initial replication of ${this.slot_name}`); diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index c882b510..6e84d9ca 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -744,7 +744,7 @@ export class MongoBucketBatch extends DisposableObserver Date: Wed, 11 Dec 2024 18:52:05 +0200 Subject: [PATCH 3/7] yield all chunks. --- modules/module-postgres/src/replication/SnapshotQuery.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index b625294c..e2763ec2 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -73,8 +73,8 @@ export class ChunkedSnapshotQuery { if (chunk.rows.length > 0) { this.lastKey = chunk.rows[chunk.rows.length - 1][primaryKeyIndex]; - yield chunk; } + yield chunk; } } } From a749549856981871e83a492a91ecd570d144829d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Dec 2024 10:17:30 +0200 Subject: [PATCH 4/7] Refactor; limit supported primary key chunking types. 
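
Chunking a table snapshot relies on keyset pagination: each chunk is fetched
with a `WHERE key > $last ORDER BY key LIMIT n` query, and the largest key seen
so far becomes the lower bound for the next chunk. That is only well-defined
when the key is a single column whose type has a straightforward total order
and round-trips cleanly through the driver, which is why chunked snapshots are
now restricted to text, varchar, uuid and the integer types. The sketch below
illustrates the pattern in isolation; the `query` helper and the table/column
names are placeholders, not the actual ChunkedSnapshotQuery API.

    // Illustrative only: keyset pagination over a single-column primary key.
    // `query` stands in for any parameterized-query helper; it is not the
    // pgwire API used in this repo, and `test_data`/`id` are placeholder names.
    type Row = { id: string | bigint; [column: string]: unknown };

    async function* chunkedScan(
      query: (sql: string, params: unknown[]) => Promise<Row[]>,
      chunkSize: number
    ): AsyncGenerator<Row[]> {
      let lastKey: string | bigint | null = null;
      while (true) {
        const rows =
          lastKey == null
            ? await query('SELECT * FROM test_data ORDER BY id LIMIT $1', [chunkSize])
            : await query('SELECT * FROM test_data WHERE id > $1 ORDER BY id LIMIT $2', [lastKey, chunkSize]);
        if (rows.length == 0) {
          return;
        }
        // The largest key in this chunk becomes the exclusive lower bound for the next one.
        lastKey = rows[rows.length - 1].id;
        yield rows;
      }
    }

Unlike DECLARE CURSOR / FETCH, each chunk is an independent statement, so there
is no long-lived cursor to keep open; the trade-off is that rows modified while
the snapshot runs can be missed by the chunked query and only arrive later via
logical replication, possibly without their TOAST columns.
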
--- .../src/replication/SnapshotQuery.ts | 34 +++++++++-- .../src/replication/WalStream.ts | 17 ++---- packages/jpgwire/src/pgwire_types.ts | 58 ++++++++++++------- 3 files changed, 72 insertions(+), 37 deletions(-) diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index e2763ec2..2473625e 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -1,5 +1,5 @@ import { ColumnDescriptor, SourceTable } from '@powersync/service-core'; -import { PgChunk, PgConnection, StatementParam } from '@powersync/service-jpgwire'; +import { PgChunk, PgConnection, PgTypeOid, StatementParam } from '@powersync/service-jpgwire'; import { escapeIdentifier } from '../utils/pgwire_utils.js'; import { logger } from '@powersync/lib-services-framework'; import { SqliteValue } from '@powersync/service-sync-rules'; @@ -26,14 +26,40 @@ export class SimpleSnapshotQuery { } export class ChunkedSnapshotQuery { - private lastKey: SqliteValue | null = null; + /** + * Primary key types that we support for chunked snapshots. + * + * Can expand this over time as we add more tests, + * and ensure there are no issues with type conversion. + */ + static SUPPORTED_TYPES = [ + PgTypeOid.TEXT, + PgTypeOid.VARCHAR, + PgTypeOid.UUID, + PgTypeOid.INT2, + PgTypeOid.INT4, + PgTypeOid.INT8 + ]; + + static supports(table: SourceTable) { + if (table.replicaIdColumns.length != 1) { + return false; + } + const primaryKey = table.replicaIdColumns[0]; + + return primaryKey.typeId != null && ChunkedSnapshotQuery.SUPPORTED_TYPES.includes(Number(primaryKey.typeId)); + } + + private readonly key: ColumnDescriptor; + private lastKey: string | bigint | null = null; public constructor( private readonly connection: PgConnection, private readonly table: SourceTable, - private readonly key: ColumnDescriptor, private readonly chunkSize: number = 10_000 - ) {} + ) { + this.key = table.replicaIdColumns[0]; + } public async initialize(): Promise { // No-op diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c369ab4c..0332fc89 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,12 +1,5 @@ import { container, errors, logger } from '@powersync/lib-services-framework'; -import { - ColumnDescriptor, - escapeIdentifier, - getUuidReplicaIdentityBson, - Metrics, - SourceEntityDescriptor, - storage -} from '@powersync/service-core'; +import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; import * as pg_utils from '../utils/pgwire_utils.js'; @@ -404,17 +397,15 @@ WHERE oid = $1::regclass`, let at = 0; let lastLogIndex = 0; - let orderByKey: ColumnDescriptor | null = null; - let q: SnapshotQuery; // We do streaming on two levels: // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time. // 2. Fine level: Stream chunks from each fetch call. 
- if (table.replicaIdColumns.length == 1) { + if (ChunkedSnapshotQuery.supports(table)) { // Single primary key - we can use the primary key for chunking - orderByKey = table.replicaIdColumns[0]; + const orderByKey = table.replicaIdColumns[0]; logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`); - q = new ChunkedSnapshotQuery(db, table, orderByKey, 10_000); + q = new ChunkedSnapshotQuery(db, table, 10_000); } else { // Fallback case - query the entire table q = new SimpleSnapshotQuery(db, table, 10_000); diff --git a/packages/jpgwire/src/pgwire_types.ts b/packages/jpgwire/src/pgwire_types.ts index a93aeba6..a887b5d9 100644 --- a/packages/jpgwire/src/pgwire_types.ts +++ b/packages/jpgwire/src/pgwire_types.ts @@ -5,38 +5,58 @@ import { dateToSqlite, lsnMakeComparable, timestampToSqlite, timestamptzToSqlite import { JsonContainer } from '@powersync/service-jsonbig'; import { DatabaseInputRow } from '@powersync/service-sync-rules'; +export enum PgTypeOid { + TEXT = 25, + UUID = 2950, + VARCHAR = 1043, + BOOL = 16, + BYTEA = 17, + INT2 = 21, + INT4 = 23, + OID = 26, + INT8 = 20, + FLOAT4 = 700, + FLOAT8 = 701, + DATE = 1082, + TIMESTAMP = 1114, + TIMESTAMPTZ = 1184, + JSON = 114, + JSONB = 3802, + PG_LSN = 3220 +} + export class PgType { static decode(text: string, typeOid: number) { switch ( typeOid // add line here when register new type ) { - case 25 /* text */: - case 2950 /* uuid */: - case 1043 /* varchar */: + case PgTypeOid.TEXT: + case PgTypeOid.UUID: + case PgTypeOid.VARCHAR: return text; - case 16 /* bool */: + case PgTypeOid.BOOL: return text == 't'; - case 17 /* bytea */: + case PgTypeOid.BYTEA: return this._decodeBytea(text); - case 21 /* int2 */: - case 23 /* int4 */: - case 26 /* oid */: - case 20 /* int8 */: + case PgTypeOid.INT2: + case PgTypeOid.INT4: + case PgTypeOid.OID: + case PgTypeOid.INT8: return BigInt(text); - case 700 /* float4 */: - case 701 /* float8 */: + case PgTypeOid.FLOAT4: + case PgTypeOid.FLOAT8: return Number(text); - case 1082 /* date */: + case PgTypeOid.DATE: return dateToSqlite(text); - case 1114 /* timestamp */: + case PgTypeOid.TIMESTAMP: return timestampToSqlite(text); - case 1184 /* timestamptz */: + case PgTypeOid.TIMESTAMPTZ: return timestamptzToSqlite(text); - case 114 /* json */: - case 3802 /* jsonb */: + case PgTypeOid.JSON: + case PgTypeOid.JSONB: // Don't parse the contents return new JsonContainer(text); - case 3220 /* pg_lsn */: + case PgTypeOid.PG_LSN: return lsnMakeComparable(text); } const elemTypeid = this._elemTypeOid(typeOid); @@ -47,9 +67,7 @@ export class PgType { } static _elemTypeOid(arrayTypeOid: number) { // select 'case ' || typarray || ': return ' || oid || '; // ' || typname from pg_catalog.pg_type WHERE typearray != 0; - switch ( - arrayTypeOid // add line here when register new type - ) { + switch (arrayTypeOid) { case 1000: return 16; // bool case 1001: From 50ec4db5c2fd6525f4cda9749cbe055e5205e16b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Dec 2024 13:49:10 +0200 Subject: [PATCH 5/7] Implement resnapshot to handle consistency edge cases with initial replication. 
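
Context for this change: with chunked snapshots, a row modified while the
snapshot is running (in the new test, its primary key is moved below the range
already scanned) can be missed by the chunk queries; logical replication then
delivers the UPDATE, but without the unchanged TOAST columns. To recover, such
rows are recorded by primary key through a `markRecordUnavailable` callback
passed to the storage batch, and once the transaction has been flushed they are
re-read with a keyed snapshot query (IdSnapshotQuery) and written back. The
sketch below shows that bookkeeping in a simplified, self-contained form; the
`query` helper and the single `id` key column are placeholders, and the real
code derives the key columns, identifier escaping and array parameter types
from the table's replica identity.

    // Simplified sketch of the resnapshot bookkeeping, not the WalStream code.
    // `query` is a placeholder parameterized-query helper.
    type PrimaryKeyValue = Record<string, string | bigint>;

    const pendingResnapshot = new Map<string, PrimaryKeyValue[]>(); // table name -> keys

    // Called for incoming rows whose old TOAST values are unavailable.
    function markRecordUnavailable(table: string, key: PrimaryKeyValue) {
      if (!pendingResnapshot.has(table)) {
        pendingResnapshot.set(table, []);
      }
      pendingResnapshot.get(table)!.push(key);
    }

    // After flush(): re-read just the recorded rows, one ANY($1) query per table.
    async function resnapshot(
      query: (sql: string, params: unknown[]) => Promise<unknown[]>
    ): Promise<Map<string, unknown[]>> {
      const refreshed = new Map<string, unknown[]>();
      for (const [table, keys] of pendingResnapshot) {
        // Identifier escaping and the array parameter type lookup are omitted for brevity.
        const ids = keys.map((k) => k.id);
        refreshed.set(table, await query(`SELECT * FROM ${table} WHERE id = ANY($1)`, [ids]));
      }
      pendingResnapshot.clear();
      // The real code re-saves these rows through the storage batch, inside a
      // transaction whose pg_current_wal_lsn() marks when they become consistent.
      return refreshed;
    }

Because the pending list is only kept in memory, it has to be drained after
flush() but before commit()/ack() on the same transaction; that is why the
commit handler in this patch flushes, resnapshots, and only then commits.
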
--- .../src/replication/SnapshotQuery.ts | 75 +++++- .../src/replication/WalStream.ts | 166 ++++++++++--- .../test/src/large_batch.test.ts | 65 ++++- packages/jpgwire/src/pgwire_types.ts | 230 +++++++----------- packages/service-core/src/auth/KeySpec.ts | 2 +- .../src/routes/endpoints/probes.ts | 12 +- .../service-core/src/storage/BucketStorage.ts | 4 + .../src/storage/mongo/MongoBucketBatch.ts | 18 +- .../storage/mongo/MongoSyncBucketStorage.ts | 3 +- .../test/src/routes/probes.test.ts | 7 +- 10 files changed, 387 insertions(+), 195 deletions(-) diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index 2473625e..51b9737f 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -1,7 +1,6 @@ import { ColumnDescriptor, SourceTable } from '@powersync/service-core'; -import { PgChunk, PgConnection, PgTypeOid, StatementParam } from '@powersync/service-jpgwire'; +import { PgChunk, PgConnection, PgType, PgTypeOid, StatementParam } from '@powersync/service-jpgwire'; import { escapeIdentifier } from '../utils/pgwire_utils.js'; -import { logger } from '@powersync/lib-services-framework'; import { SqliteValue } from '@powersync/service-sync-rules'; export interface SnapshotQuery { @@ -9,6 +8,20 @@ export interface SnapshotQuery { nextChunk(): AsyncIterableIterator; } +export type PrimaryKeyValue = Record; + +export interface MissingRow { + table: SourceTable; + key: PrimaryKeyValue; +} + +/** + * Snapshot query using a plain SELECT * FROM table; chunked using + * DELCLARE CURSOR / FETCH. + * + * This supports all tables, but does not efficiently resume the snapshot + * if the process is restarted. + */ export class SimpleSnapshotQuery { public constructor( private readonly connection: PgConnection, @@ -25,6 +38,16 @@ export class SimpleSnapshotQuery { } } +/** + * Performs a table snapshot query, chunking by ranges of primary key data. + * + * This may miss some rows if they are modified during the snapshot query. + * In that case, logical replication will pick up those rows afterwards, + * possibly resulting in an IdSnapshotQuery. + * + * Currently, this only supports a table with a single primary key column, + * of a select few types. + */ export class ChunkedSnapshotQuery { /** * Primary key types that we support for chunked snapshots. @@ -104,3 +127,51 @@ export class ChunkedSnapshotQuery { } } } + +/** + * This performs a snapshot query using a list of primary keys. + */ +export class IdSnapshotQuery { + private didChunk = false; + + static supports(table: SourceTable) { + // We have the same requirements as ChunkedSnapshotQuery. + // This is typically only used as a fallback when ChunkedSnapshotQuery + // skipped some rows. 
+ return ChunkedSnapshotQuery.supports(table); + } + + public constructor( + private readonly connection: PgConnection, + private readonly table: SourceTable, + private readonly keys: PrimaryKeyValue[] + ) {} + + public async initialize(): Promise { + // No-op + } + + public async *nextChunk(): AsyncIterableIterator { + // Only produce one chunk + if (this.didChunk) { + return; + } + this.didChunk = true; + + const keyDefinition = this.table.replicaIdColumns[0]; + const ids = this.keys.map((record) => record[keyDefinition.name]); + const type = PgType.getArrayType(keyDefinition.typeId!); + if (type == null) { + throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`); + } + yield* this.connection.stream({ + statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`, + params: [ + { + type: type, + value: ids + } + ] + }); + } +} diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 0332fc89..d549451e 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,12 +1,26 @@ import { container, errors, logger } from '@powersync/lib-services-framework'; -import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core'; +import { + BucketStorageBatch, + getUuidReplicaIdentityBson, + Metrics, + SaveUpdate, + SourceEntityDescriptor, + storage +} from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; import * as pg_utils from '../utils/pgwire_utils.js'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId } from './PgRelation.js'; import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js'; -import { ChunkedSnapshotQuery, SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js'; +import { + ChunkedSnapshotQuery, + IdSnapshotQuery, + MissingRow, + PrimaryKeyValue, + SimpleSnapshotQuery, + SnapshotQuery +} from './SnapshotQuery.js'; export const ZERO_LSN = '00000000/00000000'; export const PUBLICATION_NAME = 'powersync'; @@ -359,19 +373,8 @@ WHERE oid = $1::regclass`, logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`); continue; } - let tableLsnNotBefore: string; - await db.query('BEGIN'); - try { - await this.snapshotTable(batch, db, table); - - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - tableLsnNotBefore = rs.rows[0][0]; - } finally { - // Read-only transaction, commit does not actually do anything. - await db.query('COMMIT'); - } - await batch.markSnapshotDone([table], tableLsnNotBefore); + await this.snapshotTableInTx(batch, db, table); await touch(); } } @@ -391,7 +394,38 @@ WHERE oid = $1::regclass`, } } - private async snapshotTable(batch: storage.BucketStorageBatch, db: pgwire.PgConnection, table: storage.SourceTable) { + private async snapshotTableInTx( + batch: storage.BucketStorageBatch, + db: pgwire.PgConnection, + table: storage.SourceTable, + limited?: PrimaryKeyValue[] + ): Promise { + await db.query('BEGIN'); + try { + let tableLsnNotBefore: string; + await this.snapshotTable(batch, db, table, limited); + + // Get the current LSN. + // The data will only be consistent once incremental replication + // has passed that point. 
+ // We have to get this LSN _after_ we have started the snapshot query. + const rs = await db.query(`select pg_current_wal_lsn() as lsn`); + tableLsnNotBefore = rs.rows[0][0]; + await db.query('COMMIT'); + const [resultTable] = await batch.markSnapshotDone([table], tableLsnNotBefore); + return resultTable; + } catch (e) { + await db.query('ROLLBACK'); + throw e; + } + } + + private async snapshotTable( + batch: storage.BucketStorageBatch, + db: pgwire.PgConnection, + table: storage.SourceTable, + limited?: PrimaryKeyValue[] + ) { logger.info(`${this.slot_name} Replicating ${table.qualifiedName}`); const estimatedCount = await this.estimatedCount(db, table); let at = 0; @@ -401,13 +435,16 @@ WHERE oid = $1::regclass`, // We do streaming on two levels: // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time. // 2. Fine level: Stream chunks from each fetch call. - if (ChunkedSnapshotQuery.supports(table)) { + if (limited) { + q = new IdSnapshotQuery(db, table, limited); + } else if (ChunkedSnapshotQuery.supports(table)) { // Single primary key - we can use the primary key for chunking const orderByKey = table.replicaIdColumns[0]; logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`); - q = new ChunkedSnapshotQuery(db, table, 10_000); + q = new ChunkedSnapshotQuery(db, table, 1000); } else { // Fallback case - query the entire table + logger.info(`Snapshot ${table.qualifiedName} without chunking`); q = new SimpleSnapshotQuery(db, table, 10_000); } await q.initialize(); @@ -501,37 +538,52 @@ WHERE oid = $1::regclass`, // Truncate this table, in case a previous snapshot was interrupted. await batch.truncate([result.table]); - let lsn: string = ZERO_LSN; // Start the snapshot inside a transaction. // We use a dedicated connection for this. const db = await this.connections.snapshotConnection(); try { - await db.query('BEGIN'); - try { - await this.snapshotTable(batch, db, result.table); - - // Get the current LSN. - // The data will only be consistent once incremental replication - // has passed that point. - // We have to get this LSN _after_ we have started the snapshot query. - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - lsn = rs.rows[0][0]; - - await db.query('COMMIT'); - } catch (e) { - await db.query('ROLLBACK'); - throw e; - } + const table = await this.snapshotTableInTx(batch, db, result.table); + return table; } finally { await db.end(); } - const [table] = await batch.markSnapshotDone([result.table], lsn); - return table; } return result.table; } + /** + * Process rows that have missing TOAST values. + * + * This can happen during edge cases in the chunked intial snapshot process. + * + * We handle this similar to an inline table snapshot, but limited to the specific + * set of rows. 
+ */ + private async resnapshot(batch: BucketStorageBatch, rows: MissingRow[]) { + const byTable = new Map(); + for (let row of rows) { + if (!byTable.has(row.table.objectId)) { + byTable.set(row.table.objectId, []); + } + byTable.get(row.table.objectId)!.push(row); + } + const db = await this.connections.snapshotConnection(); + try { + for (let rows of byTable.values()) { + const table = rows[0].table; + await this.snapshotTableInTx( + batch, + db, + table, + rows.map((r) => r.key) + ); + } + } finally { + await db.end(); + } + } + private getTable(relationId: number): storage.SourceTable { const table = this.relation_cache.get(relationId); if (table == null) { @@ -640,8 +692,38 @@ WHERE oid = $1::regclass`, // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); + let resnapshot: { table: storage.SourceTable; key: PrimaryKeyValue }[] = []; + + const markRecordUnavailable = (record: SaveUpdate) => { + if (!IdSnapshotQuery.supports(record.sourceTable)) { + // If it's not supported, it's also safe to ignore + return; + } + let key: PrimaryKeyValue = {}; + for (let column of record.sourceTable.replicaIdColumns) { + const name = column.name; + const value = record.after[name]; + if (value == null) { + // We don't expect this to actually happen. + // The key should always be present in the "after" record. + return; + } + key[name] = value; + } + resnapshot.push({ + table: record.sourceTable, + key: key + }); + }; + await this.storage.startBatch( - { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: false }, + { + zeroLSN: ZERO_LSN, + defaultSchema: POSTGRES_DEFAULT_SCHEMA, + storeCurrentData: true, + skipExistingRows: false, + markRecordUnavailable + }, async (batch) => { // Replication never starts in the middle of a transaction let inTx = false; @@ -665,6 +747,16 @@ WHERE oid = $1::regclass`, } else if (msg.tag == 'commit') { Metrics.getInstance().transactions_replicated_total.add(1); inTx = false; + // flush() must be before the resnapshot check - that is + // typically what reports the resnapshot records. + await batch.flush(); + // This _must_ be checked after the flush(), and before + // commit() or ack(). We never persist the resnapshot list, + // so we have to process it before marking our progress. + if (resnapshot.length > 0) { + await this.resnapshot(batch, resnapshot); + resnapshot = []; + } await batch.commit(msg.lsn!); await this.ack(msg.lsn!, replicationStream); } else { diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index bc08083a..576327a3 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -5,7 +5,8 @@ import { env } from './env.js'; import { TEST_CONNECTION_OPTIONS } from './util.js'; import { WalStreamTestContext } from './wal_stream_utils.js'; import * as timers from 'timers/promises'; -import { Metrics } from '@powersync/service-core'; +import { Metrics, reduceBucket } from '@powersync/service-core'; +import * as crypto from 'node:crypto'; describe('batch replication tests - mongodb', { timeout: 120_000 }, function () { // These are slow but consistent tests. @@ -368,6 +369,68 @@ function defineBatchTests(factory: StorageFactory) { expect(data.length).toEqual(11002 + deletedRowOps.length); } + test.only('chunked snapshot edge case', async () => { + // 1. 
Start with 10k rows, one row with id = 10000, and a large TOAST value in another column. + // 2. Replicate one batch of rows (id < 10000). + // 3. `UPDATE table SET id = 0 WHERE id = 10000` + // 4. Replicate the rest of the table. + // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. + // 6. We end up with a row that has a missing TOAST column. + + try { + await using context = await WalStreamTestContext.open(factory); + + await context.updateSyncRules(`bucket_definitions: + global: + data: + - SELECT * FROM test_data`); + const { pool } = context; + + await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`); + + await pool.query({ + statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i` + }); + + // Toast value, must be > 8kb after compression + const largeDescription = crypto.randomBytes(20_000).toString('hex'); + await pool.query({ + statement: 'UPDATE test_data SET description = $1 WHERE id = 10000', + params: [{ type: 'varchar', value: largeDescription }] + }); + + const p = context.replicateSnapshot(); + + const stopAfter = 1_000; + const startRowCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + + while (true) { + const count = + ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - + startRowCount; + + if (count >= stopAfter) { + break; + } + await timers.setTimeout(1); + } + await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000'); + await p; + + context.startStreaming(); + const data = await context.getBucketData('global[]', undefined, {}); + const reduced = reduceBucket(data); + expect(reduced.length).toEqual(10_001); + + const movedRow = reduced.find((row) => row.object_id === '0'); + expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`); + } catch (e) { + console.error(e); + throw e; + } + }); + function printMemoryUsage() { const memoryUsage = process.memoryUsage(); diff --git a/packages/jpgwire/src/pgwire_types.ts b/packages/jpgwire/src/pgwire_types.ts index a887b5d9..ddcfc5f7 100644 --- a/packages/jpgwire/src/pgwire_types.ts +++ b/packages/jpgwire/src/pgwire_types.ts @@ -25,11 +25,91 @@ export enum PgTypeOid { PG_LSN = 3220 } +// Generate using: +// select '[' || typarray || ', ' || oid || '], // ' || typname from pg_catalog.pg_type WHERE typarray != 0; +const ARRAY_TO_ELEM_OID = new Map([ + [1000, 16], // bool + [1001, 17], // bytea + [1002, 18], // char + [1003, 19], // name + [1016, 20], // int8 + [1005, 21], // int2 + [1006, 22], // int2vector + [1007, 23], // int4 + [1008, 24], // regproc + [1009, 25], // text + [1028, 26], // oid + [1010, 27], // tid + [1011, 28], // xid + [1012, 29], // cid + [1013, 30], // oidvector + [210, 71], // pg_type + [270, 75], // pg_attribute + [272, 81], // pg_proc + [273, 83], // pg_class + [199, 114], // json + [143, 142], // xml + [271, 5069], // xid8 + [1017, 600], // point + [1018, 601], // lseg + [1019, 602], // path + [1020, 603], // box + [1027, 604], // polygon + [629, 628], // line + [1021, 700], // float4 + [1022, 701], // float8 + [0, 705], // unknown + [719, 718], // circle + [791, 790], // money + [1040, 829], // macaddr + [1041, 869], // inet + [651, 650], // cidr + [775, 774], // macaddr8 + [1034, 1033], // aclitem + [1014, 1042], // bpchar + [1015, 1043], // varchar + [1182, 1082], // date + [1183, 1083], // time + [1115, 1114], // timestamp + [1185, 1184], // timestamptz + [1187, 
1186], // interval + [1270, 1266], // timetz + [1561, 1560], // bit + [1563, 1562], // varbit + [1231, 1700], // numeric + [2201, 1790], // refcursor + [2207, 2202], // regprocedure + [2208, 2203], // regoper + [2209, 2204], // regoperator + [2210, 2205], // regclass + [4192, 4191], // regcollation + [2211, 2206], // regtype + [4097, 4096], // regrole + [4090, 4089], // regnamespace + [2951, 2950], // uuid + [3221, 3220], // pg_lsn + [3643, 3614], // tsvector + [3644, 3642], // gtsvector + [3645, 3615], // tsquery + [3735, 3734], // regconfig + [3770, 3769], // regdictionary + [3807, 3802], // jsonb + [4073, 4072] // jsonpath +]); + +const ELEM_OID_TO_ARRAY = new Map(); +ARRAY_TO_ELEM_OID.forEach((value, key) => { + ELEM_OID_TO_ARRAY.set(value, key); +}); + export class PgType { + static getArrayType(typeOid: number): number | undefined { + return ELEM_OID_TO_ARRAY.get(typeOid); + } + static decode(text: string, typeOid: number) { - switch ( - typeOid // add line here when register new type - ) { + switch (typeOid) { + // add line here when register new type case PgTypeOid.TEXT: case PgTypeOid.UUID: case PgTypeOid.VARCHAR: @@ -60,150 +140,16 @@ export class PgType { return lsnMakeComparable(text); } const elemTypeid = this._elemTypeOid(typeOid); - if (elemTypeid) { + if (elemTypeid != null) { return this._decodeArray(text, elemTypeid); } return text; // unknown type } - static _elemTypeOid(arrayTypeOid: number) { - // select 'case ' || typarray || ': return ' || oid || '; // ' || typname from pg_catalog.pg_type WHERE typearray != 0; - switch (arrayTypeOid) { - case 1000: - return 16; // bool - case 1001: - return 17; // bytea - case 1002: - return 18; // char - case 1003: - return 19; // name - case 1016: - return 20; // int8 - case 1005: - return 21; // int2 - case 1006: - return 22; // int2vector - case 1007: - return 23; // int4 - case 1008: - return 24; // regproc - case 1009: - return 25; // text - case 1028: - return 26; // oid - case 1010: - return 27; // tid - case 1011: - return 28; // xid - case 1012: - return 29; // cid - case 1013: - return 30; // oidvector - case 210: - return 71; // pg_type - case 270: - return 75; // pg_attribute - case 272: - return 81; // pg_proc - case 273: - return 83; // pg_class - case 199: - return 114; // json - case 143: - return 142; // xml - case 271: - return 5069; // xid8 - case 1017: - return 600; // point - case 1018: - return 601; // lseg - case 1019: - return 602; // path - case 1020: - return 603; // box - case 1027: - return 604; // polygon - case 629: - return 628; // line - case 1021: - return 700; // float4 - case 1022: - return 701; // float8 - case 0: - return 705; // unknown - case 719: - return 718; // circle - case 791: - return 790; // money - case 1040: - return 829; // macaddr - case 1041: - return 869; // inet - case 651: - return 650; // cidr - case 775: - return 774; // macaddr8 - case 1034: - return 1033; // aclitem - case 1014: - return 1042; // bpchar - case 1015: - return 1043; // varchar - case 1182: - return 1082; // date - case 1183: - return 1083; // time - case 1115: - return 1114; // timestamp - case 1185: - return 1184; // timestamptz - case 1187: - return 1186; // interval - case 1270: - return 1266; // timetz - case 1561: - return 1560; // bit - case 1563: - return 1562; // varbit - case 1231: - return 1700; // numeric - case 2201: - return 1790; // refcursor - case 2207: - return 2202; // regprocedure - case 2208: - return 2203; // regoper - case 2209: - return 2204; // regoperator - case 2210: - return 2205; // 
regclass - case 4192: - return 4191; // regcollation - case 2211: - return 2206; // regtype - case 4097: - return 4096; // regrole - case 4090: - return 4089; // regnamespace - case 2951: - return 2950; // uuid - case 3221: - return 3220; // pg_lsn - case 3643: - return 3614; // tsvector - case 3644: - return 3642; // gtsvector - case 3645: - return 3615; // tsquery - case 3735: - return 3734; // regconfig - case 3770: - return 3769; // regdictionary - case 3807: - return 3802; // jsonb - case 4073: - return 4072; // jsonpath - } + static _elemTypeOid(arrayTypeOid: number): number | undefined { + // select 'case ' || typarray || ': return ' || oid || '; // ' || typname from pg_catalog.pg_type WHERE typarray != 0; + return ARRAY_TO_ELEM_OID.get(arrayTypeOid); } + static _decodeArray(text: string, elemTypeOid: number): any { text = text.replace(/^\[.+=/, ''); // skip dimensions let result: any; diff --git a/packages/service-core/src/auth/KeySpec.ts b/packages/service-core/src/auth/KeySpec.ts index 5a82619c..6d917c20 100644 --- a/packages/service-core/src/auth/KeySpec.ts +++ b/packages/service-core/src/auth/KeySpec.ts @@ -49,7 +49,7 @@ export class KeySpec { } else if (this.source.kty === 'OKP') { return OKP_ALGORITHMS.includes(jwtAlg); } - + // 'EC' is unsupported return false; } diff --git a/packages/service-core/src/routes/endpoints/probes.ts b/packages/service-core/src/routes/endpoints/probes.ts index f38bbdab..1d409532 100644 --- a/packages/service-core/src/routes/endpoints/probes.ts +++ b/packages/service-core/src/routes/endpoints/probes.ts @@ -1,5 +1,5 @@ -import { container, router } from "@powersync/lib-services-framework"; -import { routeDefinition } from "../router.js"; +import { container, router } from '@powersync/lib-services-framework'; +import { routeDefinition } from '../router.js'; export enum ProbeRoutes { STARTUP = '/probes/startup', @@ -16,7 +16,7 @@ export const startupCheck = routeDefinition({ return new router.RouterResponse({ status: state.started ? 200 : 400, data: { - ...state, + ...state } }); } @@ -28,13 +28,13 @@ export const livenessCheck = routeDefinition({ handler: async () => { const state = container.probes.state(); - const timeDifference = Date.now() - state.touched_at.getTime() + const timeDifference = Date.now() - state.touched_at.getTime(); const status = timeDifference < 10000 ? 200 : 400; return new router.RouterResponse({ status, data: { - ...state, + ...state } }); } @@ -49,7 +49,7 @@ export const readinessCheck = routeDefinition({ return new router.RouterResponse({ status: state.ready ? 200 : 400, data: { - ...state, + ...state } }); } diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index f9664126..c3339874 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -218,6 +218,8 @@ export interface StartBatchOptions extends ParseSyncRulesOptions { * This will avoid creating new operations for rows previously replicated. 
*/ skipExistingRows?: boolean; + + markRecordUnavailable?: BucketStorageMarkRecordUnavailable; } export interface SyncRulesBucketStorageListener extends DisposableListener { @@ -327,6 +329,8 @@ export interface BucketBatchStorageListener extends DisposableListener { replicationEvent: (payload: ReplicationEventPayload) => void; } +export type BucketStorageMarkRecordUnavailable = (record: SaveUpdate) => void; + export interface BucketStorageBatch extends DisposableObserverClient { /** * Save an op, and potentially flush. diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 6e84d9ca..5424d4ab 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -1,4 +1,4 @@ -import { SqlEventDescriptor, SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules'; +import { SqlEventDescriptor, SqliteRow, SqlSyncRules, ToastableSqliteRow } from '@powersync/service-sync-rules'; import * as bson from 'bson'; import * as mongo from 'mongodb'; @@ -7,6 +7,7 @@ import * as util from '../../util/util-index.js'; import { BucketBatchStorageListener, BucketStorageBatch, + BucketStorageMarkRecordUnavailable, FlushedResult, mergeToast, SaveOperationTag, @@ -48,6 +49,8 @@ export interface MongoBucketBatchOptions { * Set to true for initial replication. */ skipExistingRows: boolean; + + markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined; } export class MongoBucketBatch extends DisposableObserver implements BucketStorageBatch { @@ -64,6 +67,7 @@ export class MongoBucketBatch extends DisposableObserver cb.batchStarted?.(batch)); diff --git a/packages/service-core/test/src/routes/probes.test.ts b/packages/service-core/test/src/routes/probes.test.ts index 102a6063..3433d950 100644 --- a/packages/service-core/test/src/routes/probes.test.ts +++ b/packages/service-core/test/src/routes/probes.test.ts @@ -20,7 +20,12 @@ vi.mock('@powersync/lib-services-framework', () => ({ afterSend: () => Promise; __micro_router_response = true; - constructor({ status, data, headers, afterSend }: { + constructor({ + status, + data, + headers, + afterSend + }: { status?: number; data: any; headers?: Record; From 3cb32488873ef3dbc04208bfa22e96a22da1f336 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Dec 2024 13:57:34 +0200 Subject: [PATCH 6/7] Override chunk size for testing. --- .../src/replication/WalStream.ts | 16 +++- .../test/src/large_batch.test.ts | 88 ++++++++++--------- .../test/src/wal_stream_utils.ts | 10 ++- 3 files changed, 68 insertions(+), 46 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index d549451e..8b2c6793 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -30,6 +30,15 @@ export interface WalStreamOptions { connections: PgManager; storage: storage.SyncRulesBucketStorage; abort_signal: AbortSignal; + + /** + * Override snapshot chunk size, for testing. + * + * Defaults to 10_000. + * + * Note that queries are streamed, so we don't actually keep that much data in memory. 
+ */ + snapshotChunkSize?: number; } interface InitResult { @@ -63,12 +72,15 @@ export class WalStream { private startedStreaming = false; + private snapshotChunkSize: number; + constructor(options: WalStreamOptions) { this.storage = options.storage; this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA }); this.group_id = options.storage.group_id; this.slot_name = options.storage.slot_name; this.connections = options.connections; + this.snapshotChunkSize = options.snapshotChunkSize ?? 10_000; this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -441,11 +453,11 @@ WHERE oid = $1::regclass`, // Single primary key - we can use the primary key for chunking const orderByKey = table.replicaIdColumns[0]; logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`); - q = new ChunkedSnapshotQuery(db, table, 1000); + q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkSize); } else { // Fallback case - query the entire table logger.info(`Snapshot ${table.qualifiedName} without chunking`); - q = new SimpleSnapshotQuery(db, table, 10_000); + q = new SimpleSnapshotQuery(db, table, this.snapshotChunkSize); } await q.initialize(); diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index 576327a3..98f11d4f 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -369,7 +369,7 @@ function defineBatchTests(factory: StorageFactory) { expect(data.length).toEqual(11002 + deletedRowOps.length); } - test.only('chunked snapshot edge case', async () => { + test('chunked snapshot edge case', async () => { // 1. Start with 10k rows, one row with id = 10000, and a large TOAST value in another column. // 2. Replicate one batch of rows (id < 10000). // 3. `UPDATE table SET id = 0 WHERE id = 10000` @@ -377,58 +377,66 @@ function defineBatchTests(factory: StorageFactory) { // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. // 6. We end up with a row that has a missing TOAST column. - try { - await using context = await WalStreamTestContext.open(factory); + await using context = await WalStreamTestContext.open(factory, { + // We need to use a smaller chunk size here, so that we can run a query in between chunks + walStreamOptions: { snapshotChunkSize: 100 } + }); - await context.updateSyncRules(`bucket_definitions: + await context.updateSyncRules(`bucket_definitions: global: data: - SELECT * FROM test_data`); - const { pool } = context; + const { pool } = context; - await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`); + await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`); - await pool.query({ - statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i` - }); + // 1. Start with 10k rows, one row with id = 10000... + await pool.query({ + statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i` + }); - // Toast value, must be > 8kb after compression - const largeDescription = crypto.randomBytes(20_000).toString('hex'); - await pool.query({ - statement: 'UPDATE test_data SET description = $1 WHERE id = 10000', - params: [{ type: 'varchar', value: largeDescription }] - }); + // ...and a large TOAST value in another column. 
+ // Toast value, must be > 8kb after compression + const largeDescription = crypto.randomBytes(20_000).toString('hex'); + await pool.query({ + statement: 'UPDATE test_data SET description = $1 WHERE id = 10000', + params: [{ type: 'varchar', value: largeDescription }] + }); - const p = context.replicateSnapshot(); + // 2. Replicate one batch of rows (id < 10000). + // Our "stopping point" here is not quite deterministic. + const p = context.replicateSnapshot(); - const stopAfter = 1_000; - const startRowCount = - (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const stopAfter = 1_000; + const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; - while (true) { - const count = - ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - - startRowCount; + while (true) { + const count = + ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount; - if (count >= stopAfter) { - break; - } - await timers.setTimeout(1); + if (count >= stopAfter) { + break; } - await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000'); - await p; - - context.startStreaming(); - const data = await context.getBucketData('global[]', undefined, {}); - const reduced = reduceBucket(data); - expect(reduced.length).toEqual(10_001); - - const movedRow = reduced.find((row) => row.object_id === '0'); - expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`); - } catch (e) { - console.error(e); - throw e; + await timers.setTimeout(1); } + + // 3. `UPDATE table SET id = 0 WHERE id = 10000` + await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000'); + + // 4. Replicate the rest of the table. + await p; + + // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. + context.startStreaming(); + + // 6. If all went well, the "resnapshot" process would take care of this. 
+ const data = await context.getBucketData('global[]', undefined, {}); + const reduced = reduceBucket(data); + + const movedRow = reduced.find((row) => row.object_id === '0'); + expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`); + + expect(reduced.length).toEqual(10_001); }); function printMemoryUsage() { diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 3cde14cf..53d5c6b4 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -21,7 +21,7 @@ export class WalStreamTestContext implements AsyncDisposable { */ static async open( factory: (options: StorageOptions) => Promise, - options?: { doNotClear?: boolean } + options?: { doNotClear?: boolean; walStreamOptions?: Partial } ) { const f = await factory({ doNotClear: options?.doNotClear }); const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {}); @@ -30,12 +30,13 @@ export class WalStreamTestContext implements AsyncDisposable { await clearTestDb(connectionManager.pool); } - return new WalStreamTestContext(f, connectionManager); + return new WalStreamTestContext(f, connectionManager, options?.walStreamOptions); } constructor( public factory: BucketStorageFactory, - public connectionManager: PgManager + public connectionManager: PgManager, + private walStreamOptions?: Partial ) {} async [Symbol.asyncDispose]() { @@ -97,7 +98,8 @@ export class WalStreamTestContext implements AsyncDisposable { const options: WalStreamOptions = { storage: this.storage, connections: this.connectionManager, - abort_signal: this.abortController.signal + abort_signal: this.abortController.signal, + ...this.walStreamOptions }; this._walStream = new WalStream(options); return this._walStream!; From f1ff6db59efeacf1c61d389b274b7df8d7b39a40 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Dec 2024 14:23:49 +0200 Subject: [PATCH 7/7] Test all supported types for chunking. --- .../test/src/chunked_snapshots.test.ts | 157 ++++++++++++++++++ .../test/src/large_batch.test.ts | 77 +-------- 2 files changed, 160 insertions(+), 74 deletions(-) create mode 100644 modules/module-postgres/test/src/chunked_snapshots.test.ts diff --git a/modules/module-postgres/test/src/chunked_snapshots.test.ts b/modules/module-postgres/test/src/chunked_snapshots.test.ts new file mode 100644 index 00000000..fa0cfcd8 --- /dev/null +++ b/modules/module-postgres/test/src/chunked_snapshots.test.ts @@ -0,0 +1,157 @@ +import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js'; +import { Metrics, reduceBucket } from '@powersync/service-core'; +import { SqliteJsonValue } from '@powersync/service-sync-rules'; +import * as crypto from 'node:crypto'; +import * as timers from 'timers/promises'; +import { describe, expect, test } from 'vitest'; +import { WalStreamTestContext } from './wal_stream_utils.js'; + +describe('batch replication tests - mongodb', { timeout: 30_000 }, function () { + // These are slow but consistent tests. + // Not run on every test run, but we do run on CI, or when manually debugging issues. + defineBatchTests(MONGO_STORAGE_FACTORY); +}); + +function defineBatchTests(factory: StorageFactory) { + // We need to test every supported type, since chunking could be quite sensitive to + // how each specific type is handled. 
+ test('chunked snapshot edge case (int2)', async () => { + await testChunkedSnapshot({ + idType: 'int2', + genId: 'i', + lastId: '2000', + moveTo: '0', + moveToJs: 0 + }); + }); + + test('chunked snapshot edge case (int4)', async () => { + await testChunkedSnapshot({ + idType: 'int4', + genId: 'i', + lastId: '2000', + moveTo: '0', + moveToJs: 0 + }); + }); + + test('chunked snapshot edge case (int8)', async () => { + await testChunkedSnapshot({ + idType: 'int8', + genId: 'i', + lastId: '2000', + moveTo: '0', + moveToJs: 0 + }); + }); + + test('chunked snapshot edge case (text)', async () => { + await testChunkedSnapshot({ + idType: 'text', + genId: `to_char(i, 'fm0000')`, + lastId: `'2000'`, + moveTo: `'0000'`, + moveToJs: '0000' + }); + }); + + test('chunked snapshot edge case (varchar)', async () => { + await testChunkedSnapshot({ + idType: 'varchar', + genId: `to_char(i, 'fm0000')`, + lastId: `'2000'`, + moveTo: `'0000'`, + moveToJs: '0000' + }); + }); + + test('chunked snapshot edge case (uuid)', async () => { + await testChunkedSnapshot({ + idType: 'uuid', + // Generate a uuid by using the first part of a uuid and appending a 4-digit number. + genId: `('00000000-0000-4000-8000-00000000' || to_char(i, 'fm0000')) :: uuid`, + lastId: `'00000000-0000-4000-8000-000000002000'`, + moveTo: `'00000000-0000-4000-8000-000000000000'`, + moveToJs: '00000000-0000-4000-8000-000000000000' + }); + }); + + async function testChunkedSnapshot(options: { + idType: string; + genId: string; + lastId: string; + moveTo: string; + moveToJs: SqliteJsonValue; + }) { + // 1. Start with 2k rows, one row with id = 2000, and a large TOAST value in another column. + // 2. Replicate one batch of rows (id < 2000). + // 3. `UPDATE table SET id = 0 WHERE id = 2000` + // 4. Replicate the rest of the table. + // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. + // 6. We end up with a row that has a missing TOAST column. + + await using context = await WalStreamTestContext.open(factory, { + // We need to use a smaller chunk size here, so that we can run a query in between chunks + walStreamOptions: { snapshotChunkSize: 100 } + }); + + await context.updateSyncRules(`bucket_definitions: + global: + data: + - SELECT * FROM test_data`); + const { pool } = context; + + await pool.query(`CREATE TABLE test_data(id ${options.idType} primary key, description text)`); + + // 1. Start with 10k rows, one row with id = 10000... + await pool.query({ + statement: `INSERT INTO test_data(id, description) SELECT ${options.genId}, 'foo' FROM generate_series(1, 2000) i` + }); + + // ...and a large TOAST value in another column. + // Toast value, must be > 8kb after compression + const largeDescription = crypto.randomBytes(20_000).toString('hex'); + await pool.query({ + statement: `UPDATE test_data SET description = $1 WHERE id = ${options.lastId} :: ${options.idType}`, + params: [{ type: 'varchar', value: largeDescription }] + }); + + // 2. Replicate one batch of rows (id < 2000). + // Our "stopping point" here is not quite deterministic. + const p = context.replicateSnapshot(); + + const stopAfter = 100; + const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + + while (true) { + const count = + ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount; + + if (count >= stopAfter) { + break; + } + await timers.setTimeout(1); + } + + // 3. 
`UPDATE table SET id = 0 WHERE id = 2000` + const rs = await pool.query( + `UPDATE test_data SET id = ${options.moveTo} WHERE id = ${options.lastId} RETURNING id` + ); + expect(rs.rows.length).toEqual(1); + + // 4. Replicate the rest of the table. + await p; + + // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. + context.startStreaming(); + + // 6. If all went well, the "resnapshot" process would take care of this. + const data = await context.getBucketData('global[]', undefined, {}); + const reduced = reduceBucket(data); + + const movedRow = reduced.find((row) => row.object_id == String(options.moveToJs)); + expect(movedRow?.data).toEqual(JSON.stringify({ id: options.moveToJs, description: largeDescription })); + + expect(reduced.length).toEqual(2001); + } +} diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index 98f11d4f..544819fa 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -1,12 +1,11 @@ -import { MONGO_STORAGE_FACTORY, StorageFactory, StorageOptions } from '@core-tests/util.js'; +import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js'; +import { Metrics } from '@powersync/service-core'; +import * as timers from 'timers/promises'; import { describe, expect, test } from 'vitest'; import { populateData } from '../../dist/utils/populate_test_data.js'; import { env } from './env.js'; import { TEST_CONNECTION_OPTIONS } from './util.js'; import { WalStreamTestContext } from './wal_stream_utils.js'; -import * as timers from 'timers/promises'; -import { Metrics, reduceBucket } from '@powersync/service-core'; -import * as crypto from 'node:crypto'; describe('batch replication tests - mongodb', { timeout: 120_000 }, function () { // These are slow but consistent tests. @@ -369,76 +368,6 @@ function defineBatchTests(factory: StorageFactory) { expect(data.length).toEqual(11002 + deletedRowOps.length); } - test('chunked snapshot edge case', async () => { - // 1. Start with 10k rows, one row with id = 10000, and a large TOAST value in another column. - // 2. Replicate one batch of rows (id < 10000). - // 3. `UPDATE table SET id = 0 WHERE id = 10000` - // 4. Replicate the rest of the table. - // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. - // 6. We end up with a row that has a missing TOAST column. - - await using context = await WalStreamTestContext.open(factory, { - // We need to use a smaller chunk size here, so that we can run a query in between chunks - walStreamOptions: { snapshotChunkSize: 100 } - }); - - await context.updateSyncRules(`bucket_definitions: - global: - data: - - SELECT * FROM test_data`); - const { pool } = context; - - await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`); - - // 1. Start with 10k rows, one row with id = 10000... - await pool.query({ - statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i` - }); - - // ...and a large TOAST value in another column. - // Toast value, must be > 8kb after compression - const largeDescription = crypto.randomBytes(20_000).toString('hex'); - await pool.query({ - statement: 'UPDATE test_data SET description = $1 WHERE id = 10000', - params: [{ type: 'varchar', value: largeDescription }] - }); - - // 2. Replicate one batch of rows (id < 10000). - // Our "stopping point" here is not quite deterministic. 
- const p = context.replicateSnapshot(); - - const stopAfter = 1_000; - const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; - - while (true) { - const count = - ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount; - - if (count >= stopAfter) { - break; - } - await timers.setTimeout(1); - } - - // 3. `UPDATE table SET id = 0 WHERE id = 10000` - await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000'); - - // 4. Replicate the rest of the table. - await p; - - // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column. - context.startStreaming(); - - // 6. If all went well, the "resnapshot" process would take care of this. - const data = await context.getBucketData('global[]', undefined, {}); - const reduced = reduceBucket(data); - - const movedRow = reduced.find((row) => row.object_id === '0'); - expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`); - - expect(reduced.length).toEqual(10_001); - }); - function printMemoryUsage() { const memoryUsage = process.memoryUsage();