Skip to content

Commit f6bff99

Browse files
committed
✨ Improve back-pressure handling by keeping SlotProcessor up
- Add new functionality to ReplicationConnection to allow us to start timers and update state in more areas - Track the last flushed WAL cursor in state - When we get a back-pressure error, disconnect instead of crashing - On reconnect, filter out messages below the last flushed WAL cursor
1 parent 6011ebf commit f6bff99

File tree

3 files changed

+237
-109
lines changed

3 files changed

+237
-109
lines changed

lib/sequin/postgres/replication_connection.ex

+40-61
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@ defmodule Sequin.Postgres.ReplicationConnection do
6262
use Postgrex.ReplicationConnection
6363
6464
def start_link(opts) do
65-
# Automatically reconnect if we lose connection.
66-
extra_opts = [
67-
auto_reconnect: true
68-
]
69-
7065
Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
7166
end
7267
@@ -154,7 +149,6 @@ defmodule Sequin.Postgres.ReplicationConnection do
154149
@doc false
155150
defstruct protocol: nil,
156151
state: nil,
157-
auto_reconnect: false,
158152
reconnect_backoff: 500,
159153
streaming: nil
160154

@@ -165,8 +159,8 @@ defmodule Sequin.Postgres.ReplicationConnection do
165159
@type ack :: iodata
166160
@type query :: iodata
167161
@type reason :: String.t()
168-
@type actions :: [term()]
169-
162+
@type error :: term()
163+
@type actions :: [tuple()]
170164
@typedoc """
171165
The following options configure streaming:
172166
@@ -189,23 +183,25 @@ defmodule Sequin.Postgres.ReplicationConnection do
189183

190184
@doc """
191185
Invoked after connecting.
192-
193-
This may be invoked multiple times if `:auto_reconnect` is set to true.
194186
"""
195187
@callback handle_connect(state) ::
196188
{:keep_state, state}
197189
| {:keep_state, state, actions}
198190
| {:keep_state_and_ack, ack, state}
199191
| {:query, query, state}
200192
| {:stream, query, stream_opts, state}
201-
| {:disconnect, reason}
193+
| {:disconnect, reason, state}
202194

203195
@doc """
204-
Invoked after disconnecting.
196+
Invoked if connection fails.
197+
"""
198+
@callback handle_connect_failed(reason, state) ::
199+
{:keep_state, state} | {:keep_state, state, actions} | {:stop, reason, state}
205200

206-
This is only invoked if `:auto_reconnect` is set to true.
201+
@doc """
202+
Invoked after disconnecting.
207203
"""
208-
@callback handle_disconnect(state) :: {:keep_state, state}
204+
@callback handle_disconnect(error, reason, state) :: {:keep_state, state} | {:keep_state, state, actions}
209205

210206
@doc """
211207
Callback for `:stream` outputs.
@@ -227,7 +223,7 @@ defmodule Sequin.Postgres.ReplicationConnection do
227223
| {:keep_state_and_ack, ack, state}
228224
| {:query, query, state}
229225
| {:stream, query, stream_opts, state}
230-
| {:disconnect, reason}
226+
| {:disconnect, reason, state}
231227

232228
@doc """
233229
Callback for `Kernel.send/2`.
@@ -238,27 +234,20 @@ defmodule Sequin.Postgres.ReplicationConnection do
238234
| {:keep_state_and_ack, ack, state}
239235
| {:query, query, state}
240236
| {:stream, query, stream_opts, state}
241-
| {:disconnect, reason}
237+
| {:disconnect, reason, state}
242238

243239
@doc """
244240
Callback for `call/3`.
245241
246242
Replies must be sent with `reply/2`.
247-
248-
If `auto_reconnect: false` (the default) and there is a disconnection,
249-
the process will terminate and the caller will exit even if no reply is
250-
sent. However, if `auto_reconnect` is set to true, a disconnection will
251-
keep the process alive, which means that any command that has not yet
252-
been replied to should eventually do so. One simple approach is to
253-
reply to any pending commands on `c:handle_disconnect/1`.
254243
"""
255244
@callback handle_call(term, :gen_statem.from(), state) ::
256245
{:keep_state, state}
257246
| {:keep_state, state, actions}
258247
| {:keep_state_and_ack, ack, state}
259248
| {:query, query, state}
260249
| {:stream, query, stream_opts, state}
261-
| {:disconnect, reason}
250+
| {:disconnect, reason, state}
262251

263252
@doc """
264253
Callback for `:query` outputs.
@@ -278,12 +267,11 @@ defmodule Sequin.Postgres.ReplicationConnection do
278267
| {:keep_state_and_ack, ack, state}
279268
| {:query, query, state}
280269
| {:stream, query, stream_opts, state}
281-
| {:disconnect, reason}
270+
| {:disconnect, reason, state}
282271

283272
@optional_callbacks handle_call: 3,
284273
handle_connect: 1,
285274
handle_data: 2,
286-
handle_disconnect: 1,
287275
handle_info: 2,
288276
handle_result: 2
289277

@@ -343,14 +331,6 @@ defmodule Sequin.Postgres.ReplicationConnection do
343331
344332
* `:sync_connect` - controls if the connection should be established on boot
345333
or asynchronously right after boot. Defaults to `true`.
346-
347-
* `:auto_reconnect` - automatically attempt to reconnect to the database
348-
in event of a disconnection. See the
349-
[note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
350-
above. Defaults to `false`, which means the process terminates.
351-
352-
* `:reconnect_backoff` - time (in ms) between reconnection attempts when
353-
`:auto_reconnect` is enabled. Defaults to `500`.
354334
"""
355335
@spec start_link(module(), term(), Keyword.t()) ::
356336
{:ok, pid} | {:error, Postgrex.Error.t() | term}
@@ -468,11 +448,9 @@ defmodule Sequin.Postgres.ReplicationConnection do
468448
&Keyword.put_new(&1, :replication, "database")
469449
)
470450

471-
{auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false)
472451
{reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500)
473452

474453
state = %__MODULE__{
475-
auto_reconnect: auto_reconnect,
476454
reconnect_backoff: reconnect_backoff,
477455
state: {mod, mod_state}
478456
}
@@ -505,11 +483,7 @@ defmodule Sequin.Postgres.ReplicationConnection do
505483
maybe_handle(mod, :handle_connect, [mod_state], %{s | protocol: protocol})
506484

507485
{:error, reason} ->
508-
if s.auto_reconnect do
509-
{:keep_state, s, {{:timeout, :backoff}, s.reconnect_backoff, nil}}
510-
else
511-
{:stop, reason, s}
512-
end
486+
maybe_handle(mod, :handle_connect_failed, [reason, mod_state], s)
513487
end
514488
end
515489

@@ -533,6 +507,11 @@ defmodule Sequin.Postgres.ReplicationConnection do
533507
end)
534508
end
535509

510+
def handle_event({:timeout, :reconnect}, nil, @state, s) do
511+
Logger.info("Reconnecting to replication slot")
512+
{:keep_state, s, {:next_event, :internal, {:connect, :reconnect}}}
513+
end
514+
536515
## Helpers
537516

538517
defp handle_data([], s), do: {:keep_state, s}
@@ -604,8 +583,8 @@ defmodule Sequin.Postgres.ReplicationConnection do
604583
{:query, _query, mod_state} ->
605584
stream_in_progress(:query, mod, mod_state, from, s)
606585

607-
{:disconnect, reason} ->
608-
reconnect_or_stop(:disconnect, reason, s.protocol, s)
586+
{:disconnect, reason, mod_state} ->
587+
reconnect_or_stop(:disconnect, reason, s.protocol, %{s | state: {mod, mod_state}})
609588
end
610589
end
611590

@@ -615,32 +594,32 @@ defmodule Sequin.Postgres.ReplicationConnection do
615594
{:keep_state, %{s | state: {mod, mod_state}}}
616595
end
617596

618-
defp reconnect_or_stop(error, reason, protocol, %{auto_reconnect: false} = s) when error in [:error, :disconnect] do
597+
defp reconnect_or_stop(error, reason, protocol, s) when error in [:error, :disconnect] do
619598
%{state: {mod, mod_state}} = s
620599

621-
{:keep_state, s} =
622-
maybe_handle(mod, :handle_disconnect, [mod_state], %{s | protocol: protocol})
600+
# Exception is unused
601+
Protocol.disconnect(%RuntimeError{}, protocol)
623602

624-
{:stop, reason, s}
603+
maybe_handle(mod, :handle_disconnect, [error, reason, mod_state], %{s | protocol: nil, streaming: nil})
625604
end
626605

627-
defp reconnect_or_stop(error, reason, _protocol, %{auto_reconnect: true} = s) when error in [:error, :disconnect] do
628-
%{state: {mod, mod_state}} = s
606+
# defp reconnect_or_stop(error, reason, _protocol, %{auto_reconnect: true} = s) when error in [:error, :disconnect] do
607+
# %{state: {mod, mod_state}} = s
629608

630-
Logger.error(
631-
"#{inspect(pid_or_name())} (#{inspect(mod)}) is reconnecting due to reason: #{Exception.format(:error, reason)}"
632-
)
609+
# Logger.error(
610+
# "#{inspect(pid_or_name())} (#{inspect(mod)}) is reconnecting due to reason: #{Exception.format(:error, reason)}"
611+
# )
633612

634-
{:keep_state, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
635-
{:keep_state, %{s | streaming: nil}, {:next_event, :internal, {:connect, :reconnect}}}
636-
end
613+
# {:keep_state, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
614+
# {:keep_state, %{s | streaming: nil}, {:next_event, :internal, {:connect, :reconnect}}}
615+
# end
637616

638-
defp pid_or_name do
639-
case Process.info(self(), :registered_name) do
640-
{:registered_name, atom} when is_atom(atom) -> atom
641-
_ -> self()
642-
end
643-
end
617+
# defp pid_or_name do
618+
# case Process.info(self(), :registered_name) do
619+
# {:registered_name, atom} when is_atom(atom) -> atom
620+
# _ -> self()
621+
# end
622+
# end
644623

645624
defp opts, do: Process.get(__MODULE__)
646625
defp put_opts(opts), do: Process.put(__MODULE__, opts)

0 commit comments

Comments
 (0)