diff --git a/lib/astarte_vmq_plugin/application.ex b/lib/astarte_vmq_plugin/application.ex index f873f1f..325f6df 100644 --- a/lib/astarte_vmq_plugin/application.ex +++ b/lib/astarte_vmq_plugin/application.ex @@ -23,8 +23,6 @@ defmodule Astarte.VMQ.Plugin.Application do use Application - alias Astarte.VMQ.Plugin.RPC.Handler, as: RPCHandler - alias Astarte.RPC.Protocol.VMQ.Plugin, as: Protocol alias Astarte.VMQ.Plugin.Config require Logger @@ -42,7 +40,8 @@ defmodule Astarte.VMQ.Plugin.Application do {Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry}, Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, {Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]}, - {Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: RPCHandler]}, + {Horde.Registry, [name: Registry.VMQPluginRPC, keys: :unique, members: :auto]}, + Astarte.VMQ.Plugin.RPC.Supervisor, {Xandra.Cluster, Config.xandra_options!()} ] diff --git a/lib/astarte_vmq_plugin/rpc/handler.ex b/lib/astarte_vmq_plugin/rpc/handler.ex deleted file mode 100644 index 9ae7832..0000000 --- a/lib/astarte_vmq_plugin/rpc/handler.ex +++ /dev/null @@ -1,157 +0,0 @@ -# -# This file is part of Astarte. -# -# Copyright 2018 - 2023 SECO Mind Srl -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -defmodule Astarte.VMQ.Plugin.RPC.Handler do - @behaviour Astarte.RPC.Handler - - alias Astarte.RPC.Protocol.VMQ.Plugin.{ - Call, - Delete, - Disconnect, - GenericErrorReply, - GenericOkReply, - Publish, - PublishReply, - Reply - } - - alias Astarte.VMQ.Plugin.Publisher - alias Astarte.VMQ.Plugin - - require Logger - - def handle_rpc(payload) do - with {:ok, call_tuple} <- extract_call_tuple(Call.decode(payload)) do - call_rpc(call_tuple) - end - end - - defp extract_call_tuple(%Call{call: nil}) do - Logger.warn("Received empty call", tag: "received_empty_call") - {:error, :empty_call} - end - - defp extract_call_tuple(%Call{call: call_tuple}) do - {:ok, call_tuple} - end - - defp call_rpc({:disconnect, %Disconnect{client_id: ""}}) do - Logger.warn("Disconnect with empty client_id", tag: "disconnect_empty_client_id") - generic_error(:client_id_is_empty, "client_id is \"\"") - end - - defp call_rpc({:disconnect, %Disconnect{discard_state: ""}}) do - Logger.warn("Disconnect with empty discard_state", tag: "disconnect_empty_discard_state") - generic_error(:discard_state_is_empty, "discard_state is \"\"") - end - - defp call_rpc({:disconnect, %Disconnect{client_id: client_id, discard_state: discard_state}}) do - case Plugin.disconnect_client(client_id, discard_state) do - :ok -> - generic_ok() - - {:error, reason} -> - generic_error(reason) - end - end - - defp call_rpc({:delete, %Delete{realm_name: realm_name, device_id: device_id}}) do - client_id = "#{realm_name}/#{device_id}" - # Either the client has been deleted or it is :not_found, - # which means that there is no session anyway. - Plugin.disconnect_client(client_id, true) - Plugin.ack_device_deletion(realm_name, device_id) - generic_ok() - end - - defp call_rpc({:publish, %Publish{topic_tokens: []}}) do - Logger.warn("Publish with empty topic_tokens", tag: "publish_empty_topic_tokens") - generic_error(:empty_topic_tokens, "empty topic tokens") - end - - # This also handles the case of qos == nil, that is > 2 - defp call_rpc({:publish, %Publish{qos: qos}}) when qos > 2 or qos < 0 do - Logger.warn("Publish with invalid QoS", tag: "publish_invalid_qos") - generic_error(:invalid_qos, "invalid QoS") - end - - defp call_rpc({:publish, %Publish{topic_tokens: topic_tokens, payload: payload, qos: qos}}) do - case Publisher.publish(topic_tokens, payload, qos) do - {:ok, {local_matches, remote_matches}} -> - publish_reply(local_matches, remote_matches) - - {:error, reason} -> - Logger.warn("Publish failed with reason: #{inspect(reason)}", tag: "publish_failed") - generic_error(reason) - - other_err -> - Logger.warn("Unknown error in publish: #{inspect(other_err)}", - tag: "publish_failed_unknown_error" - ) - - generic_error(:publish_error, "error during publish") - end - end - - defp generic_error( - error_name, - user_readable_message \\ "", - user_readable_error_name \\ "", - error_data \\ "" - ) do - %GenericErrorReply{ - error_name: to_string(error_name), - user_readable_message: user_readable_message, - user_readable_error_name: user_readable_error_name, - error_data: error_data - } - |> encode_reply(:generic_error_reply) - |> ok_wrap - end - - defp generic_ok do - %GenericOkReply{} - |> encode_reply(:generic_ok_reply) - |> ok_wrap - end - - defp publish_reply(local_matches, remote_matches) do - %PublishReply{local_matches: local_matches, remote_matches: remote_matches} - |> encode_reply(:publish_reply) - |> ok_wrap - end - - defp encode_reply(%GenericOkReply{} = reply, _reply_type) do - %Reply{reply: {:generic_ok_reply, reply}, error: false} - |> Reply.encode() - end - - defp encode_reply(%GenericErrorReply{} = reply, _reply_type) do - %Reply{reply: {:generic_error_reply, reply}, error: true} - |> Reply.encode() - end - - defp encode_reply(reply, reply_type) do - %Reply{reply: {reply_type, reply}, error: false} - |> Reply.encode() - end - - defp ok_wrap(result) do - {:ok, result} - end -end diff --git a/lib/astarte_vmq_plugin/rpc/server.ex b/lib/astarte_vmq_plugin/rpc/server.ex new file mode 100644 index 0000000..d913b5f --- /dev/null +++ b/lib/astarte_vmq_plugin/rpc/server.ex @@ -0,0 +1,146 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Astarte.VMQ.Plugin.RPC.Server do + @moduledoc false + alias Astarte.VMQ.Plugin.Publisher + alias Astarte.VMQ.Plugin + + use GenServer + require Logger + + # Public API + + def start_link(args, opts \\ []) do + name = {:via, Horde.Registry, {Registry.VMQPluginRPC, :server}} + opts = Keyword.put(opts, :name, name) + + GenServer.start_link(__MODULE__, args, opts) + end + + # Callbacks + + @impl GenServer + def init(_) do + Process.flag(:trap_exit, true) + {:ok, nil} + end + + @impl GenServer + def handle_call({:disconnect, %{client_id: nil}}, _from, state) do + Logger.warning("Disconnect with empty client id.", tag: "disconnect_empty_client_id") + {:reply, {:error, :empty_client_id}, state} + end + + @impl GenServer + def handle_call({:disconnect, %{discard_state: nil}}, _from, state) do + Logger.warning("Disconnect with empty discard state.", tag: "disconnect_empty_discard_state") + {:reply, {:error, :empty_discard_state}, state} + end + + @impl GenServer + def handle_call( + {:disconnect, %{client_id: client_id, discard_state: discard_state}}, + _from, + state + ) do + answer = Plugin.disconnect_client(client_id, discard_state) + {:reply, answer, state} + end + + @impl GenServer + def handle_call({:delete, %{realm_name: realm, device_id: device}}, _from, state) do + client_id = "#{realm}/#{device}" + # Either the client has been deleted or it is :not_found, + # which means that there is no session anyway. + Plugin.disconnect_client(client_id, true) + Plugin.ack_device_deletion(realm, device) + + {:reply, :ok, state} + end + + @impl GenServer + def handle_call({:publish, %{topic_tokens: []}}, _from, state) do + _ = Logger.warning("Publish with empty topic tokens", tag: "publish_empty_topic_tokens") + + {:reply, {:error, :empty_topic_tokens}, state} + end + + # This also handles the case of qos == nil, that is > 2 + @impl GenServer + def handle_call({:publish, %{qos: qos}}, _from, state) when qos > 2 or qos < 0 do + _ = Logger.warning("Publish with invalid QoS", tag: "publish_invalid_qos") + + {:reply, {:error, :invalid_qos}, state} + end + + @impl GenServer + def handle_call( + {:publish, %{topic_tokens: topic_tokens, payload: payload, qos: qos}}, + _from, + state + ) do + answer = + case Publisher.publish(topic_tokens, payload, qos) do + {:ok, {local_matches, remote_matches}} -> + {:ok, %{local_matches: local_matches, remote_matches: remote_matches}} + + {:error, reason} -> + Logger.warning("Publish failed with reason: #{inspect(reason)}", tag: "publish_failed") + {:error, reason} + + other_err -> + Logger.warning("Unknown error in publish: #{inspect(other_err)}", + tag: "publish_failed_unknown_error" + ) + + {:error, :publish_error} + end + + {:reply, answer, state} + end + + # Horde dynamic supervisor signals + + @impl GenServer + def handle_info( + {:EXIT, _pid, {:name_conflict, {_name, _value}, _registry, _winning_pid}}, + state + ) do + _ = + Logger.warning( + "Received a :name_confict signal from the outer space, maybe a netsplit occurred? Gracefully shutting down.", + tag: "RPC exit" + ) + + {:stop, :shutdown, state} + end + + @impl GenServer + def handle_info({:EXIT, _pid, :shutdown}, state) do + _ = + Logger.warning( + "Received a :shutdown signal from the outer space, maybe the supervisor is mad? Gracefully shutting down.", + tag: "RPC exit" + ) + + {:stop, :shutdown, state} + end +end diff --git a/lib/astarte_vmq_plugin/rpc/supervisor.ex b/lib/astarte_vmq_plugin/rpc/supervisor.ex new file mode 100644 index 0000000..e875e93 --- /dev/null +++ b/lib/astarte_vmq_plugin/rpc/supervisor.ex @@ -0,0 +1,52 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Astarte.VMQ.Plugin.RPC.Supervisor do + @moduledoc """ + Supervisor of the RPC server. + """ + + use Horde.DynamicSupervisor + + def start_link(init_arg, opts \\ []) do + opts = [{:name, __MODULE__} | opts] + + with {:ok, pid} <- Horde.DynamicSupervisor.start_link(__MODULE__, init_arg, opts) do + case Horde.Registry.lookup(Registry.VMQPluginRPC, :server) do + [] -> + _ = Horde.DynamicSupervisor.start_child(pid, Astarte.VMQ.Plugin.RPC.Server) + end + + {:ok, pid} + end + end + + @impl Horde.DynamicSupervisor + def init(init_arg) do + [ + strategy: :one_for_one, + members: :auto, + distribution_strategy: Horde.UniformDistribution, + restart: :always + ] + |> Keyword.merge(init_arg) + |> Horde.DynamicSupervisor.init() + end +end diff --git a/mix.exs b/mix.exs index 01757f8..ab80ed5 100644 --- a/mix.exs +++ b/mix.exs @@ -90,6 +90,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do {:vernemq_dev, github: "vernemq/vernemq_dev"}, {:excoveralls, "~> 0.15", only: :test}, {:pretty_log, "~> 0.1"}, + {:horde, "~> 0.9"}, {:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false}, {:xandra, "~> 0.14"} ] diff --git a/mix.lock b/mix.lock index e5adfe8..76d7e56 100644 --- a/mix.lock +++ b/mix.lock @@ -9,6 +9,7 @@ "cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"}, "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, + "delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"}, "dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"}, "ecto": {:hex, :ecto, "3.10.2", "6b887160281a61aa16843e47735b8a266caa437f80588c3ab80a8a960e6abe37", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a895778f0d7648a4b34b486af59a1c8009041fbdf2b17f1ac215eb829c60235"}, "ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"}, @@ -18,10 +19,13 @@ "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, + "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"}, "jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"}, + "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "logfmt": {:hex, :logfmt, "3.3.2", "c432765cff9c26cf4ba78cf66ece183e56562dfeba6e2d9f077804cc4c756677", [:mix], [], "hexpm", "8dfc07bf11d362d1ffb11fa34647f4e78dba47247589cc94fd8c9155889c8fcb"}, + "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, @@ -33,6 +37,7 @@ "skogsra": {:hex, :skogsra, "2.4.1", "50f0e984d7560ffab30f8f5bb66e177a75d2dc72ed12de373aed7b6dfb54fb8c", [:mix], [{:jason, "~> 1.3", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "ffef5de2bfb1618babf692803acdd158cc081324735e28deea982dc87c9e565f"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.2.0", "ba82e333215aed9dd2096f93bd1d13ae89d249f82760fcada0850ba33bac154b", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7216e21a6c326eb9aa44328028c34e9fd348fb53667ca837be59d0aa2a0156e8"}, "thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "vernemq_dev": {:git, "https://github.com/vernemq/vernemq_dev.git", "6d622aa8c901ae7777433aef2bd049e380c474a6", []}, diff --git a/test/astarte_vmq_plugin_rpc_test.exs b/test/astarte_vmq_plugin_rpc_test.exs index dc7fd56..e2afd2c 100644 --- a/test/astarte_vmq_plugin_rpc_test.exs +++ b/test/astarte_vmq_plugin_rpc_test.exs @@ -19,16 +19,7 @@ defmodule Astarte.VMQ.Plugin.RPCTest do use ExUnit.Case - alias Astarte.RPC.Protocol.VMQ.Plugin.{ - Call, - GenericErrorReply, - Publish, - PublishReply, - Reply - } - alias Astarte.VMQ.Plugin.MockVerne - alias Astarte.VMQ.Plugin.RPC.Handler @topic ["some", "topic"] @payload "importantdata" @@ -36,89 +27,46 @@ defmodule Astarte.VMQ.Plugin.RPCTest do setup_all do MockVerne.start_link() - :ok + %{ + rpc_server: {:via, Horde.Registry, {Registry.VMQPluginRPC, :server}} + } end - test "invalid topic Publish call" do - serialized_call = - %Call{ - version: 0, - call: { - :publish, - %Publish{ - topic_tokens: [], - payload: @payload, - qos: 2 - } - } + test "invalid topic Publish call", %{rpc_server: server} do + data = + %{ + topic_tokens: [], + payload: @payload, + qos: 2 } - |> Call.encode() - - assert {:ok, ser_reply} = Handler.handle_rpc(serialized_call) - assert %Reply{ - version: 0, - reply: { - :generic_error_reply, - %GenericErrorReply{error_name: "empty_topic_tokens"} - } - } = Reply.decode(ser_reply) + assert {:error, :empty_topic_tokens} = GenServer.call(server, {:publish, data}) assert MockVerne.consume_message() == nil end - test "invalid qos Publish call" do - serialized_call = - %Call{ - version: 0, - call: { - :publish, - %Publish{ - topic_tokens: @topic, - payload: @payload, - qos: 42 - } - } + test "invalid qos Publish call", %{rpc_server: server} do + data = + %{ + topic_tokens: @topic, + payload: @payload, + qos: 42 } - |> Call.encode() - assert {:ok, ser_reply} = Handler.handle_rpc(serialized_call) - - assert %Reply{ - version: 0, - reply: { - :generic_error_reply, - %GenericErrorReply{error_name: "invalid_qos"} - } - } = Reply.decode(ser_reply) + assert {:error, :invalid_qos} = GenServer.call(server, {:publish, data}) assert MockVerne.consume_message() == nil end - test "valid Publish call" do - serialized_call = - %Call{ - version: 0, - call: { - :publish, - %Publish{ - topic_tokens: @topic, - payload: @payload, - qos: 2 - } - } + test "valid Publish call", %{rpc_server: server} do + data = + %{ + topic_tokens: @topic, + payload: @payload, + qos: 2 } - |> Call.encode() - - assert {:ok, ser_reply} = Handler.handle_rpc(serialized_call) - assert %Reply{ - version: 0, - reply: { - :publish_reply, - %PublishReply{} - } - } = Reply.decode(ser_reply) + assert {:ok, _} = GenServer.call(server, {:publish, data}) assert MockVerne.consume_message() == {@topic, @payload, %{qos: 2}} end