@@ -176,12 +176,38 @@ def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat
176176
177177 self .assertDictEqual ({}, output )
178178
179- def test_consume_with_non_row_action_returns_state_unchanged (self ):
180- """B/C/T/M actions are silently skipped to allow WAL slot LSN to advance """
179+ def test_consume_with_truncate_action_returns_state_unchanged (self ):
180+ """Truncate (T) actions with schema/table keys are silently skipped """
181181 output = logical_replication .consume_message (
182182 [{'tap_stream_id' : 'myschema-mytable' }],
183183 {},
184- self .WalMessage (payload = '{"action":"truncate", "schema": "myschema", "table": "mytable"}' ,
184+ self .WalMessage (payload = '{"action":"T", "schema": "myschema", "table": "mytable"}' ,
185+ data_start = 'some lsn' ),
186+ None ,
187+ {}
188+ )
189+
190+ self .assertDictEqual ({}, output )
191+
192+ def test_consume_with_begin_action_without_schema_table_returns_state_unchanged (self ):
193+ """Begin (B) messages from wal2json don't include schema/table keys — must not KeyError"""
194+ output = logical_replication .consume_message (
195+ [{'tap_stream_id' : 'myschema-mytable' }],
196+ {},
197+ self .WalMessage (payload = '{"action":"B","xid":12345,"timestamp":"2026-04-06 12:00:00.000000+00"}' ,
198+ data_start = 'some lsn' ),
199+ None ,
200+ {}
201+ )
202+
203+ self .assertDictEqual ({}, output )
204+
205+ def test_consume_with_commit_action_without_schema_table_returns_state_unchanged (self ):
206+ """Commit (C) messages from wal2json don't include schema/table keys — must not KeyError"""
207+ output = logical_replication .consume_message (
208+ [{'tap_stream_id' : 'myschema-mytable' }],
209+ {},
210+ self .WalMessage (payload = '{"action":"C","xid":12345,"timestamp":"2026-04-06 12:00:00.000000+00"}' ,
185211 data_start = 'some lsn' ),
186212 None ,
187213 {}
0 commit comments