Skip to content

Commit 102ceb4

Browse files
committed
refactor(dup): use astarte_events trigger cache for incoming_data (#1563)
use the cache from astarte_events for incoming_data as a proof of concept for data triggers Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent f07bdbc commit 102ceb4

File tree

7 files changed

+128
-136
lines changed

7 files changed

+128
-136
lines changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/core/data_handler.ex

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataHandler do
3030
alias Astarte.Core.Device
3131
alias Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder
3232
alias Astarte.DataUpdaterPlant.DataUpdater.Core
33+
alias Astarte.DataUpdaterPlant.TriggersHandler
3334

3435
require Logger
3536

@@ -305,16 +306,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataHandler do
305306
do: value_timestamp,
306307
else: div(context.timestamp, 10000)
307308

308-
Core.DataTrigger.execute_incoming_data_triggers(
309-
state,
310-
Device.encode_device_id(state.device_id),
309+
TriggersHandler.incoming_data(
310+
state.realm,
311+
state.device_id,
312+
state.groups,
311313
interface_descriptor.name,
312314
interface_id,
313-
path,
314315
mapping.endpoint_id,
315-
payload,
316+
path,
316317
value,
317-
maybe_explicit_value_timestamp
318+
payload,
319+
maybe_explicit_value_timestamp,
320+
state
318321
)
319322
end
320323

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/core/data_trigger.ex

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTrigger do
2626
alias Astarte.Core.Mapping.EndpointsAutomaton
2727
alias Astarte.Core.InterfaceDescriptor
2828
alias Astarte.Core.Triggers.DataTrigger
29-
alias Astarte.DataUpdaterPlant.DataUpdater.Core
3029

3130
def data_trigger_to_key(state, data_trigger, event_type) do
3231
%DataTrigger{
@@ -65,75 +64,23 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTrigger do
6564
value,
6665
timestamp
6766
) do
68-
realm = state.realm
67+
%{realm: realm, groups: groups} = state
6968

70-
# any interface triggers
71-
Core.Interface.get_on_data_triggers(state, :on_incoming_data, :any_interface, :any_endpoint)
72-
|> Enum.each(fn trigger ->
73-
target_with_policy_list = get_target_with_policy_list(state, trigger)
74-
75-
TriggersHandler.incoming_data(
76-
target_with_policy_list,
77-
realm,
78-
device,
79-
interface,
80-
path,
81-
payload,
82-
timestamp
83-
)
84-
end)
85-
86-
# any endpoint triggers
87-
Core.Interface.get_on_data_triggers(state, :on_incoming_data, interface_id, :any_endpoint)
88-
|> Enum.each(fn trigger ->
89-
target_with_policy_list = get_target_with_policy_list(state, trigger)
90-
91-
TriggersHandler.incoming_data(
92-
target_with_policy_list,
93-
realm,
94-
device,
95-
interface,
96-
path,
97-
payload,
98-
timestamp
99-
)
100-
end)
101-
102-
# incoming data triggers
103-
Core.Interface.get_on_data_triggers(
104-
state,
105-
:on_incoming_data,
69+
TriggersHandler.incoming_data(
70+
realm,
71+
device,
72+
groups,
73+
interface,
10674
interface_id,
10775
endpoint_id,
10876
path,
109-
value
77+
value,
78+
payload,
79+
timestamp,
80+
state
11081
)
111-
|> Enum.each(fn trigger ->
112-
target_with_policy_list = get_target_with_policy_list(state, trigger)
113-
114-
TriggersHandler.incoming_data(
115-
target_with_policy_list,
116-
realm,
117-
device,
118-
interface,
119-
path,
120-
payload,
121-
timestamp
122-
)
123-
end)
124-
125-
:ok
12682
end
12783

12884
defp replace_empty_token(""), do: "%{}"
12985
defp replace_empty_token(non_empty), do: non_empty
130-
131-
defp get_target_with_policy_list(state, trigger) do
132-
trigger.trigger_targets
133-
|> Enum.map(fn target ->
134-
parent_target_id = Map.get(state.trigger_id_to_policy_name, target.parent_trigger_id)
135-
136-
{target, parent_target_id}
137-
end)
138-
end
13986
end

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,33 +112,44 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do
112112
)
113113
end
114114

115-
def incoming_data(targets, realm, device_id, interface, path, bson_value, timestamp)
116-
when is_list(targets) do
117-
execute_all_ok(targets, fn {target, policy} ->
118-
incoming_data(target, realm, device_id, interface, path, bson_value, timestamp, policy) ==
119-
:ok
120-
end)
121-
end
122-
123115
def incoming_data(
124-
target,
125116
realm,
126117
device_id,
127-
interface,
118+
groups,
119+
interface_name,
120+
interface_id,
121+
endpoint_id,
128122
path,
123+
value,
129124
bson_value,
130125
timestamp,
131-
policy
126+
state
132127
) do
133-
%IncomingDataEvent{interface: interface, path: path, bson_value: bson_value}
134-
|> dispatch_event_with_telemetry(
135-
:incoming_data_event,
136-
target,
128+
event = %IncomingDataEvent{interface: interface_name, path: path, bson_value: bson_value}
129+
hw_id = Device.encode_device_id(device_id)
130+
131+
Triggers.find_all_data_trigger_targets(
137132
realm,
138133
device_id,
139-
timestamp,
140-
policy
134+
groups,
135+
:on_incoming_data,
136+
interface_id,
137+
endpoint_id,
138+
path,
139+
value,
140+
Map.from_struct(state)
141141
)
142+
|> execute_all_ok(fn {target, policy} ->
143+
dispatch_event_with_telemetry(
144+
event,
145+
:incoming_data_event,
146+
target,
147+
realm,
148+
hw_id,
149+
timestamp,
150+
policy
151+
)
152+
end)
142153
end
143154

144155
def incoming_introspection(targets, realm, device_id, introspection, timestamp)

apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater/core/data_trigger_test.exs

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
2323
use ExUnitProperties
2424

2525
alias Astarte.DataUpdaterPlant.DataUpdater
26-
alias Astarte.DataUpdaterPlant.DataUpdater.Core
2726
alias Astarte.Core.CQLUtils
2827
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget
2928
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger
@@ -93,19 +92,21 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
9392
state: state
9493
} = context
9594

96-
Mimic.reject(&TriggersHandler.incoming_data/7)
95+
Mimic.reject(&Astarte.Events.TriggersHandler.dispatch_event/7)
9796

9897
assert :ok ==
99-
Core.DataTrigger.execute_incoming_data_triggers(
100-
state,
101-
"encoded_device_id",
98+
TriggersHandler.incoming_data(
99+
state.realm,
100+
state.device_id,
101+
state.groups,
102102
"test.interface",
103-
1,
103+
_interface_id = <<>>,
104+
_endpoint_id = <<>>,
104105
"/test/path",
105-
1,
106-
<<0, 1, 2, 3>>,
107106
42,
108-
1_600_000_000
107+
<<0, 1, 2, 3>>,
108+
1_600_000_000,
109+
state
109110
)
110111
end
111112

@@ -146,16 +147,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
146147
end)
147148

148149
assert :ok ==
149-
Core.DataTrigger.execute_incoming_data_triggers(
150-
state,
151-
device.encoded_id,
150+
TriggersHandler.incoming_data(
151+
realm,
152+
state.device_id,
153+
state.groups,
152154
interface_name,
153155
interface_id,
154-
path,
155156
endpoint_id,
156-
payload,
157+
path,
157158
value,
158-
timestamp
159+
payload,
160+
timestamp,
161+
state
159162
)
160163
end
161164

@@ -197,16 +200,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
197200
end)
198201

199202
assert :ok ==
200-
Core.DataTrigger.execute_incoming_data_triggers(
201-
state,
202-
device.encoded_id,
203+
TriggersHandler.incoming_data(
204+
state.realm,
205+
state.device_id,
206+
state.groups,
203207
interface_name,
204208
interface_id,
205-
path,
206209
endpoint_id,
207-
payload,
210+
path,
208211
value,
209-
timestamp
212+
payload,
213+
timestamp,
214+
state
210215
)
211216
end
212217

@@ -237,27 +242,29 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
237242
known_value: Cyanide.encode!(%{v: 50})
238243
})
239244

240-
Mimic.expect(TriggersHandler, :incoming_data, fn _target_with_policy_list,
241-
^realm,
242-
^device_id,
243-
^interface_name,
244-
^path,
245-
^payload,
246-
^timestamp ->
245+
Mimic.expect(Astarte.Events.TriggersHandler, :dispatch_event, fn _event,
246+
_event_type,
247+
_target,
248+
^realm,
249+
^device_id,
250+
^timestamp,
251+
_policy ->
247252
:ok
248253
end)
249254

250255
assert :ok ==
251-
Core.DataTrigger.execute_incoming_data_triggers(
252-
state,
253-
device.encoded_id,
256+
TriggersHandler.incoming_data(
257+
state.realm,
258+
device.device_id,
259+
[],
254260
interface_name,
255261
interface_id,
256-
path,
257262
endpoint_id,
258-
payload,
263+
path,
259264
value,
260-
timestamp
265+
payload,
266+
timestamp,
267+
state
261268
)
262269
end
263270

@@ -286,19 +293,21 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
286293
known_value: Cyanide.encode!(%{v: 50})
287294
})
288295

289-
Mimic.reject(&TriggersHandler.incoming_data/7)
296+
Mimic.reject(&Astarte.Events.TriggersHandler.dispatch_event/7)
290297

291298
assert :ok ==
292-
Core.DataTrigger.execute_incoming_data_triggers(
293-
state,
294-
device.encoded_id,
299+
TriggersHandler.incoming_data(
300+
state.realm,
301+
device.device_id,
302+
state.groups,
295303
interface_name,
296304
interface_id,
297-
path,
298305
endpoint_id,
299-
payload,
306+
path,
300307
value,
301-
timestamp
308+
payload,
309+
timestamp,
310+
state
302311
)
303312
end
304313
end
@@ -315,16 +324,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Core.DataTriggerTest do
315324
} = context
316325

317326
assert :ok ==
318-
Core.DataTrigger.execute_incoming_data_triggers(
319-
state,
320-
device.encoded_id,
327+
TriggersHandler.incoming_data(
328+
state.realm,
329+
device.device_id,
330+
state.groups,
321331
"iface",
322332
interface_id,
323-
"/path",
324333
endpoint_id,
325-
payload,
334+
"/path",
326335
value,
327-
timestamp
336+
payload,
337+
timestamp,
338+
state
328339
)
329340
end
330341
end

0 commit comments

Comments
 (0)