Skip to content

Commit 3ebb980

Browse files
committed
🐛 Carve out for pre Postgres 14 dbs
1 parent 2df2f9f commit 3ebb980

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

lib/sequin/runtime/slot_message_handler.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ defmodule Sequin.Runtime.SlotMessageHandler do
187187
retry_deliver_interval = Keyword.get(opts, :setting_retry_deliver_interval, :timer.seconds(1))
188188

189189
Logger.metadata(
190-
replication_slot_id: replication_slot_id,
190+
replication_id: replication_slot_id,
191191
processor_idx: processor_idx
192192
)
193193

lib/sequin/runtime/slot_processor_server.ex

+17-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ defmodule Sequin.Runtime.SlotProcessorServer do
4646
@config_schema Application.compile_env(:sequin, [Sequin.Repo, :config_schema_prefix])
4747
@stream_schema Application.compile_env(:sequin, [Sequin.Repo, :stream_schema_prefix])
4848

49+
@slots_ids_with_old_postgres [
50+
"59d70fc1-e6a2-4c0e-9f4d-c5ced151cec1",
51+
"dcfba45f-d503-4fef-bb11-9221b9efa70a"
52+
]
53+
4954
def max_accumulated_bytes do
5055
Application.get_env(:sequin, :slot_processor_max_accumulated_bytes) || @max_accumulated_bytes
5156
end
@@ -265,7 +270,7 @@ defmodule Sequin.Runtime.SlotProcessorServer do
265270
{:ok, low_watermark_wal_cursor} = Sequin.Replication.low_watermark_wal_cursor(state.id)
266271

267272
query =
268-
if state.id in ["59d70fc1-e6a2-4c0e-9f4d-c5ced151cec1", "dcfba45f-d503-4fef-bb11-9221b9efa70a"] do
273+
if state.id in @slots_ids_with_old_postgres do
269274
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication}')"
270275
else
271276
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication}', messages 'true')"
@@ -418,7 +423,7 @@ defmodule Sequin.Runtime.SlotProcessorServer do
418423
def handle_data(<<?k, _wal_end::64, _clock::64, 0>>, %State{} = state) do
419424
# Because these are <14 Postgres databases, they will not receive heartbeat messages
420425
# temporarily mark them as healthy if we receive a keepalive message
421-
if state.id in ["59d70fc1-e6a2-4c0e-9f4d-c5ced151cec1", "dcfba45f-d503-4fef-bb11-9221b9efa70a"] do
426+
if state.id in @slots_ids_with_old_postgres do
422427
Health.put_event(
423428
state.replication_slot,
424429
%Health.Event{slug: :replication_heartbeat_received, status: :success}
@@ -563,7 +568,7 @@ defmodule Sequin.Runtime.SlotProcessorServer do
563568
end
564569

565570
@impl ReplicationConnection
566-
def handle_info(:emit_heartbeat, %State{id: "dcfba45f-d503-4fef-bb11-9221b9efa70a"} = state) do
571+
def handle_info(:emit_heartbeat, %State{id: id} = state) when id in @slots_ids_with_old_postgres do
567572
execute_timed(:handle_info_emit_heartbeat, fn ->
568573
# Carve out for individual cloud customer who still needs to upgrade to Postgres 14+
569574
# This heartbeat is not used for health, but rather to advance the slot even if tables are dormant.
@@ -624,6 +629,15 @@ defmodule Sequin.Runtime.SlotProcessorServer do
624629
next_state = schedule_heartbeat_verification(state)
625630

626631
cond do
632+
# Carve out for individual cloud customer who still needs to upgrade to Postgres 14+
633+
state.id in @slots_ids_with_old_postgres ->
634+
Health.put_event(
635+
state.replication_slot,
636+
%Event{slug: :replication_heartbeat_verification, status: :success}
637+
)
638+
639+
{:keep_state, next_state}
640+
627641
# No outstanding heartbeat but we have emitted one in the last 2m
628642
# This is the most likely clause we hit because we usually receive the heartbeat message
629643
# pretty quickly after emitting it

0 commit comments

Comments
 (0)