Skip to content

Commit 5a5df79

Browse files
committed
v2.19.1: Eliminate deadlocks when moving a shard that is heavily written; add end-to-end tests for "pg-microsharding move"
1 parent 5de8264 commit 5a5df79

19 files changed

+409
-144
lines changed

docker-compose.postgres.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ services:
22
postgres:
33
image: postgres:16
44
ports:
5-
- "${PGPORT:?err}:5432"
5+
- "${PGPORT:?err}:${PGPORT}"
66
environment:
77
POSTGRES_PASSWORD: postgres
88
PGDATA: /tmp/postgresql
9-
POSTGRES_INITDB_ARGS: "-c max_connections=1000 -c synchronous_commit=off"
9+
POSTGRES_INITDB_ARGS: "-c port=$PGPORT -c max_connections=1000 -c synchronous_commit=off -c wal_level=logical"
1010
healthcheck:
11-
test: "pg_isready -U postgres"
12-
interval: 1s
11+
test: "PGPORT=$PGPORT pg_isready -U postgres"
12+
interval: 0.3s
1313
timeout: 20s
1414
retries: 10

docs/functions/move.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
> **move**(`__namedParameters`): `Promise`\<`void`\>
1010
11-
Defined in: [src/api/move.ts:27](https://github.com/clickup/pg-microsharding/blob/master/src/api/move.ts#L27)
11+
Defined in: [src/api/move.ts:29](https://github.com/clickup/pg-microsharding/blob/master/src/api/move.ts#L29)
1212

1313
Moves a shard from one master DB to another.
1414

package-lock.json

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@clickup/pg-microsharding",
33
"description": "Microshards support for PostgreSQL",
4-
"version": "2.18.1",
4+
"version": "2.19.1",
55
"license": "MIT",
66
"keywords": [
77
"postgresql",
@@ -17,7 +17,7 @@
1717
"build": "internal/build.sh",
1818
"dev": "internal/dev.sh",
1919
"lint": "internal/lint.sh",
20-
"test": "PGPORT=54833 internal/with-docker-compose-up.postgres.sh 'f() { internal/test.sh \"$@\" && for f in sql/__tests__/test_*.sql; do echo == $f; echo; psql -f $f; echo; echo; done; }; f'",
20+
"test": "PGPORT=54833 internal/with-docker-compose-up.postgres.sh 'f() { internal/test.sh \"$@\" && if [ \"$1\" == \"\" ]; then for f in sql/__tests__/test_*.sql; do echo == $f; echo; psql -f $f; echo; echo; done; fi; }; f'",
2121
"psql": "PGPORT=54833 internal/with-docker-compose-up.postgres.sh psql",
2222
"docs": "internal/update-readme.js && internal/docs.sh",
2323
"clean": "internal/clean.sh",

src/__tests__/internal/helpers.ts

Lines changed: 100 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1+
import { stripVTControlCharacters } from "util";
12
import { createConnectedClient } from "../../internal/createConnectedClient";
3+
import { stderr, stdout } from "../../internal/logging";
24
import { libSchema } from "../../internal/names";
35
import { normalizeDsn } from "../../internal/normalizeDsn";
46
import { ident, join, sql } from "../../internal/quote";
57
import { runSql } from "../../internal/runSql";
68

7-
jest.mock("../../internal/names", () => ({
8-
...jest.requireActual("../../internal/names"),
9-
libSchema: () =>
10-
ident(
11-
`microsharding_test_${String.fromCharCode("a".charCodeAt(0) + parseInt(process.env["JEST_WORKER_ID"]!) - 1)}`,
12-
),
13-
}));
14-
9+
/**
10+
* Creates a brand new test database with the name equals to the current DB name
11+
* with "~" + JEST_WORKER_ID suffix. The database is cleaned before returning.
12+
*/
1513
export async function ensureTestDBExistsBeforeAll({
1614
from,
1715
to,
1816
state,
17+
dbNameSuffix,
1918
}: {
20-
from: number;
21-
to: number;
22-
state: "active" | "inactive";
19+
from?: number;
20+
to?: number;
21+
state?: "active" | "inactive";
22+
dbNameSuffix?: string;
2323
}): Promise<string> {
2424
const env = {
2525
PGHOST: process.env["PGHOST"] || process.env["DB_HOST_DEFAULT"],
@@ -28,43 +28,116 @@ export async function ensureTestDBExistsBeforeAll({
2828
PGPASSWORD: process.env["PGPASSWORD"] || process.env["DB_PASS"],
2929
PGDATABASE: process.env["PGDATABASE"] || process.env["DB_DATABASE"],
3030
};
31-
const testDsn = normalizeDsn(env.PGHOST, env)!;
32-
if (!testDsn) {
31+
const dsn = normalizeDsn(env.PGHOST, env)!;
32+
if (!dsn) {
3333
throw new Error("No PGHOST or DB_HOST_DEFAULT passed.");
3434
}
3535

36+
const testDB =
37+
new URL(dsn).pathname.substring(1) +
38+
"~pg-micro-" +
39+
process.env["JEST_WORKER_ID"] +
40+
(dbNameSuffix ?? "");
41+
const testDsnUrl = new URL(dsn);
42+
testDsnUrl.pathname = `/${testDB}`;
43+
const testDsn = testDsnUrl.toString();
44+
45+
if (
46+
!(await runSql.one(
47+
dsn,
48+
sql`SELECT datname FROM pg_database WHERE datname=${testDB}`,
49+
))
50+
) {
51+
await runSql(dsn, sql`CREATE DATABASE ${ident(testDB)}`);
52+
} else {
53+
const schemas = await runSql.column(
54+
testDsn,
55+
sql`SELECT nspname FROM pg_namespace WHERE nspname = ${libSchema().toString()} OR nspname LIKE 'sh%'`,
56+
);
57+
if (schemas.length > 0) {
58+
await runSql(
59+
testDsn,
60+
sql`DROP SCHEMA IF EXISTS ${join(schemas.map(ident), ", ")} CASCADE`,
61+
);
62+
}
63+
}
64+
3665
await runSql(
3766
testDsn,
3867
join(
3968
[
4069
sql`CREATE SCHEMA IF NOT EXISTS ${libSchema()};`,
4170
sql`SET search_path TO ${libSchema()};`,
4271
sql`\\ir ${__dirname}/../../../sql/pg-microsharding-up.sql`,
43-
sql`
44-
CREATE OR REPLACE FUNCTION ${libSchema()}.microsharding_schema_name_(
45-
shard integer
46-
) RETURNS text
47-
LANGUAGE sql
48-
SET search_path FROM CURRENT
49-
AS $$
50-
SELECT ${libSchema() + "_"} || lpad($1::text, 4, '0')
51-
$$;
52-
`,
53-
sql`SELECT ${libSchema()}.microsharding_ensure_exist(${from}, ${to});`,
54-
state === "active"
55-
? sql`SELECT ${libSchema()}.microsharding_ensure_active(${from}, ${to});`
56-
: sql`SELECT ${libSchema()}.microsharding_ensure_inactive(${from}, ${to});`,
72+
...(from && to
73+
? [
74+
sql`SELECT ${libSchema()}.microsharding_ensure_exist(${from}, ${to});`,
75+
state === "active"
76+
? sql`SELECT ${libSchema()}.microsharding_ensure_active(${from}, ${to});`
77+
: sql`SELECT ${libSchema()}.microsharding_ensure_inactive(${from}, ${to});`,
78+
]
79+
: []),
5780
],
5881
"\n",
5982
),
6083
);
6184
return testDsn;
6285
}
6386

87+
/**
88+
* Creates a connected client to the test database.
89+
*/
6490
export async function createTestConnectedClient(
6591
dsn: string,
6692
): ReturnType<typeof createConnectedClient> {
6793
const client = await createConnectedClient(dsn);
6894
await client.query(`SET search_path = ${libSchema()}`);
6995
return client;
7096
}
97+
98+
/**
99+
* Intercepts stdout/stderr from logging module.
100+
*/
101+
export function mockStd(): {
102+
stdout: string;
103+
stderr: string;
104+
out: string;
105+
print: () => void;
106+
} {
107+
const stdoutBuf: Buffer[] = [];
108+
const stderrBuf: Buffer[] = [];
109+
const outBuf: Buffer[] = [];
110+
111+
for (const [std, buf] of [
112+
[stdout, stdoutBuf],
113+
[stderr, stderrBuf],
114+
] as const) {
115+
jest.spyOn(std, "write").mockImplementation((chunk, maybeCb, cb) => {
116+
chunk = typeof chunk === "string" ? Buffer.from(chunk) : chunk;
117+
if (typeof maybeCb === "function") {
118+
cb = maybeCb;
119+
}
120+
121+
buf.push(chunk);
122+
outBuf.push(chunk);
123+
cb?.(null);
124+
return true;
125+
});
126+
}
127+
128+
return {
129+
get stdout() {
130+
return stripVTControlCharacters(Buffer.concat(stdoutBuf).toString());
131+
},
132+
get stderr() {
133+
return stripVTControlCharacters(Buffer.concat(stderrBuf).toString());
134+
},
135+
get out() {
136+
return stripVTControlCharacters(Buffer.concat(outBuf).toString());
137+
},
138+
print() {
139+
// eslint-disable-next-line no-console
140+
console.log(this.out);
141+
},
142+
};
143+
}

src/actions/actionList.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { table } from "table";
1313
import { weights } from "../api/weights";
1414
import { chunkIntoN } from "../internal/chunkIntoN";
1515
import { WEIGHT_COL_NAME_DEFAULT } from "../internal/getSchemaWeights";
16-
import { indent, print, progress, section } from "../internal/logging";
16+
import { indent, print, progress, section, stdout } from "../internal/logging";
1717
import { dsnShort } from "../internal/names";
1818
import { normalizeDsns } from "../internal/normalizeDsns";
1919
import type { Args } from "../internal/parseArgs";
@@ -102,7 +102,7 @@ export async function actionList(args: Args): Promise<boolean> {
102102
) ?? 0);
103103

104104
const maxTablesSideBySide = Math.trunc(
105-
((process.stdout.columns || 80) - 10) / tableWidth,
105+
((stdout.columns || 80) - 10) / tableWidth,
106106
);
107107

108108
for (const [islandNo, shards] of islands) {
@@ -130,7 +130,7 @@ export async function actionList(args: Args): Promise<boolean> {
130130
const shardTables =
131131
islands.size === 1 ||
132132
maxTablesSideBySide === 0 ||
133-
grandTotalShards + islands.size * 8 < (process.stdout.rows || 25) ||
133+
grandTotalShards + islands.size * 8 < (stdout.rows || 25) ||
134134
shards.length <= 4
135135
? [shards]
136136
: shards.length / maxTablesSideBySide < 2
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import {
2+
ensureTestDBExistsBeforeAll,
3+
mockStd,
4+
} from "../../__tests__/internal/helpers";
5+
import { listActiveSchemas } from "../../internal/listActiveSchemas";
6+
import { promiseAll } from "../../internal/promiseAll";
7+
import { move } from "../move";
8+
9+
let fromDsn: string;
10+
let toDsn: string;
11+
12+
beforeAll(async () => {
13+
[fromDsn, toDsn] = await promiseAll([
14+
ensureTestDBExistsBeforeAll({
15+
from: 1,
16+
to: 1,
17+
state: "active",
18+
}),
19+
ensureTestDBExistsBeforeAll({
20+
dbNameSuffix: "to",
21+
}),
22+
]);
23+
});
24+
25+
test("move a shard with no tables", async () => {
26+
const out = mockStd();
27+
try {
28+
await move({
29+
shard: 1,
30+
fromDsn,
31+
toDsn,
32+
activateOnDestination: true,
33+
});
34+
expect(await listActiveSchemas({ dsn: toDsn })).toEqual(["sh0001"]);
35+
expect(await listActiveSchemas({ dsn: fromDsn })).toEqual([]);
36+
} catch (e: unknown) {
37+
out.print();
38+
throw e;
39+
}
40+
});
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import {
2+
ensureTestDBExistsBeforeAll,
3+
mockStd,
4+
} from "../../__tests__/internal/helpers";
5+
import { listActiveSchemas } from "../../internal/listActiveSchemas";
6+
import { promiseAll } from "../../internal/promiseAll";
7+
import { sql } from "../../internal/quote";
8+
import { runSql } from "../../internal/runSql";
9+
import { move } from "../move";
10+
11+
let fromDsn: string;
12+
let toDsn: string;
13+
14+
beforeAll(async () => {
15+
[fromDsn, toDsn] = await promiseAll([
16+
ensureTestDBExistsBeforeAll({
17+
from: 1,
18+
to: 1,
19+
state: "active",
20+
}),
21+
ensureTestDBExistsBeforeAll({
22+
dbNameSuffix: "to",
23+
}),
24+
]);
25+
});
26+
27+
test("move a shard with tables", async () => {
28+
const out = mockStd();
29+
try {
30+
await runSql(
31+
fromDsn,
32+
sql`
33+
CREATE TABLE sh0001.tbl(id bigserial PRIMARY KEY, s text);
34+
INSERT INTO sh0001.tbl(s) SELECT s::text FROM generate_series(1, 1000) AS s;
35+
`,
36+
);
37+
await move({
38+
shard: 1,
39+
fromDsn,
40+
toDsn,
41+
activateOnDestination: true,
42+
});
43+
expect(await listActiveSchemas({ dsn: toDsn })).toEqual(["sh0001"]);
44+
expect(await listActiveSchemas({ dsn: fromDsn })).toEqual([]);
45+
expect(
46+
await runSql.one(toDsn, sql`SELECT count(1) FROM sh0001.tbl`),
47+
).toEqual("1000");
48+
expect(
49+
Number(await runSql.one(toDsn, sql`SELECT nextval('sh0001.tbl_id_seq')`)),
50+
).toBeGreaterThan(1500);
51+
expect(out.stdout).toMatch(/ALTER SUBSCRIPTION.*ENABLE/m);
52+
} catch (e: unknown) {
53+
out.print();
54+
throw e;
55+
}
56+
});

0 commit comments

Comments
 (0)