Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions lib/lapin/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,15 @@ defmodule Lapin.Connection do

with configuration <- Keyword.merge(@connection_default_params, configuration),
{:ok, connection} <- AMQP.Connection.open(configuration),
_ref = Process.monitor(connection.pid),
{:ok, config_channel} <- AMQP.Channel.open(connection),
{:ok, exchanges} <- declare_exchanges(configuration, config_channel),
{:ok, queues} <- declare_queues(configuration, config_channel),
:ok <- bind_exchanges(exchanges, config_channel),
:ok <- bind_queues(queues, config_channel),
{:ok, producers} <- create_producers(configuration, connection),
{:ok, consumers} <- create_consumers(configuration, connection),
:ok <- AMQP.Channel.close(config_channel) do
:ok <- AMQP.Channel.close(config_channel),
:ok <- monitor_processes(connection, producers, consumers) do
{
:next_state,
:connected,
Expand Down Expand Up @@ -443,6 +443,16 @@ defmodule Lapin.Connection do
{:error, reason} ->
Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reason)}")
end
catch
:exit, reason ->
case Consumer.reject_message(consumer, delivery_tag, not redelivered) do
:ok ->
Logger.error("Rejected message #{delivery_tag} due to process exit: #{inspect(reason)}")
:ok

{:error, reject_error} ->
Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reject_error)}")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reject_error)}")
Logger.error("Failed rejecting message #{delivery_tag}: #{inspect(reject_error)}")

And we should def have it in Sentry 🤔 Maybe we should let it burn in this case?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to let it burn then this spawn here would bite us again

end
end

defp consume_ack(true = _consumer_ack, consumer, delivery_tag) do
Expand Down Expand Up @@ -612,4 +622,18 @@ defmodule Lapin.Connection do
{:error, :missing_params, Enum.reject(params, &Keyword.has_key?(configuration, &1))}
end
end

defp monitor_processes(connection, producers, consumers) do
Process.monitor(connection.pid)

Enum.each(producers, fn producer ->
Process.monitor(producer.channel.pid)
end)

Enum.each(consumers, fn consumer ->
Process.monitor(consumer.channel.pid)
end)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a single :DOWN handler in this module and it stops and reconnects. The entire connection. This means any consumer or producer dying will take down all the other ones 😄 IMO we should check which process died and only restart this one in case it's one of the channels.


:ok
end
end