[WIP] [Postgres] Resumable replication per-table #163

Draft · wants to merge 7 commits into main
177 changes: 177 additions & 0 deletions modules/module-postgres/src/replication/SnapshotQuery.ts
@@ -0,0 +1,177 @@
import { ColumnDescriptor, SourceTable } from '@powersync/service-core';
import { PgChunk, PgConnection, PgType, PgTypeOid, StatementParam } from '@powersync/service-jpgwire';
import { escapeIdentifier } from '../utils/pgwire_utils.js';
import { SqliteValue } from '@powersync/service-sync-rules';

export interface SnapshotQuery {
  initialize(): Promise<void>;
  nextChunk(): AsyncIterableIterator<PgChunk>;
}
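
// A minimal driver sketch, inferred from the implementations below rather
// than defined anywhere in this PR. The completion check is an assumption:
// a nextChunk() pass that yields no rows is treated as the end of the
// snapshot, which holds for all three implementations in this file.
async function runSnapshotToCompletion(query: SnapshotQuery): Promise<void> {
  await query.initialize();
  while (true) {
    let rowCount = 0;
    for await (const chunk of query.nextChunk()) {
      rowCount += chunk.rows.length;
      // Persist or process chunk.rows here.
    }
    if (rowCount == 0) {
      // No rows in this pass: the snapshot is complete.
      return;
    }
  }
}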

export type PrimaryKeyValue = Record<string, SqliteValue>;

export interface MissingRow {
  table: SourceTable;
  key: PrimaryKeyValue;
}

/**
 * Snapshot query using a plain SELECT * FROM table; chunked using
 * DECLARE CURSOR / FETCH.
 *
 * This supports all tables, but does not efficiently resume the snapshot
 * if the process is restarted.
 */
export class SimpleSnapshotQuery implements SnapshotQuery {
  public constructor(
    private readonly connection: PgConnection,
    private readonly table: SourceTable,
    private readonly chunkSize: number = 10_000
  ) {}

  public async initialize(): Promise<void> {
    await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.escapedIdentifier}`);
  }

  public nextChunk(): AsyncIterableIterator<PgChunk> {
    return this.connection.stream(`FETCH ${this.chunkSize} FROM snapshot_cursor`);
  }
}
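
// For reference, the statements the class above issues for the default
// chunkSize of 10_000, against an example table "lists" (the identifier is
// illustrative):
//
//   DECLARE snapshot_cursor CURSOR FOR SELECT * FROM "lists";
//   FETCH 10000 FROM snapshot_cursor;  -- repeated until it returns no rows
//
// A plain (non-holdable) cursor like this is only valid inside a
// transaction, so the caller is assumed to hold one open for the duration
// of the snapshot.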

/**
 * Performs a table snapshot query, chunking by ranges of primary key data.
 *
 * This may miss some rows if they are modified during the snapshot query.
 * In that case, logical replication will pick up those rows afterwards,
 * possibly resulting in an IdSnapshotQuery.
 *
 * Currently, this only supports tables with a single primary key column,
 * of one of a few supported types.
 */
export class ChunkedSnapshotQuery implements SnapshotQuery {
  /**
   * Primary key types that we support for chunked snapshots.
   *
   * Can expand this over time as we add more tests,
   * and ensure there are no issues with type conversion.
   */
  static SUPPORTED_TYPES = [
    PgTypeOid.TEXT,
    PgTypeOid.VARCHAR,
    PgTypeOid.UUID,
    PgTypeOid.INT2,
    PgTypeOid.INT4,
    PgTypeOid.INT8
  ];

  static supports(table: SourceTable) {
    if (table.replicaIdColumns.length != 1) {
      return false;
    }
    const primaryKey = table.replicaIdColumns[0];

    return primaryKey.typeId != null && ChunkedSnapshotQuery.SUPPORTED_TYPES.includes(Number(primaryKey.typeId));
  }

  private readonly key: ColumnDescriptor;
  private lastKey: string | bigint | null = null;

  public constructor(
    private readonly connection: PgConnection,
    private readonly table: SourceTable,
    private readonly chunkSize: number = 10_000
  ) {
    this.key = table.replicaIdColumns[0];
  }

  public async initialize(): Promise<void> {
    // No-op
  }

  public async *nextChunk(): AsyncIterableIterator<PgChunk> {
    let stream: AsyncIterableIterator<PgChunk>;
    if (this.lastKey == null) {
      stream = this.connection.stream(
        `SELECT * FROM ${this.table.escapedIdentifier} ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}`
      );
    } else {
      if (this.key.typeId == null) {
        throw new Error(`typeId required for primary key ${this.key.name}`);
      }
      let type: StatementParam['type'] = Number(this.key.typeId);
      stream = this.connection.stream({
        statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(this.key.name)} > $1 ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}`,
        params: [{ value: this.lastKey, type }]
      });
    }
    let primaryKeyIndex: number = -1;

    for await (let chunk of stream) {
      if (chunk.tag == 'RowDescription') {
        // We get a RowDescription for each chunk query, but they should
        // all be the same.
        const pk = chunk.payload.findIndex((c) => c.name == this.key.name);
        if (pk < 0) {
          throw new Error(
            `Cannot find primary key column ${this.key.name} in results. Keys: ${chunk.payload.map((c) => c.name).join(', ')}`
          );
        }
        primaryKeyIndex = pk;
      }

      if (chunk.rows.length > 0) {
        this.lastKey = chunk.rows[chunk.rows.length - 1][primaryKeyIndex];
      }
      yield chunk;
    }
  }
}
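
// The class above implements keyset pagination. With an example key column
// "id" (illustrative), the statements issued are:
//
//   SELECT * FROM "lists" ORDER BY "id" LIMIT 10000;                  -- first chunk
//   SELECT * FROM "lists" WHERE "id" > $1 ORDER BY "id" LIMIT 10000;  -- later chunks, $1 = last key seen
//
// Because each chunk depends only on the last key seen, persisting that key
// between chunks is what would make a restarted snapshot resumable, unlike
// the cursor-based approach above.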

/**
 * This performs a snapshot query using a list of primary keys.
 */
export class IdSnapshotQuery implements SnapshotQuery {
  private didChunk = false;

  static supports(table: SourceTable) {
    // We have the same requirements as ChunkedSnapshotQuery.
    // This is typically only used as a fallback when ChunkedSnapshotQuery
    // skipped some rows.
    return ChunkedSnapshotQuery.supports(table);
  }

  public constructor(
    private readonly connection: PgConnection,
    private readonly table: SourceTable,
    private readonly keys: PrimaryKeyValue[]
  ) {}

  public async initialize(): Promise<void> {
    // No-op
  }

  public async *nextChunk(): AsyncIterableIterator<PgChunk> {
    // Only produce one chunk
    if (this.didChunk) {
      return;
    }
    this.didChunk = true;

    const keyDefinition = this.table.replicaIdColumns[0];
    const ids = this.keys.map((record) => record[keyDefinition.name]);
    const type = PgType.getArrayType(keyDefinition.typeId!);
    if (type == null) {
      throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`);
    }
    yield* this.connection.stream({
      statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`,
      params: [
        {
          type: type,
          value: ids
        }
      ]
    });
  }
}
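
// A sketch of the fallback path; the wiring here is an assumed example, since
// the actual callers live elsewhere in the replication module. Rows modified
// while a chunked snapshot was running can be re-fetched by primary key.
async function refetchMissedRows(connection: PgConnection, missing: MissingRow[]): Promise<void> {
  if (missing.length == 0) {
    return;
  }
  const query = new IdSnapshotQuery(
    connection,
    missing[0].table,
    missing.map((m) => m.key)
  );
  await query.initialize();
  for await (const chunk of query.nextChunk()) {
    // Re-fetched rows replace any stale data from the earlier snapshot.
  }
}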