Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/astarte_vmq_plugin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ defmodule Astarte.VMQ.Plugin.Application do

use Application

alias Astarte.VMQ.Plugin.RPC.Handler, as: RPCHandler
alias Astarte.RPC.Protocol.VMQ.Plugin, as: Protocol
alias Astarte.VMQ.Plugin.Config

require Logger
Expand All @@ -42,7 +40,8 @@ defmodule Astarte.VMQ.Plugin.Application do
{Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry},
Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor,
{Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]},
{Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: RPCHandler]},
{Horde.Registry, [name: Registry.VMQPluginRPC, keys: :unique, members: :auto]},
Astarte.VMQ.Plugin.RPC.Supervisor,
{Xandra.Cluster, Config.xandra_options!()}
]

Expand Down
157 changes: 0 additions & 157 deletions lib/astarte_vmq_plugin/rpc/handler.ex

This file was deleted.

146 changes: 146 additions & 0 deletions lib/astarte_vmq_plugin/rpc/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

defmodule Astarte.VMQ.Plugin.RPC.Server do
@moduledoc false
alias Astarte.VMQ.Plugin.Publisher
alias Astarte.VMQ.Plugin

use GenServer
require Logger

# Public API

def start_link(args, opts \\ []) do
name = {:via, Horde.Registry, {Registry.VMQPluginRPC, :server}}
opts = Keyword.put(opts, :name, name)

GenServer.start_link(__MODULE__, args, opts)
end

# Callbacks

@impl GenServer
def init(_) do
Process.flag(:trap_exit, true)
{:ok, nil}
end

@impl GenServer
def handle_call({:disconnect, %{client_id: nil}}, _from, state) do
Logger.warning("Disconnect with empty client id.", tag: "disconnect_empty_client_id")
{:reply, {:error, :empty_client_id}, state}
end

@impl GenServer
def handle_call({:disconnect, %{discard_state: nil}}, _from, state) do
Logger.warning("Disconnect with empty discard state.", tag: "disconnect_empty_discard_state")
{:reply, {:error, :empty_discard_state}, state}
end

@impl GenServer
def handle_call(
{:disconnect, %{client_id: client_id, discard_state: discard_state}},
_from,
state
) do
answer = Plugin.disconnect_client(client_id, discard_state)
{:reply, answer, state}
end

@impl GenServer
def handle_call({:delete, %{realm_name: realm, device_id: device}}, _from, state) do
client_id = "#{realm}/#{device}"
# Either the client has been deleted or it is :not_found,
# which means that there is no session anyway.
Plugin.disconnect_client(client_id, true)
Plugin.ack_device_deletion(realm, device)

{:reply, :ok, state}
end

@impl GenServer
def handle_call({:publish, %{topic_tokens: []}}, _from, state) do
_ = Logger.warning("Publish with empty topic tokens", tag: "publish_empty_topic_tokens")

{:reply, {:error, :empty_topic_tokens}, state}
end

# This also handles the case of qos == nil, that is > 2
@impl GenServer
def handle_call({:publish, %{qos: qos}}, _from, state) when qos > 2 or qos < 0 do
_ = Logger.warning("Publish with invalid QoS", tag: "publish_invalid_qos")

{:reply, {:error, :invalid_qos}, state}
end

@impl GenServer
def handle_call(
{:publish, %{topic_tokens: topic_tokens, payload: payload, qos: qos}},
_from,
state
) do
answer =
case Publisher.publish(topic_tokens, payload, qos) do
{:ok, {local_matches, remote_matches}} ->
{:ok, %{local_matches: local_matches, remote_matches: remote_matches}}

{:error, reason} ->
Logger.warning("Publish failed with reason: #{inspect(reason)}", tag: "publish_failed")
{:error, reason}

other_err ->
Logger.warning("Unknown error in publish: #{inspect(other_err)}",
tag: "publish_failed_unknown_error"
)

{:error, :publish_error}
end

{:reply, answer, state}
end

# Horde dynamic supervisor signals

@impl GenServer
def handle_info(
{:EXIT, _pid, {:name_conflict, {_name, _value}, _registry, _winning_pid}},
state
) do
_ =
Logger.warning(
"Received a :name_confict signal from the outer space, maybe a netsplit occurred? Gracefully shutting down.",
tag: "RPC exit"
)

{:stop, :shutdown, state}
end

@impl GenServer
def handle_info({:EXIT, _pid, :shutdown}, state) do
_ =
Logger.warning(
"Received a :shutdown signal from the outer space, maybe the supervisor is mad? Gracefully shutting down.",
tag: "RPC exit"
)

{:stop, :shutdown, state}
end
end
52 changes: 52 additions & 0 deletions lib/astarte_vmq_plugin/rpc/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

defmodule Astarte.VMQ.Plugin.RPC.Supervisor do
@moduledoc """
Supervisor of the RPC server.
"""

use Horde.DynamicSupervisor

def start_link(init_arg, opts \\ []) do
opts = [{:name, __MODULE__} | opts]

with {:ok, pid} <- Horde.DynamicSupervisor.start_link(__MODULE__, init_arg, opts) do
case Horde.Registry.lookup(Registry.VMQPluginRPC, :server) do
[] ->
_ = Horde.DynamicSupervisor.start_child(pid, Astarte.VMQ.Plugin.RPC.Server)
end

{:ok, pid}
end
end

@impl Horde.DynamicSupervisor
def init(init_arg) do
[
strategy: :one_for_one,
members: :auto,
distribution_strategy: Horde.UniformDistribution,
restart: :always
]
|> Keyword.merge(init_arg)
|> Horde.DynamicSupervisor.init()
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do
{:vernemq_dev, github: "vernemq/vernemq_dev"},
{:excoveralls, "~> 0.15", only: :test},
{:pretty_log, "~> 0.1"},
{:horde, "~> 0.9"},
{:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false},
{:xandra, "~> 0.14"}
]
Expand Down
Loading
Loading