Skip to content

Commit 648b976

Browse files
committed
Use wal2json non-row messages to detect that wal has moved on
1 parent a5fe4b4 commit 648b976

File tree

9 files changed

+200
-33
lines changed

9 files changed

+200
-33
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
0.75.0 (2026-04-07)
2+
-------------------
3+
- `pipelinewise-tap-postgres` from `2.1.0` to `2.2.0`
4+
- Use wal2json non-row messages to detect that wal has moved on.
5+
16
0.74.4 (2026-03-30)
27
-------------------
38
- Better handling of CSV bulk upload to Snowflake during Full and Partial sync
49

510
0.74.3 (2026-02-26)
11+
-------------------
612
- `pipelinewise-target-snowflake` from `2.5.1` to `2.5.2`
713
- Support creating new Iceberg tables for pure Singer replications
814

dev-project/entrypoint.sh

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,32 @@
22

33
set -e
44

5+
# Retry wrapper for apt-get to handle transient mirror errors
6+
apt_retry() {
7+
local max_attempts=3
8+
local attempt=1
9+
while [ $attempt -le $max_attempts ]; do
10+
if "$@"; then
11+
return 0
12+
fi
13+
echo "apt command failed (attempt $attempt/$max_attempts), retrying in 5s..."
14+
attempt=$((attempt + 1))
15+
sleep 5
16+
apt-get update
17+
done
18+
echo "apt command failed after $max_attempts attempts"
19+
return 1
20+
}
21+
522
apt-get update
6-
apt-get install -y software-properties-common python3-apt apt-utils
23+
apt_retry apt-get install -y software-properties-common python3-apt apt-utils
724

825
add-apt-repository ppa:deadsnakes/ppa
926
apt-get update
1027

1128
echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
1229

13-
apt-get install -y --no-install-recommends \
30+
apt_retry apt-get install -y --no-install-recommends \
1431
wget \
1532
gnupg \
1633
git \

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
setup(name='pipelinewise',
99
python_requires='==3.10.*',
10-
version='0.74.4',
10+
version='0.75.0',
1111
description='PipelineWise',
1212
long_description=LONG_DESCRIPTION,
1313
long_description_content_type='text/markdown',

singer-connectors/tap-postgres/CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
2.2.0 (2026-04-07)
2+
-------------------
3+
**Changes**
4+
- LOG_BASED: Use wal2json non-row messages to detect that wal has moved on.
5+
16
2.1.0 (2023-03-30)
27
-------------------
38
**Changes**
4-
- INCREMENTAL: An optional config `limit` to be appended to incremental queries to limit their runtime.
9+
- INCREMENTAL: An optional config `limit` to be appended to incremental queries to limit their runtime.
510

611
**Fixes**
7-
- INCREMENTAL: `ORDER BY` added back to query in case replication key value is None.
12+
- INCREMENTAL: `ORDER BY` added back to query in case replication key value is None.
813

914
2.0.0 (2022-11-02)
1015
-------------------

singer-connectors/tap-postgres/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
long_description = f.read()
77

88
setup(name='pipelinewise-tap-postgres',
9-
version='2.1.0',
9+
version='2.2.0',
1010
description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
1111
long_description=long_description,
1212
long_description_content_type='text/markdown',

singer-connectors/tap-postgres/tap_postgres/sync_strategies/logical_replication.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,23 @@ def consume_message(streams, state, msg, time_extracted, conn_info):
385385

386386
lsn = msg.data_start
387387

388+
action = payload.get('action')
389+
# Action Types:
390+
# I = Insert
391+
# U = Update
392+
# D = Delete
393+
# B = Begin Transaction
394+
# C = Commit Transaction
395+
# M = Message
396+
# T = Truncate
397+
398+
# Advance the slot LSN for non-row actions without doing any processing
399+
# This avoids the slot growing when the source has very busy tables that are NOT selected for replication
400+
if action not in {'I', 'U', 'D'}:
401+
LOGGER.debug('Skipping non-row wal2json message: action=%s, lsn=%s', action,
402+
int_to_lsn(lsn) if isinstance(lsn, int) else lsn)
403+
return state
404+
388405
streams_lookup = {s['tap_stream_id']: s for s in streams}
389406

390407
tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table'])
@@ -415,19 +432,6 @@ def consume_message(streams, state, msg, time_extracted, conn_info):
415432
# ]
416433
# }
417434

418-
# Action Types:
419-
# I = Insert
420-
# U = Update
421-
# D = Delete
422-
# B = Begin Transaction
423-
# C = Commit Transaction
424-
# M = Message
425-
# T = Truncate
426-
action = payload['action']
427-
428-
if action not in {'I', 'U', 'D'}:
429-
raise UnsupportedPayloadKindError(f"unrecognized replication operation: {action}")
430-
431435
# Get the additional fields in payload that are not in schema properties:
432436
# only inserts and updates have the list of columns that can be used to detect any different in columns
433437
diff = set()
@@ -613,7 +617,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
613617
status_interval=poll_interval,
614618
options={
615619
'format-version': 2,
616-
'include-transaction': False,
620+
'include-transaction': True,
617621
'include-timestamp': True,
618622
'include-types': False,
619623
'actions': 'insert,update,delete',

singer-connectors/tap-postgres/tests/integration/test_logical_replication.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,100 @@ def test_logical_replication(self):
206206
})
207207
self.assertEqual(messages[4]['type'], 'STATE')
208208
self.assertDictEqual(state, messages[4]['value'])
209+
210+
211+
class TestUnselectedTableSlotAdvancement(unittest.TestCase):
212+
"""Test that WAL slot LSN advances even when only non-selected tables have activity.
213+
214+
This verifies the include-transaction: true fix — B/C markers from transactions
215+
on unselected tables cause the slot to advance, preventing unbounded slot growth.
216+
"""
217+
selected_table = None
218+
unselected_table = None
219+
maxDiff = None
220+
221+
@classmethod
222+
def setUpClass(cls) -> None:
223+
cls.selected_table = 'selected_table'
224+
cls.unselected_table = 'unselected_table'
225+
226+
selected_spec = {
227+
"columns": [
228+
{"name": "id", "type": "serial", "primary_key": True},
229+
{"name": "val", "type": "character varying"},
230+
],
231+
"name": cls.selected_table,
232+
}
233+
unselected_spec = {
234+
"columns": [
235+
{"name": "id", "type": "serial", "primary_key": True},
236+
{"name": "val", "type": "character varying"},
237+
],
238+
"name": cls.unselected_table,
239+
}
240+
241+
ensure_test_table(selected_spec)
242+
ensure_test_table(unselected_spec)
243+
create_replication_slot()
244+
245+
cls.config = get_test_connection_config()
246+
tap_postgres.dump_catalog = lambda catalog: True
247+
248+
@classmethod
249+
def tearDownClass(cls) -> None:
250+
drop_replication_slot()
251+
drop_table(cls.selected_table)
252+
drop_table(cls.unselected_table)
253+
254+
def test_slot_advances_with_only_unselected_table_activity(self):
255+
"""Slot LSN must advance when only the unselected table receives writes."""
256+
257+
# Discover streams, select only `selected_table` for LOG_BASED
258+
streams = tap_postgres.do_discovery(self.config)
259+
selected_stream = [s for s in streams if s['tap_stream_id'] == f'public-{self.selected_table}'][0]
260+
selected_stream = set_replication_method_for_stream(selected_stream, 'LOG_BASED')
261+
262+
# Insert a row into the selected table so initial sync has something to process
263+
conn = get_test_connection()
264+
try:
265+
with conn.cursor() as cur:
266+
insert_record(cur, self.selected_table, {'val': 'seed'})
267+
finally:
268+
conn.close()
269+
270+
# Initial sync to establish bookmarks
271+
state = {}
272+
my_stdout = io.StringIO()
273+
with contextlib.redirect_stdout(my_stdout):
274+
state = tap_postgres.do_sync(self.config, {'streams': [selected_stream]}, 'LOG_BASED', state, None)
275+
276+
# Capture LSN after initial sync
277+
initial_lsn = state['bookmarks'][f'public-{self.selected_table}']['lsn']
278+
self.assertIsNotNone(initial_lsn, "Initial sync should set an LSN bookmark")
279+
280+
# Now insert rows ONLY into the unselected table
281+
conn = get_test_connection()
282+
try:
283+
with conn.cursor() as cur:
284+
for i in range(5):
285+
insert_record(cur, self.unselected_table, {'val': f'noise_{i}'})
286+
finally:
287+
conn.close()
288+
289+
# Run sync again — no rows from selected table, but B/C markers should advance the slot
290+
my_stdout.seek(0)
291+
my_stdout.truncate()
292+
with contextlib.redirect_stdout(my_stdout):
293+
state = tap_postgres.do_sync(self.config, {'streams': [selected_stream]}, 'LOG_BASED', state, None)
294+
295+
# Assert that the LSN bookmark has advanced past the unselected-table activity
296+
new_lsn = state['bookmarks'][f'public-{self.selected_table}']['lsn']
297+
self.assertGreater(new_lsn, initial_lsn,
298+
"LSN bookmark should advance when unselected tables have WAL activity "
299+
"(B/C markers from include-transaction: true)")
300+
301+
# Verify no RECORD messages were emitted (only the unselected table had activity)
302+
messages = [json.loads(msg) for msg in my_stdout.getvalue().splitlines()]
303+
record_messages = [m for m in messages if m['type'] == 'RECORD']
304+
self.assertEqual(len(record_messages), 0,
305+
"No RECORD messages should be emitted for unselected table activity")

singer-connectors/tap-postgres/tests/unit/test_logical_replication.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from dateutil.tz import tzoffset
1212

1313
from tap_postgres.sync_strategies import logical_replication
14-
from tap_postgres.sync_strategies.logical_replication import UnsupportedPayloadKindError
1514

1615

1716
class PostgresCurReplicationSlotMock:
@@ -177,16 +176,44 @@ def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat
177176

178177
self.assertDictEqual({}, output)
179178

180-
def test_consume_with_payload_kind_is_not_supported_expect_exception(self):
181-
with self.assertRaises(UnsupportedPayloadKindError):
182-
logical_replication.consume_message(
183-
[{'tap_stream_id': 'myschema-mytable'}],
184-
{},
185-
self.WalMessage(payload='{"action":"truncate", "schema": "myschema", "table": "mytable"}',
186-
data_start='some lsn'),
187-
None,
188-
{}
189-
)
179+
def test_consume_with_truncate_action_returns_state_unchanged(self):
180+
"""Truncate (T) actions with schema/table keys are silently skipped"""
181+
output = logical_replication.consume_message(
182+
[{'tap_stream_id': 'myschema-mytable'}],
183+
{},
184+
self.WalMessage(payload='{"action":"T", "schema": "myschema", "table": "mytable"}',
185+
data_start='some lsn'),
186+
None,
187+
{}
188+
)
189+
190+
self.assertDictEqual({}, output)
191+
192+
def test_consume_with_begin_action_without_schema_table_returns_state_unchanged(self):
193+
"""Begin (B) messages from wal2json don't include schema/table keys — must not KeyError"""
194+
output = logical_replication.consume_message(
195+
[{'tap_stream_id': 'myschema-mytable'}],
196+
{},
197+
self.WalMessage(payload='{"action":"B","xid":12345,"timestamp":"2026-04-06 12:00:00.000000+00"}',
198+
data_start='some lsn'),
199+
None,
200+
{}
201+
)
202+
203+
self.assertDictEqual({}, output)
204+
205+
def test_consume_with_commit_action_without_schema_table_returns_state_unchanged(self):
206+
"""Commit (C) messages from wal2json don't include schema/table keys — must not KeyError"""
207+
output = logical_replication.consume_message(
208+
[{'tap_stream_id': 'myschema-mytable'}],
209+
{},
210+
self.WalMessage(payload='{"action":"C","xid":12345,"timestamp":"2026-04-06 12:00:00.000000+00"}',
211+
data_start='some lsn'),
212+
None,
213+
{}
214+
)
215+
216+
self.assertDictEqual({}, output)
190217

191218
@patch('tap_postgres.logical_replication.singer.write_message')
192219
@patch('tap_postgres.logical_replication.sync_common.send_schema_message')
@@ -1104,7 +1131,7 @@ def test_sync_tables_if_poll_duration_greater_than_logical_poll_total_seconds(se
11041131
status_interval=10,
11051132
options={
11061133
'format-version': 2,
1107-
'include-transaction': False,
1134+
'include-transaction': True,
11081135
'include-timestamp': True,
11091136
'include-types': False,
11101137
'actions': 'insert,update,delete',
@@ -1145,7 +1172,7 @@ def test_sync_tables_if_reached_max_run_seconds(self,
11451172
status_interval=10,
11461173
options={
11471174
'format-version': 2,
1148-
'include-transaction': False,
1175+
'include-transaction': True,
11491176
'include-timestamp': True,
11501177
'include-types': False,
11511178
'actions': 'insert,update,delete',

tests/db/tap_postgres_data_logical.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ CREATE TABLE logical1.logical1_table2(
1919
PRIMARY KEY (cid)
2020
);
2121

22+
CREATE TABLE logical1.logical1_not_selected(
23+
cid serial NOT NULL,
24+
cvarchar varchar,
25+
PRIMARY KEY (cid)
26+
);
27+
28+
2229
CREATE TABLE logical1.logical1_edgydata (LIKE public.edgydata INCLUDING INDEXES);
2330

2431

@@ -106,6 +113,10 @@ INSERT INTO logical2.logical2_table1 (cvarchar) VALUES ('inserted row');
106113

107114
UPDATE logical2.logical2_table1 SET cvarchar = 'updated row';
108115

116+
INSERT INTO logical1.logical1_not_selected (cvarchar) VALUES ('inserted row');
117+
INSERT INTO logical1.logical1_not_selected (cvarchar) VALUES ('inserted row');
118+
INSERT INTO logical1.logical1_not_selected (cvarchar) VALUES ('inserted row');
119+
109120
INSERT INTO logical3.logical3_table1 (cvarchar) VALUES ('inserted row');
110121
INSERT INTO logical3.logical3_table1 (cvarchar) VALUES ('inserted row');
111122
INSERT INTO logical3.logical3_table1 (cvarchar) VALUES ('inserted row');

0 commit comments

Comments
 (0)