Skip to content

Commit 5cda927

Browse files
committed
refactor(AE): VMQPlugin RPC
Refactors the logic to handle RPC calls with astarte VerneMQ Plugin, using the new erlang clustering strategy. AE now calls an RPC server shared among all VerneMQ plugin replicas. Tests are reqorked too to mock RPC calls and answers from the server Signed-off-by: Luca Zaninotto <luca.zaninotto@secomind.com>
1 parent 354e409 commit 5cda927

File tree

9 files changed

+266
-351
lines changed

9 files changed

+266
-351
lines changed

apps/astarte_appengine_api/config/test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ config :astarte_appengine_api,
3232
:data_updater_plant_rpc_client,
3333
Astarte.AppEngine.API.RPC.DataUpdaterPlant.ClientMock
3434

35+
config :astarte_appengine_api,
36+
:vernemq_plugin_rpc_client,
37+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
38+
3539
config :stream_data,
3640
max_runs: 50
3741

apps/astarte_appengine_api/lib/astarte_appengine_api/application.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ defmodule Astarte.AppEngine.API.Application do
5151
{Cluster.Supervisor,
5252
[Config.cluster_topologies!(), [name: Astarte.AppEngine.API.ClusterSupervisor]]},
5353
{Horde.Registry, [keys: :unique, name: Registry.DataUpdaterRPC, members: :auto]},
54+
{Horde.Registry, [keys: :unique, name: Registry.VMQPluginRPC, members: :auto]},
5455
Astarte.AppEngine.APIWeb.Telemetry,
5556
{Phoenix.PubSub, name: Astarte.AppEngine.API.PubSub},
56-
# TODO: still needed for VerneMQ RPC, remove once moved to erlang clustering
57-
Astarte.RPC.AMQP.Client,
5857
Astarte.AppEngine.API.Rooms.MasterSupervisor,
5958
Astarte.AppEngine.API.Rooms.AMQPClient,
6059
Astarte.AppEngine.APIWeb.Endpoint,

apps/astarte_appengine_api/lib/astarte_appengine_api/rpc/vmq_plugin.ex

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,22 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
2222
"""
2323
require Logger
2424

25-
alias Astarte.RPC.Protocol.VMQ.Plugin, as: Protocol
26-
27-
alias Astarte.RPC.Protocol.VMQ.Plugin.{
28-
Call,
29-
GenericErrorReply,
30-
GenericOkReply,
31-
Publish,
32-
PublishReply,
33-
Reply
34-
}
35-
36-
alias Astarte.AppEngine.API.Config
37-
38-
require Logger
39-
40-
@rpc_client Config.rpc_client!()
41-
@destination Protocol.amqp_queue()
25+
@rpc_behaviour Application.compile_env(
26+
:astarte_appengine_api,
27+
:vernemq_plugin_rpc_client,
28+
Astarte.AppEngine.API.RPC.VMQPlugin.Client
29+
)
4230

4331
def publish(topic, payload, qos)
4432
when is_binary(topic) and is_binary(payload) and is_integer(qos) and qos >= 0 and qos <= 2 do
4533
with {:ok, tokens} <- split_topic(topic) do
46-
_ = Logger.debug("Going to publish value on MQTT.")
47-
48-
%Publish{
34+
data = %{
4935
topic_tokens: tokens,
5036
payload: payload,
5137
qos: qos
5238
}
53-
|> encode_call(:publish)
54-
|> @rpc_client.rpc_call(@destination)
55-
|> decode_reply()
56-
|> extract_reply()
39+
40+
@rpc_behaviour.publish(data)
5741
end
5842
end
5943

@@ -63,46 +47,4 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
6347
tokens -> {:ok, tokens}
6448
end
6549
end
66-
67-
defp encode_call(call, callname) do
68-
%Call{call: {callname, call}}
69-
|> Call.encode()
70-
end
71-
72-
defp decode_reply({:ok, encoded_reply}) when is_binary(encoded_reply) do
73-
%Reply{reply: reply} = Reply.decode(encoded_reply)
74-
75-
_ = Logger.debug("Got reply from VWQ: #{inspect(reply)}.")
76-
77-
reply
78-
end
79-
80-
defp decode_reply({:error, reason}) do
81-
_ = Logger.warning("RPC error: #{inspect(reason)}.", tag: "rpc_remote_exception")
82-
{:error, reason}
83-
end
84-
85-
defp extract_reply({:publish_reply, %PublishReply{} = reply}) do
86-
_ = Logger.debug("Got publish reply from VMQ.")
87-
88-
{:ok, %{local_matches: reply.local_matches, remote_matches: reply.remote_matches}}
89-
end
90-
91-
defp extract_reply({:generic_ok_reply, %GenericOkReply{}}) do
92-
_ = Logger.debug("Got ok reply from VMQ.")
93-
94-
:ok
95-
end
96-
97-
defp extract_reply({:generic_error_reply, %GenericErrorReply{error_name: "session_not_found"}}) do
98-
{:error, :session_not_found}
99-
end
100-
101-
defp extract_reply({:generic_error_reply, error_struct = %GenericErrorReply{}}) do
102-
error_map = Map.from_struct(error_struct)
103-
104-
_ = Logger.error("Error while publishing value on MQTT.", tag: "vmq_publish_error")
105-
106-
{:error, error_map}
107-
end
10850
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour do
22+
@moduledoc false
23+
24+
@callback publish(%{topic_tokens: list(binary()), payload: binary(), qos: 0 | 1 | 2}) ::
25+
{:ok, %{local_matches: integer(), remote_matches: integer()}} | {:error, term()}
26+
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Client do
22+
@moduledoc false
23+
@behaviour Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour
24+
25+
@impl Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour
26+
def publish(data) do
27+
case Horde.Registry.lookup(Registry.VMQPluginRPC, :server) do
28+
[] -> {:error, :no_rpc_server}
29+
[{pid, _value}] -> GenServer.call(pid, {:publish, data})
30+
end
31+
end
32+
end

apps/astarte_appengine_api/test/astarte_appengine_api/data_transmitter_test.exs

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,45 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
4444
@timestamp DateTime.utc_now()
4545
@metadata %{some: "metadata"}
4646

47-
@encoded_generic_ok_reply %Reply{
48-
reply: {:generic_ok_reply, %GenericOkReply{}}
49-
}
50-
|> Reply.encode()
51-
5247
test "datastream push with no opts" do
53-
MockRPCClient
54-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
55-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
48+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
5649

50+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
51+
|> expect(:publish, fn data ->
5752
encoded_payload = Cyanide.encode!(%{v: @payload})
5853

59-
assert %Publish{
54+
assert %{
6055
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
6156
payload: ^encoded_payload,
6257
qos: 0
63-
} = publish_call
58+
} = data
6459

65-
{:ok, @encoded_generic_ok_reply}
60+
answer
6661
end)
6762

68-
assert :ok = DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
63+
assert ^answer =
64+
DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
6965
end
7066

7167
test "datastream push with opts" do
72-
MockRPCClient
73-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
74-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
68+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
7569

70+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
71+
|> expect(:publish, fn data ->
7672
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})
7773

78-
assert %Publish{
74+
assert %{
7975
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
8076
payload: ^encoded_payload,
8177
qos: 1
82-
} = publish_call
78+
} = data
8379

84-
{:ok, @encoded_generic_ok_reply}
80+
answer
8581
end)
8682

8783
opts = [metadata: @metadata, timestamp: @timestamp, qos: 1]
8884

89-
assert :ok =
85+
assert ^answer =
9086
DataTransmitter.push_datastream(
9187
@realm,
9288
@device_id,
@@ -98,43 +94,43 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
9894
end
9995

10096
test "set property with no opts" do
101-
MockRPCClient
102-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
103-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
97+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
10498

99+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
100+
|> expect(:publish, fn data ->
105101
encoded_payload = Cyanide.encode!(%{v: @payload})
106102

107-
assert %Publish{
103+
assert %{
108104
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
109105
payload: ^encoded_payload,
110106
qos: 2
111-
} = publish_call
107+
} = data
112108

113-
{:ok, @encoded_generic_ok_reply}
109+
answer
114110
end)
115111

116-
assert :ok = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
112+
assert ^answer = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
117113
end
118114

119115
test "set property with opts" do
120-
MockRPCClient
121-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
122-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
116+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
123117

118+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
119+
|> expect(:publish, fn data ->
124120
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})
125121

126-
assert %Publish{
127-
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
128-
payload: ^encoded_payload,
129-
qos: 2
130-
} = publish_call
122+
%{
123+
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
124+
payload: ^encoded_payload,
125+
qos: 2
126+
} = data
131127

132-
{:ok, @encoded_generic_ok_reply}
128+
answer
133129
end)
134130

135131
opts = [metadata: @metadata, timestamp: @timestamp]
136132

137-
assert :ok =
133+
assert ^answer =
138134
DataTransmitter.set_property(
139135
@realm,
140136
@device_id,
@@ -146,19 +142,19 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
146142
end
147143

148144
test "unset property" do
149-
MockRPCClient
150-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
151-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
145+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
152146

153-
assert %Publish{
154-
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
155-
payload: <<>>,
156-
qos: 2
157-
} = publish_call
147+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
148+
|> expect(:publish, fn data ->
149+
%{
150+
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
151+
payload: <<>>,
152+
qos: 2
153+
} = data
158154

159-
{:ok, @encoded_generic_ok_reply}
155+
answer
160156
end)
161157

162-
assert :ok = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
158+
assert ^answer = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
163159
end
164160
end

0 commit comments

Comments
 (0)