2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- Add the option to enable keepalive for scylladb connections, using the environment variable
`DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__ENABLE_KEEPALIVE`. Defaults to `true`
### Fixed
- Corner case in device deletion acknowledgment that could lead to deletion stalling permanently.

## [1.2.1-rc.0] 2025-08-22
### Changed
14 changes: 9 additions & 5 deletions lib/astarte_vmq_plugin.ex
@@ -101,7 +101,7 @@
mountpoint = ~c""
subscriber_id = {mountpoint, client_id}

case @vernemq_api.disconnect_by_subscriber_id(subscriber_id, opts) do

Check warning on line 104 in lib/astarte_vmq_plugin.ex
GitHub Actions / Build and Test (rabbitmq:3.12.0-management, cassandra:3.11.15)
GitHub Actions / Build and Test (rabbitmq:3.12.0-management, scylladb/scylla:5.2.2)
MockVerneMQ.API.disconnect_by_subscriber_id/2 is undefined (module MockVerneMQ.API is not available or is yet to be defined)
:ok ->
:ok

@@ -182,12 +182,16 @@
end
end

@spec ack_device_deletion(String.t(), Astarte.Core.Device.encoded_device_id()) ::
:ok | {:error, term()}
def ack_device_deletion(realm_name, encoded_device_id) do
timestamp = now_us_x10_timestamp()
publish_internal_message(realm_name, encoded_device_id, "/f", "", timestamp)
{:ok, decoded_device_id} = Device.decode_device_id(encoded_device_id)
{:ok, _} = Queries.ack_device_deletion(realm_name, decoded_device_id)
:ok
with {:ok, decoded_device_id} <- Device.decode_device_id(encoded_device_id),
{:ok, _} <- Queries.ack_device_deletion(realm_name, decoded_device_id) do
# Only send /f message after successful database write
timestamp = now_us_x10_timestamp()
publish_internal_message(realm_name, encoded_device_id, "/f", "", timestamp)
Collaborator

Not related to this PR in particular, but since we're already optimizing the process:

I may be missing something about how the deletion mechanism works, but if we already reply with :ok once the deletion has been successfully acked, why do we also need to publish an internal message? Can't DUP just infer the deletion status from the VMQ plugin response? That would leave only one relevant ack (the DUP "start" ack).

Collaborator Author

@davidebriani, Nov 10, 2025

You are raising a valid point.

The :ok reply from VerneMQ already confirms that the device was disconnected and the deletion was acknowledged, so DUP could rely on that instead of waiting for the AMQP message.
I think the extra message was originally kept for async consistency and to maintain the same event-driven pattern we used when all interactions went through the broker. I assume it makes it easier for DUP to tell when it has finished consuming any messages that were still queued while the device deletion was in progress.

In theory we can simplify the flow and drop the internal AMQP publish, relying solely on the GenServer reply.
My suggestion is to evaluate whether we need durability and replayability, e.g. if DUP crashes while doing the GenServer call to VerneMQ, and possibly how we wish to revisit the whole device deletion transaction.
Let's bring the conversation outside of this PR.

:ok
end
end
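
The reordering in this hunk (publish the `/f` message only after the database write succeeds) can be sketched in isolation. This is a minimal illustration, not the plugin's code: `AckOrderingSketch`, `decode`, `write`, and `publish` are hypothetical names, with the three functions standing in for `Device.decode_device_id/1`, `Queries.ack_device_deletion/2`, and `publish_internal_message/5`.

```elixir
defmodule AckOrderingSketch do
  @moduledoc false

  # `decode`, `write`, and `publish` are hypothetical injected functions,
  # used here so the ordering can be exercised without a broker or database.
  def ack(encoded_id, decode, write, publish) do
    with {:ok, decoded_id} <- decode.(encoded_id),
         {:ok, _} <- write.(decoded_id) do
      # Reached only when both steps above returned {:ok, _}.
      publish.(encoded_id)
      :ok
    end
  end
end

decode = fn id -> {:ok, String.upcase(id)} end
publish = fn id -> send(self(), {:published, id}) end

# Successful write: the message is published and :ok is returned.
ok_result = AckOrderingSketch.ack("abc", decode, fn _ -> {:ok, :applied} end, publish)

# Failed write: the non-matching value falls through `with` unchanged,
# and the publish step is never reached.
error_result = AckOrderingSketch.ack("xyz", decode, fn _ -> {:error, :db_down} end, publish)
```

Because the `with` has no `else` clause, any value that does not match `{:ok, _}` is returned to the caller as-is, which is what lets the RPC server in this PR pattern-match on `{:error, reason}`.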

defp setup_heartbeat_timer(realm, device_id, session_pid) do
@@ -199,7 +203,7 @@
end

defp randomize_interval(interval, tolerance) do
multiplier = 1 + (tolerance * 2 * :random.uniform() - tolerance)

Check warning on line 206 in lib/astarte_vmq_plugin.ex
GitHub Actions / Check Dialyzer
GitHub Actions / Build and Test (rabbitmq:3.12.0-management, cassandra:3.11.15)
GitHub Actions / Build and Test (rabbitmq:3.12.0-management, scylladb/scylla:5.2.2)
:random.uniform/0 is deprecated. Use the 'rand' module instead

(interval * multiplier)
|> Float.round()
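
The deprecation warning on line 206 is not addressed by this PR. One possible fix, sketched here as an assumption rather than as the project's chosen change, is to call `:rand.uniform/0`, which also returns a float in the range 0.0 to 1.0 (upper bound excluded). `IntervalSketch` is a hypothetical module name, and the sketch stops at `Float.round/1` because the rest of the original pipeline is not shown in the diff.

```elixir
defmodule IntervalSketch do
  @moduledoc false

  # Same arithmetic as the randomize_interval/2 shown in the diff, with
  # :rand.uniform/0 in place of the deprecated :random.uniform/0.
  def randomize_interval(interval, tolerance) do
    # multiplier lies between 1 - tolerance and 1 + tolerance
    multiplier = 1 + (tolerance * 2 * :rand.uniform() - tolerance)

    (interval * multiplier)
    |> Float.round()
  end
end

# With a tolerance of 0.25, a 1000-unit interval lands between 750.0 and 1250.0.
sample = IntervalSketch.randomize_interval(1000, 0.25)
```

Unlike the old `:random` module, `:rand` seeds itself automatically on first use in a process, so no explicit seeding call is needed for this kind of jitter.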
19 changes: 15 additions & 4 deletions lib/astarte_vmq_plugin/rpc/server.ex
@@ -70,12 +70,23 @@ defmodule Astarte.VMQ.Plugin.RPC.Server do
@impl GenServer
def handle_call({:delete, %{realm_name: realm, device_id: device}}, _from, state) do
client_id = "#{realm}/#{device}"
# Either the client has been deleted or it is :not_found,
# which means that there is no session anyway.

# Plugin.disconnect_client/2 returns :ok or {:error, :not_found}; the latter
# means that there is no session anyway.
Plugin.disconnect_client(client_id, true)
Plugin.ack_device_deletion(realm, device)

{:reply, :ok, state}
case Plugin.ack_device_deletion(realm, device) do
:ok ->
{:reply, :ok, state}

{:error, reason} ->
Logger.error(
"Failed to acknowledge device deletion for #{realm}/#{device}: #{inspect(reason)}",
tag: "vmq_ack_deletion_failed"
)

{:reply, {:error, reason}, state}
end
end

@impl GenServer
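
With this change the `:delete` call can reply with `{:error, reason}` instead of always replying `:ok`, so callers need to handle both shapes. The sketch below illustrates that contract with a stripped-down GenServer; `DeleteReplySketch` and `ack_fun` are hypothetical, with `ack_fun` standing in for `Plugin.ack_device_deletion/2`, and the disconnect and logging steps are omitted.

```elixir
defmodule DeleteReplySketch do
  @moduledoc false
  use GenServer

  # `ack_fun` is a hypothetical stand-in for Plugin.ack_device_deletion/2.
  def start_link(ack_fun), do: GenServer.start_link(__MODULE__, ack_fun)

  @impl GenServer
  def init(ack_fun), do: {:ok, ack_fun}

  @impl GenServer
  def handle_call({:delete, %{realm_name: realm, device_id: device}}, _from, ack_fun) do
    # Forward the outcome of the acknowledgment to the caller instead of
    # unconditionally replying :ok.
    case ack_fun.(realm, device) do
      :ok -> {:reply, :ok, ack_fun}
      {:error, reason} -> {:reply, {:error, reason}, ack_fun}
    end
  end
end

{:ok, healthy} = DeleteReplySketch.start_link(fn _realm, _device -> :ok end)
{:ok, failing} = DeleteReplySketch.start_link(fn _realm, _device -> {:error, :timeout} end)

# Placeholder realm and device values, used only for illustration.
request = {:delete, %{realm_name: "test", device_id: "device"}}

healthy_reply = GenServer.call(healthy, request)
failing_reply = GenServer.call(failing, request)
```

A caller that previously matched only on `:ok` would crash on the second reply, so the consuming side would presumably need a matching update to retry or report the failure.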