Commit fa33510

fix: improve migrations of prefixes (#644)

1 parent fc0ebf4

12 files changed: +308 −58 lines
@@ -1,2 +1,2 @@
 -- postgres-migrations disable-transaction
-CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique on storage.objects (name COLLATE "C", bucket_id);
+CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_level_unique on storage.objects (name COLLATE "C", bucket_id, level);
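
The -- postgres-migrations disable-transaction directive on the first line is load-bearing: CREATE INDEX CONCURRENTLY refuses to run inside a transaction block, so the migration runner must send this statement on a plain connection with no surrounding BEGIN/COMMIT. A minimal TypeScript sketch with the pg client (the function name and connection handling are illustrative, not part of this commit):

import { Client } from 'pg'

// Sketch: CREATE INDEX CONCURRENTLY must run outside a transaction block,
// so the statement is issued on a bare connection, never inside BEGIN/COMMIT.
async function createLevelIndex(connectionString: string) {
  const client = new Client({ connectionString })
  await client.connect()
  try {
    // IF NOT EXISTS keeps the statement safe to re-run.
    await client.query(
      `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_level_unique
         ON storage.objects (name COLLATE "C", bucket_id, level)`
    )
  } finally {
    await client.end()
  }
}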

migrations/tenant/0029-create-prefixes.sql (+32 −1)
@@ -3,13 +3,19 @@
 -- We run this with 50k batch size to avoid long running transaction
 DO $$
 DECLARE
-    batch_size INTEGER := 50000;
+    batch_size INTEGER := 10000;
     total_scanned INTEGER := 0;
     row_returned INTEGER := 0;
     last_name TEXT COLLATE "C" := NULL;
     last_bucket_id TEXT COLLATE "C" := NULL;
+    delay INTEGER := 1;
+    start_time TIMESTAMPTZ;
+    end_time TIMESTAMPTZ;
+    exec_duration INTERVAL;
 BEGIN
     LOOP
+        start_time := clock_timestamp(); -- Start time of batch
+
         -- Fetch a batch of objects ordered by name COLLATE "C"
         WITH batch as (
             SELECT id, bucket_id, name, owner

@@ -37,8 +43,15 @@ DO $$
         )
         SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

+        end_time := clock_timestamp(); -- End time after batch processing
+        exec_duration := end_time - start_time; -- Calculate elapsed time
+
         RAISE NOTICE 'Object Row returned: %', row_returned;
         RAISE NOTICE 'Last Object: %', last_name;
+        RAISE NOTICE 'Execution time for this batch: %', exec_duration;
+        RAISE NOTICE 'Delay: %', delay;
+        RAISE NOTICE 'Batch size: %', batch_size;
+        RAISE NOTICE '-------------------------------------------------';

         total_scanned := total_scanned + row_returned;

@@ -48,6 +61,24 @@ DO $$
             EXIT;
         ELSE
             COMMIT;
+            PERFORM pg_sleep(delay);
+            -- Increase the delay by 1 second on each iteration until 10,
+            -- then reset it back to 1
+            SELECT CASE WHEN delay >= 10 THEN 1 ELSE delay + 1 END INTO delay;
+
+            -- Update the batch size:
+            -- If execution time > 3 seconds, reset batch_size to 20k.
+            -- If the batch size is already 20k or less, decrease it by 1k down to a 5k floor.
+            -- Otherwise, increase batch_size by 5000 up to a maximum of 50k.
+            IF exec_duration > interval '3 seconds' THEN
+                IF batch_size <= 20000 THEN
+                    batch_size := GREATEST(batch_size - 1000, 5000);
+                ELSE
+                    batch_size := 20000;
+                END IF;
+            ELSE
+                batch_size := LEAST(batch_size + 5000, 50000);
+            END IF;
         END IF;
     END LOOP;
 END;
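
Taken together, these changes make the backfill self-pacing: each committed batch is followed by a sleep whose length cycles from 1 up to 10 seconds, and the batch size grows by 5k (up to 50k) while batches finish within 3 seconds, otherwise dropping back to 20k and then stepping down toward a 5k floor. A standalone TypeScript sketch of the same controller, using this file's constants (the 0030 migration below resets to 10k instead of 20k):

// Sketch of the batch-size/delay controller used by the migration
// (0029 numbers; 0030 resets to 10k instead of 20k).
function nextDelay(delay: number): number {
  // Cycle the inter-batch sleep: 1s, 2s, ..., 10s, then back to 1s.
  return delay >= 10 ? 1 : delay + 1
}

function nextBatchSize(batchSize: number, execMs: number): number {
  if (execMs > 3000) {
    // Batch ran long: drop to 20k, or if already at/below 20k,
    // step down by 1k toward the 5k floor.
    return batchSize <= 20_000 ? Math.max(batchSize - 1000, 5000) : 20_000
  }
  // Batch was fast: grow by 5k up to the 50k ceiling.
  return Math.min(batchSize + 5000, 50_000)
}

// Example: a slow 4.2s batch at the initial 10k size steps down to 9k.
console.log(nextBatchSize(10_000, 4200)) // 9000
console.log(nextDelay(10)) // 1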

migrations/tenant/0030-update-object-levels.sql (+31)
@@ -8,8 +8,14 @@ DO $$
     row_returned INTEGER := 0;
     last_name TEXT COLLATE "C" := NULL;
     last_bucket_id TEXT COLLATE "C" := NULL;
+    delay INTEGER := 1;
+    start_time TIMESTAMPTZ;
+    end_time TIMESTAMPTZ;
+    exec_duration INTERVAL;
 BEGIN
     LOOP
+        start_time := clock_timestamp(); -- Start time of batch
+
         -- Fetch a batch of objects ordered by name COLLATE "C"
         WITH batch as (
             SELECT id, bucket_id, name, storage.get_level(name) as level

@@ -33,8 +39,15 @@ DO $$
         )
         SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

+        end_time := clock_timestamp(); -- End time after batch processing
+        exec_duration := end_time - start_time; -- Calculate elapsed time
+
         RAISE NOTICE 'Object Row returned: %', row_returned;
         RAISE NOTICE 'Last Object: %', last_name;
+        RAISE NOTICE 'Execution time for this batch: %', exec_duration;
+        RAISE NOTICE 'Delay: %', delay;
+        RAISE NOTICE 'Batch size: %', batch_size;
+        RAISE NOTICE '-------------------------------------------------';

         total_scanned := total_scanned + row_returned;

@@ -44,6 +57,24 @@ DO $$
             EXIT;
         ELSE
             COMMIT;
+            PERFORM pg_sleep(delay);
+            -- Increase the delay by 1 second on each iteration until 10,
+            -- then reset it back to 1
+            SELECT CASE WHEN delay >= 10 THEN 1 ELSE delay + 1 END INTO delay;
+
+            -- Update the batch size:
+            -- If execution time > 3 seconds, reset batch_size to 10k.
+            -- If the batch size is already 10k or less, decrease it by 1k down to a 5k floor.
+            -- Otherwise, increase batch_size by 5000 up to a maximum of 50k.
+            IF exec_duration > interval '3 seconds' THEN
+                IF batch_size <= 10000 THEN
+                    batch_size := GREATEST(batch_size - 1000, 5000);
+                ELSE
+                    batch_size := 10000;
+                END IF;
+            ELSE
+                batch_size := LEAST(batch_size + 5000, 50000);
+            END IF;
         END IF;
     END LOOP;
 END;
@@ -0,0 +1,144 @@
+create or replace function storage.search (
+    prefix text,
+    bucketname text,
+    limits int default 100,
+    levels int default 1,
+    offsets int default 0,
+    search text default '',
+    sortcolumn text default 'name',
+    sortorder text default 'asc'
+) returns table (
+    name text,
+    id uuid,
+    updated_at timestamptz,
+    created_at timestamptz,
+    last_accessed_at timestamptz,
+    metadata jsonb
+)
+as $$
+declare
+    can_bypass_rls BOOLEAN;
+begin
+    SELECT rolbypassrls
+    INTO can_bypass_rls
+    FROM pg_roles
+    WHERE rolname = coalesce(nullif(current_setting('role', true), 'none'), current_user);
+
+    IF can_bypass_rls THEN
+        RETURN QUERY SELECT * FROM storage.search_v1_optimised(prefix, bucketname, limits, levels, offsets, search, sortcolumn, sortorder);
+    ELSE
+        RETURN QUERY SELECT * FROM storage.search_legacy_v1(prefix, bucketname, limits, levels, offsets, search, sortcolumn, sortorder);
+    END IF;
+end;
+$$ language plpgsql volatile;
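
storage.search now picks an implementation per caller: roles with rolbypassrls (e.g. service-level access) are routed to search_v1_optimised, while RLS-constrained roles keep the legacy path, and the signature stays the same either way. An illustrative caller using the pg client (the wrapper function is an assumption, not part of this commit):

import { Client } from 'pg'

// Illustrative caller: the RLS-bypass dispatch happens inside
// storage.search itself, so application code calls one entry point.
async function listFolder(client: Client, bucket: string, prefix: string) {
  const { rows } = await client.query(
    'SELECT * FROM storage.search($1, $2, $3, $4, $5)',
    [prefix, bucket, 100, 1, 0] // limits, levels, offsets
  )
  return rows // name, id, updated_at, created_at, last_accessed_at, metadata
}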
+
+
+CREATE OR REPLACE FUNCTION storage.extension(name text)
+RETURNS text
+LANGUAGE plpgsql
+IMMUTABLE
+AS $function$
+DECLARE
+    _parts text[];
+    _filename text;
+BEGIN
+    -- Split on "/" to get path segments
+    SELECT string_to_array(name, '/') INTO _parts;
+    -- Get the last path segment (the actual filename)
+    SELECT _parts[array_length(_parts, 1)] INTO _filename;
+    -- Extract extension: reverse, split on '.', then reverse again
+    RETURN reverse(split_part(reverse(_filename), '.', 1));
+END
+$function$;
+
+
+CREATE OR REPLACE FUNCTION storage.foldername(name text)
+RETURNS text[]
+LANGUAGE plpgsql
+IMMUTABLE
+AS $function$
+DECLARE
+    _parts text[];
+BEGIN
+    -- Split on "/" to get path segments
+    SELECT string_to_array(name, '/') INTO _parts;
+    -- Return everything except the last segment
+    RETURN _parts[1 : array_length(_parts,1) - 1];
+END
+$function$;
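
Both helpers are plain path parsing: foldername keeps every '/'-separated segment except the last, and extension takes what follows the final dot of the filename, which is what the reverse/split_part/reverse trick computes. Equivalent TypeScript, as a reference sketch rather than anything shipped in this commit:

// storage.foldername: every path segment except the final filename.
function foldername(name: string): string[] {
  return name.split('/').slice(0, -1)
}

// storage.extension: the text after the last '.' in the final segment.
// Like the SQL version, a filename with no dot yields the whole filename.
function extension(name: string): string {
  const filename = name.split('/').pop() ?? ''
  const idx = filename.lastIndexOf('.')
  return idx === -1 ? filename : filename.slice(idx + 1)
}

console.log(foldername('a/b/c.png')) // ['a', 'b']
console.log(extension('a/b/c.png')) // 'png'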
+
+
+CREATE OR REPLACE FUNCTION storage.extension(name text)
+RETURNS text
+LANGUAGE plpgsql
+IMMUTABLE
+AS $function$
+DECLARE
+    _parts text[];
+    _filename text;
+BEGIN
+    SELECT string_to_array(name, '/') INTO _parts;
+    SELECT _parts[array_length(_parts,1)] INTO _filename;
+    RETURN reverse(split_part(reverse(_filename), '.', 1));
+END
+$function$;
+
+DROP FUNCTION storage.get_size_by_bucket();
+CREATE OR REPLACE FUNCTION storage.get_size_by_bucket()
+RETURNS TABLE (
+    size BIGINT,
+    bucket_id text
+)
+LANGUAGE plpgsql
+STABLE
+AS $function$
+BEGIN
+    return query
+        select sum((metadata->>'size')::bigint) as size, obj.bucket_id
+        from "storage".objects as obj
+        group by obj.bucket_id;
+END
+$function$;
102+
103+
CREATE OR REPLACE FUNCTION "storage"."objects_update_prefix_trigger"()
104+
RETURNS trigger
105+
AS $func$
106+
DECLARE
107+
old_prefixes TEXT[];
108+
BEGIN
109+
-- Ensure this is an update operation and the name has changed
110+
IF TG_OP = 'UPDATE' AND (NEW."name" <> OLD."name" OR NEW."bucket_id" <> OLD."bucket_id") THEN
111+
-- Retrieve old prefixes
112+
old_prefixes := "storage"."get_prefixes"(OLD."name");
113+
114+
-- Remove old prefixes that are only used by this object
115+
WITH all_prefixes as (
116+
SELECT unnest(old_prefixes) as prefix
117+
),
118+
can_delete_prefixes as (
119+
SELECT prefix
120+
FROM all_prefixes
121+
WHERE NOT EXISTS (
122+
SELECT 1 FROM "storage"."objects"
123+
WHERE "bucket_id" = OLD."bucket_id"
124+
AND "name" <> OLD."name"
125+
AND "name" LIKE (prefix || '%')
126+
)
127+
)
128+
DELETE FROM "storage"."prefixes" WHERE name IN (SELECT prefix FROM can_delete_prefixes);
129+
130+
-- Add new prefixes
131+
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
132+
END IF;
133+
-- Set the new level
134+
NEW."level" := "storage"."get_level"(NEW."name");
135+
136+
RETURN NEW;
137+
END;
138+
$func$ LANGUAGE plpgsql VOLATILE;
139+
140+
CREATE OR REPLACE TRIGGER "objects_update_create_prefix"
141+
BEFORE UPDATE ON "storage"."objects"
142+
FOR EACH ROW
143+
WHEN (NEW.name != OLD.name OR NEW.bucket_id != OLD.bucket_id)
144+
EXECUTE FUNCTION "storage"."objects_update_prefix_trigger"();
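
The subtle part of this trigger is its cleanup rule: a prefix of the old name may be deleted only if no other object in the same bucket still has a name beginning with it, after which add_prefixes recreates whatever the new name needs. The predicate in isolation, as a TypeScript sketch (it mirrors the LIKE prefix || '%' comparison, which matches raw string prefixes rather than whole path segments):

// Sketch of the trigger's cleanup rule: an old prefix is deletable only
// when no other object in the bucket still starts with it.
function deletablePrefixes(oldPrefixes: string[], otherObjectNames: string[]): string[] {
  return oldPrefixes.filter(
    (prefix) => !otherObjectNames.some((name) => name.startsWith(prefix))
  )
}

// 'a/b/c.png' renamed away: 'a' survives because 'a/x.png' still uses it.
console.log(deletablePrefixes(['a', 'a/b'], ['a/x.png'])) // ['a/b']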

src/http/routes/admin/migrations.ts (+29 −29)
@@ -25,35 +25,6 @@ export default async function routes(fastify: FastifyInstance) {
     return reply.send({ message: 'Migrations scheduled' })
   })

-  fastify.get('/active', async (req, reply) => {
-    if (!pgQueueEnable) {
-      return reply.code(400).send({ message: 'Queue is not enabled' })
-    }
-    const data = await multitenantKnex
-      .table('pgboss.job')
-      .where('state', 'active')
-      .where('name', 'tenants-migrations')
-      .orderBy('createdon', 'desc')
-      .limit(2000)
-
-    return reply.send(data)
-  })
-
-  fastify.delete('/active', async (req, reply) => {
-    if (!pgQueueEnable) {
-      return reply.code(400).send({ message: 'Queue is not enabled' })
-    }
-    const data = await multitenantKnex
-      .table('pgboss.job')
-      .where('state', 'active')
-      .where('name', 'tenants-migrations')
-      .orderBy('createdon', 'desc')
-      .update({ state: 'completed' })
-      .limit(2000)
-
-    return reply.send(data)
-  })
-
   fastify.post('/reset/fleet', async (req, reply) => {
     if (!pgQueueEnable) {
       return reply.status(400).send({ message: 'Queue is not enabled' })

@@ -86,6 +57,35 @@ export default async function routes(fastify: FastifyInstance) {
     return reply.send({ message: 'Migrations scheduled' })
   })

+  fastify.get('/active', async (req, reply) => {
+    if (!pgQueueEnable) {
+      return reply.code(400).send({ message: 'Queue is not enabled' })
+    }
+    const data = await multitenantKnex
+      .table('pgboss.job')
+      .where('state', 'active')
+      .where('name', 'tenants-migrations')
+      .orderBy('createdon', 'desc')
+      .limit(2000)
+
+    return reply.send(data)
+  })
+
+  fastify.delete('/active', async (req, reply) => {
+    if (!pgQueueEnable) {
+      return reply.code(400).send({ message: 'Queue is not enabled' })
+    }
+    const data = await multitenantKnex
+      .table('pgboss.job')
+      .where('state', 'active')
+      .where('name', 'tenants-migrations')
+      .orderBy('createdon', 'desc')
+      .update({ state: 'completed' })
+      .limit(2000)
+
+    return reply.send(data)
+  })
+
   fastify.get('/progress', async (req, reply) => {
     if (!pgQueueEnable) {
       return reply.code(400).send({ message: 'Queue is not enabled' })
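
The two /active handlers are unchanged apart from their position: GET lists up to 2000 active tenants-migrations jobs from the pgboss.job table, and DELETE marks the same set completed, effectively cancelling them. A hypothetical client, where the base URL and the apikey header are assumptions about the deployment rather than anything this commit defines:

// Hypothetical admin client; base URL and auth header are assumptions,
// not part of this commit.
async function inspectActiveMigrations(base: string, apikey: string) {
  const headers = { apikey }

  // GET /active: list up to 2000 active tenants-migrations jobs.
  const res = await fetch(`${base}/active`, { headers })
  const jobs = await res.json()
  console.log(`${jobs.length} active migration jobs`)

  // DELETE /active: mark those jobs completed (i.e. stop them).
  await fetch(`${base}/active`, { method: 'DELETE', headers })
}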

src/http/routes/admin/objects.ts (+6 −4)
@@ -103,7 +103,7 @@ export default async function routes(fastify: FastifyInstance) {
       keepTmpTable: Boolean(req.query.keepTmpTable),
     })

-    reply.header('Content-Type', 'application/json; charset=utf-8')
+    reply.header('Content-Type', 'application/x-ndjson; charset=utf-8')

     // Do not let the connection time out, periodically send
     // a ping message to keep the connection alive

@@ -117,7 +117,7 @@ export default async function routes(fastify: FastifyInstance) {
           JSON.stringify({
             ...result,
             event: 'data',
-          })
+          }) + '\n'
         )
       }
     }

@@ -150,6 +150,8 @@ export default async function routes(fastify: FastifyInstance) {
       before.setHours(before.getHours() - 1)
     }

+    reply.header('Content-Type', 'application/x-ndjson; charset=utf-8')
+
     const respPing = ping(reply)

     try {

@@ -168,7 +170,7 @@ export default async function routes(fastify: FastifyInstance) {
           JSON.stringify({
             ...deleted,
             event: 'data',
-          })
+          }) + '\n'
         )
       }
     } catch (e) {

@@ -193,7 +195,7 @@ function ping(reply: FastifyReply) {
     reply.raw.write(
       JSON.stringify({
         event: 'ping',
-      })
+      }) + '\n'
     )
   }, 1000 * 10)
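
With each event newline-terminated and the responses now labelled application/x-ndjson, a client can act on events as they arrive instead of waiting for one giant JSON document; the ping events are the 10-second keep-alives written by ping(). A Node-flavoured consumer sketch (URL and headers are whatever the deployment requires):

// Sketch of an NDJSON consumer for the streaming admin endpoints.
async function consumeNdjson(url: string, headers: Record<string, string>) {
  const res = await fetch(url, { headers })
  if (!res.body) throw new Error('no response body')

  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffered = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffered += decoder.decode(value, { stream: true })
    // Complete events end in '\n'; keep any trailing partial line buffered.
    const lines = buffered.split('\n')
    buffered = lines.pop() ?? ''
    for (const line of lines) {
      if (!line.trim()) continue
      const event = JSON.parse(line)
      if (event.event !== 'ping') console.log(event) // skip keep-alives
    }
  }
}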
