Commit 50ec4db

Implement resnapshot to handle consistency edge cases with initial
replication.
1 parent a749549 commit 50ec4db

File tree

10 files changed, +387 -195 lines changed


modules/module-postgres/src/replication/SnapshotQuery.ts

Lines changed: 73 additions & 2 deletions
@@ -1,14 +1,27 @@
 import { ColumnDescriptor, SourceTable } from '@powersync/service-core';
-import { PgChunk, PgConnection, PgTypeOid, StatementParam } from '@powersync/service-jpgwire';
+import { PgChunk, PgConnection, PgType, PgTypeOid, StatementParam } from '@powersync/service-jpgwire';
 import { escapeIdentifier } from '../utils/pgwire_utils.js';
-import { logger } from '@powersync/lib-services-framework';
 import { SqliteValue } from '@powersync/service-sync-rules';
 
 export interface SnapshotQuery {
   initialize(): Promise<void>;
   nextChunk(): AsyncIterableIterator<PgChunk>;
 }
 
+export type PrimaryKeyValue = Record<string, SqliteValue>;
+
+export interface MissingRow {
+  table: SourceTable;
+  key: PrimaryKeyValue;
+}
+
+/**
+ * Snapshot query using a plain SELECT * FROM table; chunked using
+ * DECLARE CURSOR / FETCH.
+ *
+ * This supports all tables, but does not efficiently resume the snapshot
+ * if the process is restarted.
+ */
 export class SimpleSnapshotQuery {
   public constructor(
     private readonly connection: PgConnection,
@@ -25,6 +38,16 @@ export class SimpleSnapshotQuery {
   }
 }
 
+/**
+ * Performs a table snapshot query, chunking by ranges of primary key data.
+ *
+ * This may miss some rows if they are modified during the snapshot query.
+ * In that case, logical replication will pick up those rows afterwards,
+ * possibly resulting in an IdSnapshotQuery.
+ *
+ * Currently, this only supports a table with a single primary key column,
+ * of a select few types.
+ */
 export class ChunkedSnapshotQuery {
   /**
    * Primary key types that we support for chunked snapshots.
@@ -104,3 +127,51 @@ export class ChunkedSnapshotQuery {
     }
   }
 }
+
+/**
+ * This performs a snapshot query using a list of primary keys.
+ */
+export class IdSnapshotQuery {
+  private didChunk = false;
+
+  static supports(table: SourceTable) {
+    // We have the same requirements as ChunkedSnapshotQuery.
+    // This is typically only used as a fallback when ChunkedSnapshotQuery
+    // skipped some rows.
+    return ChunkedSnapshotQuery.supports(table);
+  }
+
+  public constructor(
+    private readonly connection: PgConnection,
+    private readonly table: SourceTable,
+    private readonly keys: PrimaryKeyValue[]
+  ) {}
+
+  public async initialize(): Promise<void> {
+    // No-op
+  }
+
+  public async *nextChunk(): AsyncIterableIterator<PgChunk> {
+    // Only produce one chunk
+    if (this.didChunk) {
+      return;
+    }
+    this.didChunk = true;
+
+    const keyDefinition = this.table.replicaIdColumns[0];
+    const ids = this.keys.map((record) => record[keyDefinition.name]);
+    const type = PgType.getArrayType(keyDefinition.typeId!);
+    if (type == null) {
+      throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`);
+    }
+    yield* this.connection.stream({
+      statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`,
+      params: [
+        {
+          type: type,
+          value: ids
+        }
+      ]
+    });
+  }
+}
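
To make the contract concrete: the sketch below is a hypothetical driver for the SnapshotQuery interface, not code from this commit. It assumes the convention suggested by IdSnapshotQuery above, where an exhausted query yields an empty iteration from nextChunk().

import { PgChunk } from '@powersync/service-jpgwire';
import { SnapshotQuery } from './SnapshotQuery.js';

// Hypothetical driver: initialize once, then drain nextChunk() until an
// iteration yields nothing. IdSnapshotQuery yields a single chunk and then
// returns an empty iterator, so this loop terminates for all three classes.
export async function drainSnapshot(
  query: SnapshotQuery,
  onChunk: (chunk: PgChunk) => Promise<void>
): Promise<void> {
  await query.initialize();
  while (true) {
    let sawChunk = false;
    for await (const chunk of query.nextChunk()) {
      sawChunk = true;
      await onChunk(chunk);
    }
    if (!sawChunk) {
      // An empty iteration signals that the snapshot is complete.
      return;
    }
  }
}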

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 129 additions & 37 deletions
@@ -1,12 +1,26 @@
 import { container, errors, logger } from '@powersync/lib-services-framework';
-import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
+import {
+  BucketStorageBatch,
+  getUuidReplicaIdentityBson,
+  Metrics,
+  SaveUpdate,
+  SourceEntityDescriptor,
+  storage
+} from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
 import * as pg_utils from '../utils/pgwire_utils.js';
 import { PgManager } from './PgManager.js';
 import { getPgOutputRelation, getRelId } from './PgRelation.js';
 import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js';
-import { ChunkedSnapshotQuery, SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js';
+import {
+  ChunkedSnapshotQuery,
+  IdSnapshotQuery,
+  MissingRow,
+  PrimaryKeyValue,
+  SimpleSnapshotQuery,
+  SnapshotQuery
+} from './SnapshotQuery.js';
 
 export const ZERO_LSN = '00000000/00000000';
 export const PUBLICATION_NAME = 'powersync';
@@ -359,19 +373,8 @@ WHERE oid = $1::regclass`,
         logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
         continue;
       }
-      let tableLsnNotBefore: string;
-      await db.query('BEGIN');
-      try {
-        await this.snapshotTable(batch, db, table);
-
-        const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-        tableLsnNotBefore = rs.rows[0][0];
-      } finally {
-        // Read-only transaction, commit does not actually do anything.
-        await db.query('COMMIT');
-      }
 
-      await batch.markSnapshotDone([table], tableLsnNotBefore);
+      await this.snapshotTableInTx(batch, db, table);
       await touch();
     }
   }
@@ -391,7 +394,38 @@ WHERE oid = $1::regclass`,
     }
   }
 
-  private async snapshotTable(batch: storage.BucketStorageBatch, db: pgwire.PgConnection, table: storage.SourceTable) {
+  private async snapshotTableInTx(
+    batch: storage.BucketStorageBatch,
+    db: pgwire.PgConnection,
+    table: storage.SourceTable,
+    limited?: PrimaryKeyValue[]
+  ): Promise<storage.SourceTable> {
+    await db.query('BEGIN');
+    try {
+      let tableLsnNotBefore: string;
+      await this.snapshotTable(batch, db, table, limited);
+
+      // Get the current LSN.
+      // The data will only be consistent once incremental replication
+      // has passed that point.
+      // We have to get this LSN _after_ we have started the snapshot query.
+      const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
+      tableLsnNotBefore = rs.rows[0][0];
+      await db.query('COMMIT');
+      const [resultTable] = await batch.markSnapshotDone([table], tableLsnNotBefore);
+      return resultTable;
+    } catch (e) {
+      await db.query('ROLLBACK');
+      throw e;
+    }
+  }
+
+  private async snapshotTable(
+    batch: storage.BucketStorageBatch,
+    db: pgwire.PgConnection,
+    table: storage.SourceTable,
+    limited?: PrimaryKeyValue[]
+  ) {
     logger.info(`${this.slot_name} Replicating ${table.qualifiedName}`);
     const estimatedCount = await this.estimatedCount(db, table);
     let at = 0;
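
The tableLsnNotBefore name encodes the consistency rule stated in the comments above: the snapshotted data may only be treated as consistent once incremental replication has passed that LSN. The sketch below makes that check concrete; the helper is hypothetical and relies only on pg_lsn's "high/low" hex format.

// Hypothetical helper: has replication caught up past tableLsnNotBefore?
// Postgres formats LSNs as 'XXXXXXXX/XXXXXXXX' (two hex halves).
function lsnReached(currentLsn: string, lsnNotBefore: string): boolean {
  const parse = (lsn: string): [number, number] => {
    const [high, low] = lsn.split('/');
    return [parseInt(high, 16), parseInt(low, 16)];
  };
  const [currentHigh, currentLow] = parse(currentLsn);
  const [notBeforeHigh, notBeforeLow] = parse(lsnNotBefore);
  // Compare the high halves first, then the low halves.
  return currentHigh > notBeforeHigh || (currentHigh === notBeforeHigh && currentLow >= notBeforeLow);
}

// Rows snapshotted with tableLsnNotBefore = '00000000/16B37250' only become
// consistent once the replication stream passes that point.
console.assert(lsnReached('00000000/16B37260', '00000000/16B37250'));
console.assert(!lsnReached('00000000/16B37240', '00000000/16B37250'));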
@@ -401,13 +435,16 @@ WHERE oid = $1::regclass`,
     // We do streaming on two levels:
     // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
     // 2. Fine level: Stream chunks from each fetch call.
-    if (ChunkedSnapshotQuery.supports(table)) {
+    if (limited) {
+      q = new IdSnapshotQuery(db, table, limited);
+    } else if (ChunkedSnapshotQuery.supports(table)) {
       // Single primary key - we can use the primary key for chunking
       const orderByKey = table.replicaIdColumns[0];
       logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`);
-      q = new ChunkedSnapshotQuery(db, table, 10_000);
+      q = new ChunkedSnapshotQuery(db, table, 1000);
     } else {
       // Fallback case - query the entire table
+      logger.info(`Snapshot ${table.qualifiedName} without chunking`);
       q = new SimpleSnapshotQuery(db, table, 10_000);
     }
     await q.initialize();
@@ -501,37 +538,52 @@ WHERE oid = $1::regclass`,
       // Truncate this table, in case a previous snapshot was interrupted.
       await batch.truncate([result.table]);
 
-      let lsn: string = ZERO_LSN;
       // Start the snapshot inside a transaction.
       // We use a dedicated connection for this.
       const db = await this.connections.snapshotConnection();
       try {
-        await db.query('BEGIN');
-        try {
-          await this.snapshotTable(batch, db, result.table);
-
-          // Get the current LSN.
-          // The data will only be consistent once incremental replication
-          // has passed that point.
-          // We have to get this LSN _after_ we have started the snapshot query.
-          const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-          lsn = rs.rows[0][0];
-
-          await db.query('COMMIT');
-        } catch (e) {
-          await db.query('ROLLBACK');
-          throw e;
-        }
+        const table = await this.snapshotTableInTx(batch, db, result.table);
+        return table;
       } finally {
         await db.end();
       }
-      const [table] = await batch.markSnapshotDone([result.table], lsn);
-      return table;
     }
 
     return result.table;
   }
 
+  /**
+   * Process rows that have missing TOAST values.
+   *
+   * This can happen during edge cases in the chunked initial snapshot process.
+   *
+   * We handle this similarly to an inline table snapshot, but limited to the
+   * specific set of rows.
+   */
+  private async resnapshot(batch: BucketStorageBatch, rows: MissingRow[]) {
+    const byTable = new Map<string | number, MissingRow[]>();
+    for (let row of rows) {
+      if (!byTable.has(row.table.objectId)) {
+        byTable.set(row.table.objectId, []);
+      }
+      byTable.get(row.table.objectId)!.push(row);
+    }
+    const db = await this.connections.snapshotConnection();
+    try {
+      for (let rows of byTable.values()) {
+        const table = rows[0].table;
+        await this.snapshotTableInTx(
+          batch,
+          db,
+          table,
+          rows.map((r) => r.key)
+        );
+      }
+    } finally {
+      await db.end();
+    }
+  }
+
   private getTable(relationId: number): storage.SourceTable {
     const table = this.relation_cache.get(relationId);
     if (table == null) {
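
As a self-contained illustration of the grouping step in resnapshot() (object ids and key values invented): rows flagged for the same table are batched, so each table is re-read once with all of its missing keys in a single limited snapshot.

type Key = Record<string, number>;

// Three flagged rows across two tables.
const flagged: { objectId: number; key: Key }[] = [
  { objectId: 101, key: { id: 1 } },
  { objectId: 202, key: { id: 9 } },
  { objectId: 101, key: { id: 7 } }
];

const byTable = new Map<number, Key[]>();
for (const row of flagged) {
  if (!byTable.has(row.objectId)) {
    byTable.set(row.objectId, []);
  }
  byTable.get(row.objectId)!.push(row.key);
}
// byTable: 101 -> [{ id: 1 }, { id: 7 }], 202 -> [{ id: 9 }]
// Table 101 is re-read via one snapshotTableInTx(batch, db, table, keys) call,
// which IdSnapshotQuery turns into ... WHERE "id" = ANY($1).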
@@ -640,8 +692,38 @@ WHERE oid = $1::regclass`,
     // Auto-activate as soon as initial replication is done
     await this.storage.autoActivate();
 
+    let resnapshot: { table: storage.SourceTable; key: PrimaryKeyValue }[] = [];
+
+    const markRecordUnavailable = (record: SaveUpdate) => {
+      if (!IdSnapshotQuery.supports(record.sourceTable)) {
+        // If it's not supported, it's also safe to ignore
+        return;
+      }
+      let key: PrimaryKeyValue = {};
+      for (let column of record.sourceTable.replicaIdColumns) {
+        const name = column.name;
+        const value = record.after[name];
+        if (value == null) {
+          // We don't expect this to actually happen.
+          // The key should always be present in the "after" record.
+          return;
+        }
+        key[name] = value;
+      }
+      resnapshot.push({
+        table: record.sourceTable,
+        key: key
+      });
+    };
+
     await this.storage.startBatch(
-      { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: false },
+      {
+        zeroLSN: ZERO_LSN,
+        defaultSchema: POSTGRES_DEFAULT_SCHEMA,
+        storeCurrentData: true,
+        skipExistingRows: false,
+        markRecordUnavailable
+      },
       async (batch) => {
         // Replication never starts in the middle of a transaction
         let inTx = false;
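
A self-contained sketch of the key extraction inside markRecordUnavailable, with an invented record; per IdSnapshotQuery.supports(), this path only applies to tables whose replica identity is a single supported primary key column.

type SqliteValue = string | number | bigint | Uint8Array | null;
type PrimaryKeyValue = Record<string, SqliteValue>;

// Mirrors the loop in markRecordUnavailable: collect every replica id column
// from the 'after' record, or give up if any value is missing.
function extractKey(
  replicaIdColumns: { name: string }[],
  after: Record<string, SqliteValue | undefined>
): PrimaryKeyValue | undefined {
  const key: PrimaryKeyValue = {};
  for (const column of replicaIdColumns) {
    const value = after[column.name];
    if (value == null) {
      // Not expected: the key should always be present in the after record.
      return undefined;
    }
    key[column.name] = value;
  }
  return key;
}

// Invented example: an UPDATE whose unchanged TOAST columns were omitted from
// the WAL message still carries its primary key.
const key = extractKey([{ name: 'id' }], { id: 42n });
// key is { id: 42n }; resnapshot() later re-reads exactly this row.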
@@ -665,6 +747,16 @@ WHERE oid = $1::regclass`,
           } else if (msg.tag == 'commit') {
             Metrics.getInstance().transactions_replicated_total.add(1);
             inTx = false;
+            // flush() must be before the resnapshot check - that is
+            // typically what reports the resnapshot records.
+            await batch.flush();
+            // This _must_ be checked after the flush(), and before
+            // commit() or ack(). We never persist the resnapshot list,
+            // so we have to process it before marking our progress.
+            if (resnapshot.length > 0) {
+              await this.resnapshot(batch, resnapshot);
+              resnapshot = [];
+            }
             await batch.commit(msg.lsn!);
             await this.ack(msg.lsn!, replicationStream);
           } else {
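
The following is a condensed restatement of the commit-time ordering that the inline comments above establish: flush, then resnapshot, then commit and ack. The types below are stubs standing in for the real BucketStorageBatch and WalStream state; this is a sketch of the invariant, not the implementation.

// Stubs for the real batch/stream members, just enough to show the ordering.
interface BatchLike {
  flush(): Promise<void>;
  commit(lsn: string): Promise<void>;
}
type ResnapshotEntry = { table: unknown; key: Record<string, unknown> };

async function handleCommit(
  batch: BatchLike,
  lsn: string,
  resnapshot: ResnapshotEntry[],
  runResnapshot: (rows: ResnapshotEntry[]) => Promise<void>,
  ack: (lsn: string) => Promise<void>
): Promise<void> {
  // 1. flush() first - flushing is what typically reports resnapshot records.
  await batch.flush();
  // 2. Drain the in-memory resnapshot list before recording progress; it is
  //    never persisted, so it must be handled before commit()/ack().
  if (resnapshot.length > 0) {
    await runResnapshot(resnapshot.splice(0));
  }
  // 3. Only now mark progress at this LSN and acknowledge it to Postgres.
  await batch.commit(lsn);
  await ack(lsn);
}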
