Skip to content

Commit 83c1e38

Browse files
committed
v2.22.3: Fix partitioned tables support if they exist in microshards
1 parent 2eb89d8 commit 83c1e38

File tree

8 files changed

+108
-30
lines changed

8 files changed

+108
-30
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@clickup/pg-microsharding",
33
"description": "Microshards support for PostgreSQL",
4-
"version": "2.22.2",
4+
"version": "2.22.3",
55
"license": "MIT",
66
"keywords": [
77
"postgresql",

src/api/__tests__/move.with-tables.test.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,28 @@ beforeEach(async () => {
3030
await runSql(
3131
fromDsn,
3232
sql`
33-
CREATE TABLE sh0001.tbl(id bigserial PRIMARY KEY, s text);
34-
INSERT INTO sh0001.tbl(s) SELECT s::text FROM generate_series(1, 1000) AS s;
35-
`,
33+
CREATE TABLE sh0001.tbl(id bigserial PRIMARY KEY, s text);
34+
INSERT INTO sh0001.tbl(s)
35+
SELECT s::text FROM generate_series(1, 1000) AS s;
36+
37+
CREATE TABLE sh0001."weirdly""named;tbl"(id bigserial PRIMARY KEY, s text);
38+
INSERT INTO sh0001."weirdly""named;tbl"(s)
39+
SELECT s::text FROM generate_series(1, 1000) AS s;
40+
41+
CREATE TABLE sh0001.partitioned_tbl(
42+
grp bigint,
43+
idx numeric(64, 32),
44+
PRIMARY KEY (grp, idx)
45+
) PARTITION BY HASH (grp);
46+
CREATE TABLE sh0001.partitioned_tbl_p0
47+
PARTITION OF sh0001.partitioned_tbl
48+
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
49+
CREATE TABLE sh0001.partitioned_tbl_p1
50+
PARTITION OF sh0001.partitioned_tbl
51+
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
52+
INSERT INTO sh0001.partitioned_tbl(grp, idx)
53+
SELECT s, s FROM generate_series(1, 1000) AS s;
54+
`,
3655
);
3756
await move({
3857
shard: 1,
@@ -50,6 +69,18 @@ beforeEach(async () => {
5069
await runSql.one(toDsn, sql`SELECT nextval('sh0001.tbl_id_seq')`),
5170
),
5271
).toBeGreaterThan(1500);
72+
expect(
73+
await runSql.one(
74+
toDsn,
75+
sql`SELECT count(1) FROM sh0001."weirdly""named;tbl"`,
76+
),
77+
).toEqual("1000");
78+
expect(
79+
await runSql.one(
80+
toDsn,
81+
sql`SELECT count(1) FROM sh0001.partitioned_tbl`,
82+
),
83+
).toEqual("1000");
5384
expect(out.stdout).toMatch(/ALTER SUBSCRIPTION.*ENABLE/m);
5485
} catch (e: unknown) {
5586
out.print();

src/api/move.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import {
1919
runSqlProgressTransactional,
2020
} from "../internal/runSqlProgress";
2121
import { startCopyingTables } from "../internal/startCopyingTables";
22-
import { validateTables } from "../internal/validateTables";
22+
import { validatePrimaryKeysFromDump } from "../internal/validatePrimaryKeysFromDump";
2323
import { waitUntilBackfillCompletes } from "../internal/waitUntilBackfillCompletes";
2424
import { waitUntilEverythingIsNotLaggingMuch } from "../internal/waitUntilEverythingIsNotLaggingMuch";
2525
import type { Unlock } from "../internal/waitUntilIncrementalCompletes";
@@ -53,12 +53,18 @@ export async function move({
5353
throw `Can't determine schema name for microshard number ${shard}`;
5454
}
5555

56-
const tables = await getTablesInSchema({ fromDsn, schema });
56+
const { tables, partitionedTables } = await getTablesInSchema({
57+
fromDsn,
58+
schema,
59+
});
5760
const { preData, primaryKeys, postData } = await getDump({
5861
fromDsn,
5962
schema,
6063
});
61-
validateTables({ tables, primaryKeys });
64+
validatePrimaryKeysFromDump({
65+
primaryKeys,
66+
allTables: [...tables, ...partitionedTables],
67+
});
6268

6369
await waitUntilEverythingIsNotLaggingMuch({
6470
when: "before starting the work",

src/internal/getDump.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export async function getDump({
3939
`Reading post-data DDL (indexes, foreign keys) for ${schema} from ${fromDsn}`,
4040
),
4141
);
42-
// To logical replication to work, the tables on the subscription part must
42+
// For logical replication to work, each table on the subscription part must
4343
// have an index that allows the logical worker to find the row it receives
4444
// from the publication end. PostgreSQL 16+ can use regular indexes for that,
4545
// but the earlier versions require the subscription end to have either a

src/internal/getTablesInSchema.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import partition from "lodash/partition";
12
import { sql } from "./quote";
23
import { runSql } from "./runSql";
34

@@ -8,6 +9,9 @@ export interface TableInfo {
89
name: string;
910
/** Raw owner name (unquoted). */
1011
owner: string;
12+
/** Whether this is a base table for a partitioned table. Such tables have no
13+
* data and must not be mentioned in publications. */
14+
isPartitioned: boolean;
1115
/** Existing index that can be used on the subscriber end to identify the rows
1216
* for received updates and deletes. */
1317
identity: {
@@ -22,20 +26,30 @@ export interface TableInfo {
2226
} | null;
2327
}
2428

29+
/**
30+
* Returns all tables in a given schema on fromDsn.
31+
*
32+
* For logical subscription purposes, we only need the `tables` that may
33+
* potentially contain data (e.g. regular tables, or partitions). But we also
34+
* return the base tables for partitioned tables in `partitionedTables`, since
35+
* we validate the primary key coverage in `validatePrimaryKeysFromDump()` for sanity
36+
* checking purposes.
37+
*/
2538
export async function getTablesInSchema({
2639
fromDsn,
2740
schema,
2841
}: {
2942
fromDsn: string;
3043
schema: string;
31-
}): Promise<TableInfo[]> {
44+
}): Promise<{ tables: TableInfo[]; partitionedTables: TableInfo[] }> {
3245
const rows = await runSql(
3346
fromDsn,
3447
sql`
3548
SELECT DISTINCT ON (pg_namespace.nspname, pg_class.relname, pg_roles.rolname)
3649
pg_namespace.nspname,
3750
pg_class.relname,
3851
pg_roles.rolname,
52+
pg_class.relkind,
3953
idx_class.relname,
4054
pg_index.indisreplident,
4155
pg_index.indisprimary,
@@ -45,7 +59,7 @@ export async function getTablesInSchema({
4559
JOIN pg_roles ON relowner = pg_roles.oid
4660
LEFT JOIN pg_index ON indrelid = pg_class.oid AND (indisreplident OR indisprimary)
4761
LEFT JOIN pg_class idx_class ON idx_class.oid = pg_index.indexrelid
48-
WHERE pg_class.relkind = 'r' AND nspname = ${schema}
62+
WHERE pg_class.relkind IN ('r', 'p') AND nspname = ${schema}
4963
ORDER BY
5064
pg_namespace.nspname,
5165
pg_class.relname,
@@ -54,17 +68,23 @@ export async function getTablesInSchema({
5468
pg_index.indisprimary DESC
5569
`,
5670
);
57-
return rows.map((row) => ({
71+
const res = rows.map((row) => ({
5872
schema: row[0],
5973
name: row[1],
6074
owner: row[2],
61-
identity: row[3]
75+
isPartitioned: row[3] === "p",
76+
identity: row[4]
6277
? {
63-
indexName: row[3],
64-
isReplIdent: row[4] === "t",
65-
isPrimaryKey: row[5] === "t",
66-
def: row[6],
78+
indexName: row[4],
79+
isReplIdent: row[5] === "t",
80+
isPrimaryKey: row[6] === "t",
81+
def: row[7],
6782
}
6883
: null,
6984
}));
85+
const [partitionedTables, tables] = partition(
86+
res,
87+
(table) => table.isPartitioned,
88+
);
89+
return { tables, partitionedTables };
7090
}

src/internal/isNameInSql.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import escapeRegExp from "lodash/escapeRegExp";
2+
import { ident } from "./quote";
3+
4+
/**
5+
* Returns true if an SQL clause includes a properly quoted identifier (like
6+
* index name).
7+
*/
8+
export function isNameInSql(name: string, sql: string): boolean {
9+
return !!sql.match(
10+
new RegExp("(\\W|^)" + escapeRegExp(ident(name).toString()) + "(\\W|$)"),
11+
);
12+
}

src/internal/validateTables.ts renamed to src/internal/validatePrimaryKeysFromDump.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
import type { TableInfo } from "./getTablesInSchema";
2-
import { ident } from "./quote";
2+
import { isNameInSql } from "./isNameInSql";
33

4-
export function validateTables({
5-
tables,
4+
/**
5+
* Validates that the list of primary keys extracted from the dump earlier match
6+
* the actual list of tables read from pg_catalog. This is mainly for sanity
7+
* checking the dump parsing logic.
8+
*/
9+
export function validatePrimaryKeysFromDump({
610
primaryKeys,
11+
allTables,
712
}: {
8-
tables: TableInfo[];
913
primaryKeys: string[];
14+
allTables: TableInfo[];
1015
}): void {
11-
const tablesWithoutIdentity = tables.filter(({ identity }) => !identity);
16+
const tablesWithoutIdentity = allTables.filter((table) => !table.identity);
1217
if (tablesWithoutIdentity.length > 0) {
1318
throw (
1419
"To be moved, every table in the schema must have a PRIMARY KEY " +
@@ -18,10 +23,14 @@ export function validateTables({
1823
);
1924
}
2025

26+
const tables = allTables as Array<
27+
TableInfo & { identity: NonNullable<TableInfo["identity"]> }
28+
>;
29+
2130
const tablesWithoutDumpEntries = tables.filter(
22-
({ identity }) =>
23-
!primaryKeys.some((part) =>
24-
part.includes(ident(identity!.indexName).toString()),
31+
(table) =>
32+
!primaryKeys.some((primaryKeySql) =>
33+
isNameInSql(table.identity.indexName, primaryKeySql),
2534
),
2635
);
2736
if (tablesWithoutDumpEntries.length > 0) {
@@ -30,15 +39,15 @@ export function validateTables({
3039
"are not found in the parsed post-data section of the dump. Is there a bug " +
3140
"in dump parsing logic? Missing indexes:\n" +
3241
tablesWithoutDumpEntries
33-
.map(({ name, identity }) => `- ${identity?.indexName} (table ${name})`)
42+
.map(({ name, identity }) => `- ${identity.indexName} (table ${name})`)
3443
.join("\n")
3544
);
3645
}
3746

3847
const dumpEntriesWithoutTables = primaryKeys.filter(
39-
(part) =>
40-
!tables.some(({ identity }) =>
41-
part.includes(ident(identity!.indexName).toString()),
48+
(primaryKeySql) =>
49+
!tables.some((table) =>
50+
isNameInSql(table.identity.indexName, primaryKeySql),
4251
),
4352
);
4453
if (dumpEntriesWithoutTables.length > 0) {

0 commit comments

Comments
 (0)