Skip to content

Commit 31f2448

Browse files
authored
Merge pull request #1186 from lusergit/feat/ae-dup-rpc
refactor: move AppEngine <-> DUP RPC to elixir GenServer calls
2 parents c838715 + a908a48 commit 31f2448

20 files changed

Lines changed: 477 additions & 406 deletions

File tree

apps/astarte_appengine_api/config/test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ config :astarte_rpc, :amqp_connection, host: System.get_env("RABBITMQ_HOST") ||
2828

2929
config :astarte_appengine_api, :rpc_client, MockRPCClient
3030

31+
config :astarte_appengine_api,
32+
:data_updater_plant_rpc_client,
33+
Astarte.AppEngine.API.RPC.DataUpdaterPlant.ClientMock
34+
3135
config :stream_data,
3236
max_runs: 50
3337

apps/astarte_appengine_api/lib/astarte_appengine_api/application.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ defmodule Astarte.AppEngine.API.Application do
5050
children = [
5151
{Cluster.Supervisor,
5252
[Config.cluster_topologies!(), [name: Astarte.AppEngine.API.ClusterSupervisor]]},
53+
{Horde.Registry, [keys: :unique, name: Registry.DataUpdaterRPC, members: :auto]},
5354
Astarte.AppEngine.APIWeb.Telemetry,
5455
{Phoenix.PubSub, name: Astarte.AppEngine.API.PubSub},
56+
# TODO: still needed for VerneMQ RPC, remove once moved to erlang clustering
5557
Astarte.RPC.AMQP.Client,
5658
Astarte.AppEngine.API.Rooms.MasterSupervisor,
5759
Astarte.AppEngine.API.Rooms.AMQPClient,

apps/astarte_appengine_api/lib/astarte_appengine_api/rpc/data_updater_plant.ex

Lines changed: 25 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,13 @@ defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant do
2323

2424
require Logger
2525

26-
alias Astarte.RPC.Protocol.DataUpdaterPlant, as: Protocol
27-
28-
alias Astarte.RPC.Protocol.DataUpdaterPlant.{
29-
Call,
30-
DeleteVolatileTrigger,
31-
GenericErrorReply,
32-
GenericOkReply,
33-
InstallVolatileTrigger,
34-
Reply
35-
}
36-
37-
alias Astarte.AppEngine.API.Config
3826
alias Astarte.AppEngine.API.RPC.DataUpdaterPlant.VolatileTrigger
3927

40-
@rpc_client Config.rpc_client!()
41-
@destination Protocol.amqp_queue()
28+
@rpc_behaviour Application.compile_env(
29+
:astarte_appengine_api,
30+
:data_updater_plant_rpc_client,
31+
Astarte.AppEngine.API.RPC.DataUpdaterPlant.Client
32+
)
4233

4334
def install_volatile_trigger(realm_name, device_id, %VolatileTrigger{} = volatile_trigger) do
4435
%VolatileTrigger{
@@ -50,56 +41,29 @@ defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant do
5041
serialized_trigger_target: serialized_trigger_target
5142
} = volatile_trigger
5243

53-
%InstallVolatileTrigger{
54-
realm_name: realm_name,
55-
device_id: device_id,
56-
object_id: object_id,
57-
object_type: object_type,
58-
parent_id: parent_id,
59-
simple_trigger: serialized_simple_trigger,
60-
simple_trigger_id: simple_trigger_id,
61-
trigger_target: serialized_trigger_target
62-
}
63-
|> encode_call(:install_volatile_trigger)
64-
|> @rpc_client.rpc_call(@destination)
65-
|> decode_reply()
66-
|> extract_reply()
67-
end
44+
volatile_trigger =
45+
%{
46+
realm_name: realm_name,
47+
device_id: device_id,
48+
object_id: object_id,
49+
object_type: object_type,
50+
parent_id: parent_id,
51+
simple_trigger: serialized_simple_trigger,
52+
simple_trigger_id: simple_trigger_id,
53+
trigger_target: serialized_trigger_target
54+
}
6855

69-
def delete_volatile_trigger(realm_name, device_id, trigger_id) do
70-
%DeleteVolatileTrigger{
71-
realm_name: realm_name,
72-
device_id: device_id,
73-
trigger_id: trigger_id
74-
}
75-
|> encode_call(:delete_volatile_trigger)
76-
|> @rpc_client.rpc_call(@destination)
77-
|> decode_reply()
78-
|> extract_reply()
56+
@rpc_behaviour.install_volatile_trigger(volatile_trigger)
7957
end
8058

81-
defp encode_call(call, callname) do
82-
%Call{call: {callname, call}}
83-
|> Call.encode()
84-
end
85-
86-
defp decode_reply({:ok, encoded_reply}) when is_binary(encoded_reply) do
87-
%Reply{reply: reply} = Reply.decode(encoded_reply)
88-
reply
89-
end
90-
91-
defp decode_reply({:error, reason}) do
92-
_ = Logger.warning("RPC error: #{inspect(reason)}.", tag: "rpc_remote_exception")
93-
{:error, reason}
94-
end
95-
96-
defp extract_reply({:generic_ok_reply, %GenericOkReply{}}) do
97-
:ok
98-
end
99-
100-
defp extract_reply({:generic_error_reply, error_struct = %GenericErrorReply{}}) do
101-
error_map = Map.from_struct(error_struct)
59+
def delete_volatile_trigger(realm_name, device_id, trigger_id) do
60+
delete_trigger =
61+
%{
62+
realm_name: realm_name,
63+
device_id: device_id,
64+
trigger_id: trigger_id
65+
}
10266

103-
{:error, error_map}
67+
@rpc_behaviour.delete_volatile_trigger(delete_trigger)
10468
end
10569
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant.Behaviour do
22+
@moduledoc false
23+
24+
alias Astarte.AppEngine.API.RPC.DataUpdaterPlant.InstallVolatileTrigger
25+
alias Astarte.AppEngine.API.RPC.DataUpdaterPlant.DeleteVolatileTrigger
26+
27+
@callback install_volatile_trigger(request_data :: InstallVolatileTrigger.RequestData.t()) ::
28+
:ok | {:error, term()}
29+
30+
@callback delete_volatile_trigger(request_data :: DeleteVolatileTrigger.RequestData.t()) ::
31+
:ok | {:error, term()}
32+
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant.Client do
22+
@moduledoc false
23+
@behaviour Astarte.AppEngine.API.RPC.DataUpdaterPlant.Behaviour
24+
25+
@impl Astarte.AppEngine.API.RPC.DataUpdaterPlant.Behaviour
26+
def install_volatile_trigger(request_data) do
27+
server_via_tuple()
28+
|> GenServer.call({:install_volatile_trigger, request_data})
29+
end
30+
31+
@impl Astarte.AppEngine.API.RPC.DataUpdaterPlant.Behaviour
32+
def delete_volatile_trigger(request_data) do
33+
server_via_tuple()
34+
|> GenServer.call({:delete_volatile_trigger, request_data})
35+
end
36+
37+
defp server_via_tuple(), do: {:via, Horde.Registry, {Registry.DataUpdaterRPC, :server}}
38+
end
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant.DeleteVolatileTrigger.RequestData do
22+
@moduledoc false
23+
24+
defstruct [
25+
:realm_name,
26+
:device_id,
27+
:trigger_id
28+
]
29+
30+
# TODO: actually type things
31+
@type t() :: %__MODULE__{
32+
realm_name: String.t(),
33+
device_id: binary(),
34+
trigger_id: binary()
35+
}
36+
end
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
20+
21+
defmodule Astarte.AppEngine.API.RPC.DataUpdaterPlant.InstallVolatileTrigger.RequestData do
22+
@moduledoc false
23+
24+
defstruct [
25+
:realm_name,
26+
:device_id,
27+
:object_id,
28+
:object_type,
29+
:parent_id,
30+
:simple_trigger,
31+
:simple_trigger_id,
32+
:trigger_target
33+
]
34+
35+
# TODO: actually type things
36+
@type t() :: %__MODULE__{
37+
realm_name: String.t(),
38+
device_id: binary(),
39+
object_id: binary(),
40+
object_type: integer(),
41+
parent_id: term(),
42+
simple_trigger: term(),
43+
simple_trigger_id: term(),
44+
trigger_target: term()
45+
}
46+
end

apps/astarte_appengine_api/lib/astarte_appengine_api_web/plug/guardian_authorize_path.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ defmodule Astarte.AppEngine.APIWeb.Plug.GuardianAuthorizePath do
7070
{:ok, {method_regex, path_regex}} ->
7171
_ =
7272
Logger.debug(
73-
"Checking #{method} against #{inspect(method_regex)} and " <>
74-
"#{auth_path} against #{inspect(path_regex)}."
73+
"Checking #{inspect(method)} against #{inspect(method_regex)} and " <>
74+
"#{inspect(auth_path)} against #{inspect(path_regex)}."
7575
)
7676

7777
Regex.match?(method_regex, method) and Regex.match?(path_regex, auth_path)

apps/astarte_appengine_api/mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ defmodule Astarte.AppEngine.API.Mixfile do
111111
{:observer_cli, "~> 1.5"},
112112
{:dialyxir, "~> 1.0", only: [:dev, :ci], runtime: false},
113113
{:libcluster, "~> 3.3"},
114+
{:horde, "~> 0.9"},
114115
# Workaround for Elixir 1.15 / ssl_verify_fun issue
115116
# See also: https://github.com/deadtrickster/ssl_verify_fun.erl/pull/27
116117
{:ssl_verify_fun, "~> 1.1.0", manager: :rebar3, override: true},

apps/astarte_appengine_api/mix.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"},
1616
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
1717
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
18+
"delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"},
1819
"dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"},
1920
"ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"},
2021
"ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"},
@@ -28,11 +29,14 @@
2829
"expo": {:hex, :expo, "1.1.0", "f7b9ed7fb5745ebe1eeedf3d6f29226c5dd52897ac67c0f8af62a07e661e5c75", [:mix], [], "hexpm", "fbadf93f4700fb44c331362177bdca9eeb8097e8b0ef525c9cc501cb9917c960"},
2930
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
3031
"guardian": {:hex, :guardian, "2.3.2", "78003504b987f2b189d76ccf9496ceaa6a454bb2763627702233f31eb7212881", [:mix], [{:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.3 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "b189ff38cd46a22a8a824866a6867ca8722942347f13c33f7d23126af8821b52"},
32+
"horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"},
3133
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
3234
"jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"},
3335
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
36+
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
3437
"logfmt": {:hex, :logfmt, "3.3.3", "6521ee4a5c532088e15d487fab9f736c07bdd161d643560c73cd4b10685deb65", [:mix], [], "hexpm", "dbd51cd3fe37c3429b9bd687bad1f531a533505f4a641592129e7a47e24104d1"},
3538
"lz4": {:hex, :lz4_erl, "0.2.4", "fafc1fa39ed1d034893316852daebedd82f37df478446224ac096490be3b4a4d", [:rebar3], [], "hexpm", "e3eb9e2b5c1e4dab39db8fe0421e6fa10f9bf5843f20dab43518f8ab8e812517"},
39+
"merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"},
3640
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
3741
"mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"},
3842
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},

0 commit comments

Comments
 (0)