Skip to content

Commit ba88c8f

Browse files
committed
feat: store lsn on event
1 parent 9cc67a8 commit ba88c8f

12 files changed

Lines changed: 903 additions & 7 deletions

File tree

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
-- add lsn column to events table for precise slot recovery
2+
-- the lsn is captured at trigger execution time to enable exact replay point lookup
3+
-- when a replication slot is invalidated
4+
5+
alter table pgstream.events
6+
add column if not exists lsn pg_lsn;
7+
8+
-- update the sync_database_trigger function to capture lsn on insert
9+
create or replace function pgstream.sync_database_trigger() returns trigger
10+
language plpgsql security definer
11+
as $_$
12+
declare
13+
v_table_name text := coalesce(new.table_name, old.table_name);
14+
v_schema_name text := coalesce(new.schema_name, old.schema_name);
15+
v_when_clause text;
16+
v_if_blocks text;
17+
v_op pgstream.operation_type;
18+
begin
19+
foreach v_op in array array['INSERT', 'UPDATE', 'DELETE']::pgstream.operation_type[] loop
20+
execute format(
21+
$sql$drop trigger if exists pgstream_%s on %I.%I;$sql$,
22+
lower(v_op::text), v_schema_name, v_table_name
23+
);
24+
25+
execute format(
26+
$sql$drop function if exists pgstream._publish_after_%s_on_%s;$sql$,
27+
lower(v_op::text), v_table_name
28+
);
29+
30+
if exists (select 1 from pgstream.subscriptions where table_name = v_table_name and schema_name = v_schema_name and operation = v_op) then
31+
-- if there is at least one subscription for v_op operation without a when_clause or with an empty one, we do not add the when clause at all
32+
v_when_clause := (
33+
case when exists (
34+
select 1
35+
from pgstream.subscriptions
36+
where table_name = v_table_name and schema_name = v_schema_name and operation = v_op and (when_clause is null or when_clause = '')
37+
) then null
38+
else (
39+
select string_agg(when_clause, ') or (')
40+
from pgstream.subscriptions
41+
where table_name = v_table_name and schema_name = v_schema_name and operation = v_op and when_clause is not null and when_clause != ''
42+
)
43+
end
44+
);
45+
46+
v_if_blocks := (
47+
select string_agg(format(
48+
$sql$
49+
if %s then
50+
v_jsonb_output := v_jsonb_output || (jsonb_build_object(
51+
'tg_name', %L,
52+
'new', case when tg_op is distinct from 'DELETE' then jsonb_build_object(
53+
%s
54+
) else null end,
55+
'old', case when tg_op is distinct from 'INSERT' then jsonb_build_object(
56+
%s
57+
) else null end
58+
) || v_base_payload || (%s));
59+
end if;
60+
$sql$,
61+
coalesce(nullif(subscription.when_clause, ''), 'true'),
62+
subscription.key,
63+
(select string_agg(format($s$%L, new.%I$s$, column_name, column_name), ', ') from unnest(subscription.column_names) as column_name),
64+
(select string_agg(format($s$%L, old.%I$s$, column_name, column_name), ', ') from unnest(subscription.column_names) as column_name),
65+
pgstream.build_payload_from_extensions(subscription.payload_extensions)
66+
), e'\n') from pgstream.subscriptions as subscription where table_name = v_table_name and schema_name = v_schema_name and operation = v_op
67+
);
68+
69+
execute format(
70+
$sql$
71+
create or replace function pgstream._publish_after_%s_on_%s ()
72+
returns trigger
73+
as $inner$
74+
declare
75+
v_jsonb_output jsonb := '[]'::jsonb;
76+
77+
v_base_payload jsonb := jsonb_build_object(
78+
'tg_op', tg_op,
79+
'tg_table_name', tg_table_name,
80+
'tg_table_schema', tg_table_schema,
81+
'timestamp', (extract(epoch from now()) * 1000)::bigint
82+
);
83+
begin
84+
%s
85+
86+
if jsonb_array_length(v_jsonb_output) > 0 then
87+
insert into pgstream.events (payload, stream_id, lsn)
88+
select elem, %L, pg_current_wal_lsn()
89+
from jsonb_array_elements(v_jsonb_output) as t(elem);
90+
end if;
91+
92+
if tg_op = 'DELETE' then
93+
return old;
94+
end if;
95+
96+
return new;
97+
end
98+
$inner$
99+
language plpgsql
100+
set search_path = ''
101+
security definer;
102+
$sql$,
103+
lower(v_op::text),
104+
v_table_name,
105+
v_if_blocks,
106+
(select distinct stream_id from pgstream.subscriptions where table_name = v_table_name and schema_name = v_schema_name and operation = v_op limit 1)
107+
);
108+
109+
execute format(
110+
$sql$
111+
create constraint trigger pgstream_%s
112+
after %s on %I.%I
113+
deferrable initially deferred
114+
for each row
115+
%s
116+
execute procedure pgstream._publish_after_%s_on_%s()
117+
$sql$,
118+
lower(v_op::text),
119+
lower(v_op::text),
120+
v_schema_name,
121+
v_table_name,
122+
case when v_when_clause is not null and length(v_when_clause) > 0
123+
then 'when ((' || v_when_clause || '))'
124+
else ''
125+
end,
126+
lower(v_op::text),
127+
v_table_name
128+
);
129+
end if;
130+
end loop;
131+
132+
if tg_op = 'DELETE' then
133+
return old;
134+
end if;
135+
136+
return new;
137+
end
138+
$_$;
139+

0 commit comments

Comments
 (0)