Skip to content

Commit 2874b6a

Browse files
authored
Merge pull request #1194 from lusergit/refactor/ae-verne-rpc
refactor(AE): VerneMQ RPC
2 parents 96a5ed7 + 5cda927 commit 2874b6a

File tree

9 files changed

+266
-351
lines changed

9 files changed

+266
-351
lines changed

apps/astarte_appengine_api/config/test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ config :astarte_appengine_api,
3232
:data_updater_plant_rpc_client,
3333
Astarte.AppEngine.API.RPC.DataUpdaterPlant.ClientMock
3434

35+
config :astarte_appengine_api,
36+
:vernemq_plugin_rpc_client,
37+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
38+
3539
config :stream_data,
3640
max_runs: 50
3741

apps/astarte_appengine_api/lib/astarte_appengine_api/application.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ defmodule Astarte.AppEngine.API.Application do
5151
{Cluster.Supervisor,
5252
[Config.cluster_topologies!(), [name: Astarte.AppEngine.API.ClusterSupervisor]]},
5353
{Horde.Registry, [keys: :unique, name: Registry.DataUpdaterRPC, members: :auto]},
54+
{Horde.Registry, [keys: :unique, name: Registry.VMQPluginRPC, members: :auto]},
5455
Astarte.AppEngine.APIWeb.Telemetry,
5556
{Phoenix.PubSub, name: Astarte.AppEngine.API.PubSub},
56-
# TODO: still needed for VerneMQ RPC, remove once moved to erlang clustering
57-
Astarte.RPC.AMQP.Client,
5857
Astarte.AppEngine.API.Rooms.MasterSupervisor,
5958
Astarte.AppEngine.API.Rooms.AMQPClient,
6059
Astarte.AppEngine.APIWeb.Endpoint,

apps/astarte_appengine_api/lib/astarte_appengine_api/rpc/vmq_plugin.ex

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,22 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
2222
"""
2323
require Logger
2424

25-
alias Astarte.RPC.Protocol.VMQ.Plugin, as: Protocol
26-
27-
alias Astarte.RPC.Protocol.VMQ.Plugin.{
28-
Call,
29-
GenericErrorReply,
30-
GenericOkReply,
31-
Publish,
32-
PublishReply,
33-
Reply
34-
}
35-
36-
alias Astarte.AppEngine.API.Config
37-
38-
require Logger
39-
40-
@rpc_client Config.rpc_client!()
41-
@destination Protocol.amqp_queue()
25+
@rpc_behaviour Application.compile_env(
26+
:astarte_appengine_api,
27+
:vernemq_plugin_rpc_client,
28+
Astarte.AppEngine.API.RPC.VMQPlugin.Client
29+
)
4230

4331
def publish(topic, payload, qos)
4432
when is_binary(topic) and is_binary(payload) and is_integer(qos) and qos >= 0 and qos <= 2 do
4533
with {:ok, tokens} <- split_topic(topic) do
46-
_ = Logger.debug("Going to publish value on MQTT.")
47-
48-
%Publish{
34+
data = %{
4935
topic_tokens: tokens,
5036
payload: payload,
5137
qos: qos
5238
}
53-
|> encode_call(:publish)
54-
|> @rpc_client.rpc_call(@destination)
55-
|> decode_reply()
56-
|> extract_reply()
39+
40+
@rpc_behaviour.publish(data)
5741
end
5842
end
5943

@@ -63,46 +47,4 @@ defmodule Astarte.AppEngine.API.RPC.VMQPlugin do
6347
tokens -> {:ok, tokens}
6448
end
6549
end
66-
67-
defp encode_call(call, callname) do
68-
%Call{call: {callname, call}}
69-
|> Call.encode()
70-
end
71-
72-
defp decode_reply({:ok, encoded_reply}) when is_binary(encoded_reply) do
73-
%Reply{reply: reply} = Reply.decode(encoded_reply)
74-
75-
_ = Logger.debug("Got reply from VWQ: #{inspect(reply)}.")
76-
77-
reply
78-
end
79-
80-
defp decode_reply({:error, reason}) do
81-
_ = Logger.warning("RPC error: #{inspect(reason)}.", tag: "rpc_remote_exception")
82-
{:error, reason}
83-
end
84-
85-
defp extract_reply({:publish_reply, %PublishReply{} = reply}) do
86-
_ = Logger.debug("Got publish reply from VMQ.")
87-
88-
{:ok, %{local_matches: reply.local_matches, remote_matches: reply.remote_matches}}
89-
end
90-
91-
defp extract_reply({:generic_ok_reply, %GenericOkReply{}}) do
92-
_ = Logger.debug("Got ok reply from VMQ.")
93-
94-
:ok
95-
end
96-
97-
defp extract_reply({:generic_error_reply, %GenericErrorReply{error_name: "session_not_found"}}) do
98-
{:error, :session_not_found}
99-
end
100-
101-
defp extract_reply({:generic_error_reply, error_struct = %GenericErrorReply{}}) do
102-
error_map = Map.from_struct(error_struct)
103-
104-
_ = Logger.error("Error while publishing value on MQTT.", tag: "vmq_publish_error")
105-
106-
{:error, error_map}
107-
end
10850
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour do
22+
@moduledoc false
23+
24+
@callback publish(%{topic_tokens: list(binary()), payload: binary(), qos: 0 | 1 | 2}) ::
25+
{:ok, %{local_matches: integer(), remote_matches: integer()}} | {:error, term()}
26+
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.VMQPlugin.Client do
22+
@moduledoc false
23+
@behaviour Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour
24+
25+
@impl Astarte.AppEngine.API.RPC.VMQPlugin.Behaviour
26+
def publish(data) do
27+
case Horde.Registry.lookup(Registry.VMQPluginRPC, :server) do
28+
[] -> {:error, :no_rpc_server}
29+
[{pid, _value}] -> GenServer.call(pid, {:publish, data})
30+
end
31+
end
32+
end

apps/astarte_appengine_api/test/astarte_appengine_api/data_transmitter_test.exs

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,45 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
4444
@timestamp DateTime.utc_now()
4545
@metadata %{some: "metadata"}
4646

47-
@encoded_generic_ok_reply %Reply{
48-
reply: {:generic_ok_reply, %GenericOkReply{}}
49-
}
50-
|> Reply.encode()
51-
5247
test "datastream push with no opts" do
53-
MockRPCClient
54-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
55-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
48+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
5649

50+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
51+
|> expect(:publish, fn data ->
5752
encoded_payload = Cyanide.encode!(%{v: @payload})
5853

59-
assert %Publish{
54+
assert %{
6055
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
6156
payload: ^encoded_payload,
6257
qos: 0
63-
} = publish_call
58+
} = data
6459

65-
{:ok, @encoded_generic_ok_reply}
60+
answer
6661
end)
6762

68-
assert :ok = DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
63+
assert ^answer =
64+
DataTransmitter.push_datastream(@realm, @device_id, @interface, @path, @payload)
6965
end
7066

7167
test "datastream push with opts" do
72-
MockRPCClient
73-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
74-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
68+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
7569

70+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
71+
|> expect(:publish, fn data ->
7672
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})
7773

78-
assert %Publish{
74+
assert %{
7975
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
8076
payload: ^encoded_payload,
8177
qos: 1
82-
} = publish_call
78+
} = data
8379

84-
{:ok, @encoded_generic_ok_reply}
80+
answer
8581
end)
8682

8783
opts = [metadata: @metadata, timestamp: @timestamp, qos: 1]
8884

89-
assert :ok =
85+
assert ^answer =
9086
DataTransmitter.push_datastream(
9187
@realm,
9288
@device_id,
@@ -98,43 +94,43 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
9894
end
9995

10096
test "set property with no opts" do
101-
MockRPCClient
102-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
103-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
97+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
10498

99+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
100+
|> expect(:publish, fn data ->
105101
encoded_payload = Cyanide.encode!(%{v: @payload})
106102

107-
assert %Publish{
103+
assert %{
108104
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
109105
payload: ^encoded_payload,
110106
qos: 2
111-
} = publish_call
107+
} = data
112108

113-
{:ok, @encoded_generic_ok_reply}
109+
answer
114110
end)
115111

116-
assert :ok = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
112+
assert ^answer = DataTransmitter.set_property(@realm, @device_id, @interface, @path, @payload)
117113
end
118114

119115
test "set property with opts" do
120-
MockRPCClient
121-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
122-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
116+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
123117

118+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
119+
|> expect(:publish, fn data ->
124120
encoded_payload = Cyanide.encode!(%{v: @payload, m: @metadata, t: @timestamp})
125121

126-
assert %Publish{
127-
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
128-
payload: ^encoded_payload,
129-
qos: 2
130-
} = publish_call
122+
%{
123+
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
124+
payload: ^encoded_payload,
125+
qos: 2
126+
} = data
131127

132-
{:ok, @encoded_generic_ok_reply}
128+
answer
133129
end)
134130

135131
opts = [metadata: @metadata, timestamp: @timestamp]
136132

137-
assert :ok =
133+
assert ^answer =
138134
DataTransmitter.set_property(
139135
@realm,
140136
@device_id,
@@ -146,19 +142,19 @@ defmodule Astarte.AppEngine.APIWeb.DataTransmitterTest do
146142
end
147143

148144
test "unset property" do
149-
MockRPCClient
150-
|> expect(:rpc_call, fn serialized_call, @vmq_plugin_destination ->
151-
assert %Call{call: {:publish, %Publish{} = publish_call}} = Call.decode(serialized_call)
145+
answer = {:ok, %{local_matches: 0, remote_matches: 0}}
152146

153-
assert %Publish{
154-
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
155-
payload: <<>>,
156-
qos: 2
157-
} = publish_call
147+
Astarte.AppEngine.API.RPC.VMQPlugin.ClientMock
148+
|> expect(:publish, fn data ->
149+
%{
150+
topic_tokens: [@realm, @encoded_device_id, @interface | @path_tokens],
151+
payload: <<>>,
152+
qos: 2
153+
} = data
158154

159-
{:ok, @encoded_generic_ok_reply}
155+
answer
160156
end)
161157

162-
assert :ok = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
158+
assert ^answer = DataTransmitter.unset_property(@realm, @device_id, @interface, @path)
163159
end
164160
end

0 commit comments

Comments
 (0)