Commit 075fbf5

fix: resolve race condition in VMQ device deletion acknowledgment
Device deletion in Astarte involves a distributed coordination mechanism where the VMQ plugin must acknowledge deletion by setting vmq_ack=true in the database and sending a /f message to trigger final cleanup.

The current implementation has a critical race condition that can cause device deletions to stall forever. The root cause is that the ack_device_deletion/2 function executes operations in the wrong order:

1. Sends /f message to AMQP
2. Writes vmq_ack=true to database

This creates a race condition where:

- /f message is published successfully
- Database write fails (timeout, connection error, etc.)
- DUP processes /f message and sets dup_end_ack=true
- vmq_ack remains false, causing all_ack?() to return false
- Device deletion stalls permanently with no retry mechanism

Additionally, the RPC server always returns :ok regardless of actual operation success, masking errors from callers.

The fix involves:

1. Reordering operations to ensure database write completes before message publication
2. Using proper error handling with early return on database failures
3. Propagating errors correctly through RPC server to enable caller retry logic

This ensures atomic behavior: either both operations succeed or both fail, eliminating the race condition that caused permanent deletion stalls.

Breaking change: RPC callers will now receive error responses for failed operations instead of false success indicators. However, error cases are already handled in Astarte since v1.2: https://github.com/astarte-platform/astarte/blob/35c877efeece31a66576982f9fa30c00b4b801ea/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex#L2340

Signed-off-by: Davide Briani <davide.briani@secomind.com>
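The ordering problem described in the commit message can be reproduced in isolation. The following is a minimal sketch, not the plugin's code: `AckOrderSketch`, `publish`, and `write_ack` are hypothetical stand-ins for the real publish and database calls, and the old order is simplified to return the write result rather than pattern-match on it.

```elixir
defmodule AckOrderSketch do
  # Hypothetical stand-ins: `publish` and `write_ack` are zero-arity functions
  # supplied by the caller. They are NOT the plugin's real API.

  # Old order: publish first, then write. A failed write leaves /f already sent.
  def publish_then_write(publish, write_ack) do
    publish.()
    write_ack.()
  end

  # New order: write first and publish only if the write succeeded.
  # A non-matching write result is returned unchanged by `with`.
  def write_then_publish(publish, write_ack) do
    with {:ok, _} <- write_ack.() do
      publish.()
      :ok
    end
  end

  # Returns true if a simulated /f message is waiting in this process's mailbox.
  def published? do
    receive do
      :f_published -> true
    after
      0 -> false
    end
  end
end

# Simulate a database write that times out.
failing_write = fn -> {:error, :timeout} end
publish = fn -> send(self(), :f_published) end

AckOrderSketch.publish_then_write(publish, failing_write)
old_published = AckOrderSketch.published?()

result = AckOrderSketch.write_then_publish(publish, failing_write)
new_published = AckOrderSketch.published?()
```

With the old order the simulated /f message is sent even though the write failed; with the new order nothing is published and the error reaches the caller.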
1 parent 2abcf21 commit 075fbf5

3 files changed: +26 -9 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added
 - Add the option to enable keepalive for scylladb connections, using the environment variable
 `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__ENABLE_KEEPALIVE`. Defaults to `true`
+### Fixed
+- Corner case in device deletion acknowledgment that could lead to deletion stalling permanently.
 
 ## [1.2.1-rc.0] 2025-08-22
 ### Changed

lib/astarte_vmq_plugin.ex

Lines changed: 9 additions & 5 deletions
@@ -182,12 +182,16 @@ defmodule Astarte.VMQ.Plugin do
     end
   end
 
+  @spec ack_device_deletion(String.t(), Astarte.Core.Device.encoded_device_id()) ::
+          :ok | {:error, term()}
   def ack_device_deletion(realm_name, encoded_device_id) do
-    timestamp = now_us_x10_timestamp()
-    publish_internal_message(realm_name, encoded_device_id, "/f", "", timestamp)
-    {:ok, decoded_device_id} = Device.decode_device_id(encoded_device_id)
-    {:ok, _} = Queries.ack_device_deletion(realm_name, decoded_device_id)
-    :ok
+    with {:ok, decoded_device_id} <- Device.decode_device_id(encoded_device_id),
+         {:ok, _} <- Queries.ack_device_deletion(realm_name, decoded_device_id) do
+      # Only send /f message after successful database write
+      timestamp = now_us_x10_timestamp()
+      publish_internal_message(realm_name, encoded_device_id, "/f", "", timestamp)
+      :ok
+    end
   end
 
   defp setup_heartbeat_timer(realm, device_id, session_pid) do
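The switch from `=` matches to `with` clauses in this hunk also changes what happens on failure: a strict match raises `MatchError`, while `with` returns the non-matching value to the caller. A small sketch of the difference, assuming a hypothetical `decode/1` that stands in for `Device.decode_device_id/1` (its error atom is an assumption):

```elixir
defmodule WithFallthroughSketch do
  # Hypothetical decoder standing in for Device.decode_device_id/1.
  # The :invalid_device_id reason is an assumption made for illustration.
  def decode("valid"), do: {:ok, <<1, 2, 3>>}
  def decode(_other), do: {:error, :invalid_device_id}

  # Strict match: any result other than {:ok, _} raises MatchError.
  def strict(id) do
    {:ok, decoded} = decode(id)
    {:ok, byte_size(decoded)}
  end

  # `with`: a result that does not match {:ok, _} is returned unchanged.
  def lenient(id) do
    with {:ok, decoded} <- decode(id) do
      {:ok, byte_size(decoded)}
    end
  end
end

strict_result =
  try do
    WithFallthroughSketch.strict("bad")
  rescue
    MatchError -> :raised
  end

lenient_result = WithFallthroughSketch.lenient("bad")
```

This is why the new `@spec` can promise `:ok | {:error, term()}`: every failing clause falls through as a value instead of crashing the calling process.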

lib/astarte_vmq_plugin/rpc/server.ex

Lines changed: 15 additions & 4 deletions
@@ -70,12 +70,23 @@ defmodule Astarte.VMQ.Plugin.RPC.Server do
   @impl GenServer
   def handle_call({:delete, %{realm_name: realm, device_id: device}}, _from, state) do
     client_id = "#{realm}/#{device}"
-    # Either the client has been deleted or it is :not_found,
-    # which means that there is no session anyway.
+
+    # Plugin.disconnect_client/2 return :ok or {:error, :not_found}, which
+    # means that there is no session anyway.
     Plugin.disconnect_client(client_id, true)
-    Plugin.ack_device_deletion(realm, device)
 
-    {:reply, :ok, state}
+    case Plugin.ack_device_deletion(realm, device) do
+      :ok ->
+        {:reply, :ok, state}
+
+      {:error, reason} ->
+        Logger.error(
+          "Failed to acknowledge device deletion for #{realm}/#{device}: #{inspect(reason)}",
+          tag: "vmq_ack_deletion_failed"
+        )
+
+        {:reply, {:error, reason}, state}
+    end
   end
 
   @impl GenServer
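Because the `:delete` call now replies with `{:error, reason}` on failure, a caller can decide to retry. The sketch below shows one possible shape for that, under stated assumptions: `RetrySketch` and `call_with_retry/2` are hypothetical helpers, not Astarte APIs, and the actual handling in Astarte's Data Updater Plant may differ.

```elixir
defmodule RetrySketch do
  # Hypothetical helper: `rpc_call` is a zero-arity function returning
  # :ok or {:error, reason}. It is NOT an Astarte API.
  def call_with_retry(rpc_call, attempts) when attempts > 0 do
    case rpc_call.() do
      :ok ->
        :ok

      # Retry while attempts remain.
      {:error, _reason} when attempts > 1 ->
        call_with_retry(rpc_call, attempts - 1)

      # Out of attempts: surface the last error to the caller.
      {:error, reason} ->
        {:error, reason}
    end
  end
end

# A simulated RPC that fails twice before succeeding; the call count is kept
# in the process dictionary purely for the sake of this example.
flaky = fn ->
  n = Process.get(:calls, 0) + 1
  Process.put(:calls, n)
  if n < 3, do: {:error, :timeout}, else: :ok
end

retry_result = RetrySketch.call_with_retry(flaky, 5)
exhausted_result = RetrySketch.call_with_retry(fn -> {:error, :timeout} end, 2)
```

Before this commit such a loop would never run, since the server replied `:ok` unconditionally.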
