Skip to content

Commit 8aec747

Browse files
committed
🐛 Don't trust the LSN from pg_logical_emit_message/3
We have a Postgres database that returns wildly different numbers for pg_logical_emit_message/3 and pg_current_wal_lsn/0: ``` select pg_logical_emit_message(true, 'acco-ignore-message', '') B0234/7C794C04 select pg_current_wal_lsn(); 0/471892C8 ``` We're not sure why this happens (the message coming through the slot has an LSN that's close to pg_current_wal_lsn). We suspect the way the db was created (replica/standby/copy?). To be safe, we will not trust the LSN that is returned by this function.
1 parent e33cbc5 commit 8aec747

File tree

2 files changed

+36
-23
lines changed

2 files changed

+36
-23
lines changed

lib/sequin/databases_runtime/table_reader.ex

+8-3
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,17 @@ defmodule Sequin.DatabasesRuntime.TableReader do
103103
]),
104104
{:ok, res} <- fun.(conn),
105105
Logger.debug("[TableReader] Emitting high watermark for batch #{current_batch_id}"),
106-
{:ok, %{rows: [[lsn]]}} <-
106+
{:ok, _} <-
107107
Postgres.query(conn, @emit_logical_message_sql, [
108108
Constants.backfill_batch_high_watermark(),
109109
payload
110-
]) do
111-
{:ok, res, lsn}
110+
]),
111+
# Note: We can't trust the LSN returned by pg_logical_emit_message function. For reasons we
112+
# don't understand yet, we can have a situation where the LSN returned by this function is
113+
# way different from `pg_current_wal_lsn()` / the LSNs coming from the replication slot.
114+
{:ok, %{rows: [[appx_lsn]]}} <-
115+
Postgres.query(conn, "select pg_current_wal_lsn()") do
116+
{:ok, res, appx_lsn}
112117
end
113118
end
114119

lib/sequin/databases_runtime/table_reader_server.ex

+28-20
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
126126
field :batch, list() | nil
127127
field :batch_size, non_neg_integer()
128128
field :batch_id, TableReader.batch_id() | nil
129-
field :batch_lsn, integer()
129+
# ~LSN around the time the batch was fetched. Approximate, as it doesn't correspond to
130+
# the watermark messages emitted by this process (see TableReader.with_watermark/5 for explanation.)
131+
field :batch_appx_lsn, non_neg_integer() | nil
130132
field :fetch_slot_lsn, (PostgresDatabase.t(), String.t() -> {:ok, term()} | {:error, term()})
131133
field :fetch_batch, (pid(), PostgresDatabaseTable.t(), map(), Keyword.t() -> {:ok, term()} | {:error, term()})
132134
field :batch_check_count, integer(), default: 0
@@ -234,14 +236,14 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
234236
time_ms = max(System.monotonic_time(:millisecond) - start_time, 1)
235237

236238
case res do
237-
{:ok, %{rows: [], next_cursor: nil}, _lsn} ->
239+
{:ok, %{rows: [], next_cursor: nil}, _appx_lsn} ->
238240
Logger.info("[TableReaderServer] Batch returned no records. Table pagination complete.")
239241
Consumers.table_reader_finished(state.consumer.id)
240242
TableReader.delete_cursor(state.consumer.active_backfill.id)
241243

242244
{:stop, :normal}
243245

244-
{:ok, %{rows: rows, next_cursor: next_cursor}, lsn} ->
246+
{:ok, %{rows: rows, next_cursor: next_cursor}, appx_lsn} ->
245247
Logger.debug("[TableReaderServer] Batch returned #{length(rows)} records")
246248

247249
# Record successful timing
@@ -254,7 +256,7 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
254256

255257
batch =
256258
rows
257-
|> messages_from_rows(lsn, state)
259+
|> messages_from_rows(state)
258260
|> filter_messages(state)
259261

260262
if batch == [] do
@@ -268,7 +270,7 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
268270
| batch: batch,
269271
batch_id: batch_id,
270272
next_cursor: next_cursor,
271-
batch_lsn: lsn
273+
batch_appx_lsn: appx_lsn
272274
}
273275

274276
{:next_state, :await_flush, state}
@@ -332,10 +334,10 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
332334
state_name == {:paused, :max_pending_messages} and state.count_pending_messages < state.max_pending_messages ->
333335
{:next_state, :fetch_batch, state, actions}
334336

335-
state_name == :await_flush and current_slot_lsn > state.batch_lsn ->
337+
state_name == :await_flush and current_slot_lsn > state.batch_appx_lsn ->
336338
Logger.warning(
337339
"[TableReaderServer] Detected stale batch #{state.batch_id}. " <>
338-
"Batch LSN #{state.batch_lsn} is behind slot LSN #{current_slot_lsn}. Retrying."
340+
"Batch LSN #{state.batch_appx_lsn} is behind slot LSN #{current_slot_lsn}. Retrying."
339341
)
340342

341343
state = %{state | batch: nil, batch_id: nil, next_cursor: nil}
@@ -357,7 +359,7 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
357359

358360
def handle_event(
359361
{:call, from},
360-
{:flush_batch, %{batch_id: batch_id} = batch_info},
362+
{:flush_batch, %{batch_id: batch_id, commit_lsn: commit_lsn} = batch_info},
361363
_state_name,
362364
%State{batch_id: batch_id, batch: batch} = state
363365
)
@@ -375,6 +377,18 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
375377
{:next_state, {:paused, :max_pending_messages}, state}
376378
end
377379
else
380+
batch =
381+
batch
382+
|> Stream.with_index()
383+
|> Stream.map(fn
384+
{%ConsumerRecord{} = record, idx} ->
385+
%{record | commit_lsn: commit_lsn, commit_idx: idx}
386+
387+
{%ConsumerEvent{} = event, idx} ->
388+
%{event | commit_lsn: commit_lsn, commit_idx: idx}
389+
end)
390+
|> Enum.to_list()
391+
378392
case push_batch_with_retry(state, batch) do
379393
:ok ->
380394
# Clear the batch from memory
@@ -513,20 +527,17 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
513527
%{db_table | sort_column_attnum: sort_column_attnum(state.consumer)}
514528
end
515529

516-
defp messages_from_rows(rows, commit_lsn, %State{} = state) when record_messages?(state) do
530+
defp messages_from_rows(rows, %State{} = state) when record_messages?(state) do
517531
table = table(state)
518532
consumer = state.consumer
519533

520-
rows
521-
|> Enum.with_index()
522-
|> Enum.map(fn {row, idx} ->
534+
Enum.map(rows, fn row ->
523535
data = build_record_data(table, consumer, row)
524536
payload_size_bytes = :erlang.external_size(data)
525537

538+
# commit_lsn and commit_idx are added later
526539
%ConsumerRecord{
527540
consumer_id: consumer.id,
528-
commit_lsn: commit_lsn,
529-
commit_idx: idx,
530541
table_oid: table.oid,
531542
record_pks: record_pks(table, row),
532543
group_id: generate_group_id(consumer, table, row),
@@ -537,20 +548,17 @@ defmodule Sequin.DatabasesRuntime.TableReaderServer do
537548
end)
538549
end
539550

540-
defp messages_from_rows(rows, commit_lsn, %State{} = state) when event_messages?(state) do
551+
defp messages_from_rows(rows, %State{} = state) when event_messages?(state) do
541552
table = table(state)
542553
consumer = state.consumer
543554

544-
rows
545-
|> Enum.with_index()
546-
|> Enum.map(fn {row, idx} ->
555+
Enum.map(rows, fn row ->
547556
data = build_event_data(table, consumer, row)
548557
payload_size_bytes = :erlang.external_size(data)
549558

559+
# commit_lsn and commit_idx are added later
550560
%ConsumerEvent{
551561
consumer_id: consumer.id,
552-
commit_lsn: commit_lsn,
553-
commit_idx: idx,
554562
record_pks: record_pks(table, row),
555563
group_id: generate_group_id(consumer, table, row),
556564
table_oid: table.oid,

0 commit comments

Comments
 (0)