Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/astarte_appengine_api/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ config :astarte_appengine_api,
:data_updater_plant_rpc_client,
Astarte.AppEngine.API.RPC.DataUpdaterPlant.ClientMock

config :astarte_appengine_api,
:vernemq_plugin_rpc_client,
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock

config :stream_data,
max_runs: 50

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ defmodule Astarte.AppEngine.API.Application do
{Cluster.Supervisor,
[Config.cluster_topologies!(), [name: Astarte.AppEngine.API.ClusterSupervisor]]},
{Horde.Registry, [keys: :unique, name: Registry.DataUpdaterRPC, members: :auto]},
{Horde.Registry, [keys: :unique, name: Registry.VMQPluginRPC, members: :auto]},
Astarte.AppEngine.APIWeb.Telemetry,
{Phoenix.PubSub, name: Astarte.AppEngine.API.PubSub},
# TODO: still needed for VerneMQ RPC, remove once moved to erlang clustering
Astarte.RPC.AMQP.Client,
Astarte.AppEngine.API.Rooms.MasterSupervisor,
Astarte.AppEngine.API.Rooms.AMQPClient,
Astarte.AppEngine.APIWeb.Endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,22 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
"""
require Logger

alias Astarte.RPC.Protocol.VMQ.Plugin, as: Protocol

alias Astarte.RPC.Protocol.VMQ.Plugin.{
Call,
GenericErrorReply,
GenericOkReply,
Publish,
PublishReply,
Reply
}

alias Astarte.AppEngine.API.Config

require Logger

@rpc_client Config.rpc_client!()
@destination Protocol.amqp_queue()
@rpc_behaviour Application.compile_env(
:astarte_appengine_api,
:vernemq_plugin_rpc_client,
Astarte.AppEngine.API.RPC.VMQPlugin.Client
)

def publish(topic, payload, qos)
when is_binary(topic) and is_binary(payload) and is_integer(qos) and qos >= 0 and qos <= 2 do
with {:ok, tokens} <- split_topic(topic) do
_ = Logger.debug("Going to publish value on MQTT.")

%Publish{
data = %{
topic_tokens: tokens,
payload: payload,
qos: qos
}
|> encode_call(:publish)
|> @rpc_client.rpc_call(@destination)
|> decode_reply()
|> extract_reply()

@rpc_behaviour.publish(data)
end
end

Expand All @@ -63,46 +47,4 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
tokens -> {:ok, tokens}
end
end

defp encode_call(call, callname) do
%Call{call: {callname, call}}
|> Call.encode()
end

defp decode_reply({:ok, encoded_reply}) when is_binary(encoded_reply) do
%Reply{reply: reply} = Reply.decode(encoded_reply)

_ = Logger.debug("Got reply from VWQ: #{inspect(reply)}.")

reply
end

defp decode_reply({:error, reason}) do
_ = Logger.warning("RPC error: #{inspect(reason)}.", tag: "rpc_remote_exception")
{:error, reason}
end

defp extract_reply({:publish_reply, %PublishReply{} = reply}) do
_ = Logger.debug("Got publish reply from VMQ.")

{:ok, %{local_matches: reply.local_matches, remote_matches: reply.remote_matches}}
end

defp extract_reply({:generic_ok_reply, %GenericOkReply{}}) do
_ = Logger.debug("Got ok reply from VMQ.")

:ok
end

defp extract_reply({:generic_error_reply, %GenericErrorReply{error_name: "session_not_found"}}) do
{:error, :session_not_found}
end

defp extract_reply({:generic_error_reply, error_struct = %GenericErrorReply{}}) do
error_map = Map.from_struct(error_struct)

_ = Logger.error("Error while publishing value on MQTT.", tag: "vmq_publish_error")

{:error, error_map}
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour do
@moduledoc false

@callback publish(%{topic_tokens: list(binary()), payload: binary(), qos: 0 | 1 | 2}) ::
{:ok, %{local_matches: integer(), remote_matches: integer()}} | {:error, term()}
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Client do
@moduledoc false
@behaviour Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour

@impl Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour
def publish(data) do
case Horde.Registry.lookup(Registry.VMQPluginRPC, :server) do
[] -> {:error, :no_rpc_server}
[{pid, _value}] -> GenServer.call(pid, {:publish, data})
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -44,49 +44,45 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
@timestamp DateTime.utc_now()
@metadata %{some: "metadata"}

@encoded_generic_ok_reply %Reply{
reply: {:generic_ok_reply, %GenericOkReply{}}
}
|> Reply.encode()

test "datastream push with no opts" do
MockRPCClient
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
answer = {:ok, %{local_matches: 0, remote_matches: 0}}

Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
|> expect(:publish, fn data ->
encoded_payload = Cyanide.encode!(%{v: @payload})

assert %Publish{
assert %{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: ^encoded_payload,
qos: 0
} = publish_call
} = data

{:ok, @encoded_generic_ok_reply}
answer
end)

assert :ok = DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
assert ^answer =
DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
end

test "datastream push with opts" do
MockRPCClient
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
answer = {:ok, %{local_matches: 0, remote_matches: 0}}

Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
|> expect(:publish, fn data ->
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})

assert %Publish{
assert %{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: ^encoded_payload,
qos: 1
} = publish_call
} = data

{:ok, @encoded_generic_ok_reply}
answer
end)

opts = [metadata: @metadata, timestamp: @timestamp, qos: 1]

assert :ok =
assert ^answer =
DataTransmitter.push_datastream(
@realm,
@device_id,
Expand All @@ -98,43 +94,43 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
end

test "set property with no opts" do
MockRPCClient
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
answer = {:ok, %{local_matches: 0, remote_matches: 0}}

Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
|> expect(:publish, fn data ->
encoded_payload = Cyanide.encode!(%{v: @payload})

assert %Publish{
assert %{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: ^encoded_payload,
qos: 2
} = publish_call
} = data

{:ok, @encoded_generic_ok_reply}
answer
end)

assert :ok = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
assert ^answer = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
end

test "set property with opts" do
MockRPCClient
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
answer = {:ok, %{local_matches: 0, remote_matches: 0}}

Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
|> expect(:publish, fn data ->
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})

assert %Publish{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: ^encoded_payload,
qos: 2
} = publish_call
%{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: ^encoded_payload,
qos: 2
} = data

{:ok, @encoded_generic_ok_reply}
answer
end)

opts = [metadata: @metadata, timestamp: @timestamp]

assert :ok =
assert ^answer =
DataTransmitter.set_property(
@realm,
@device_id,
Expand All @@ -146,19 +142,19 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
end

test "unset property" do
MockRPCClient
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
answer = {:ok, %{local_matches: 0, remote_matches: 0}}

assert %Publish{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: <<>>,
qos: 2
} = publish_call
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
|> expect(:publish, fn data ->
%{
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
payload: <<>>,
qos: 2
} = data

{:ok, @encoded_generic_ok_reply}
answer
end)

assert :ok = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
assert ^answer = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
end
end
Loading
Loading