@@ -21,12 +21,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
2121 alias Astarte.Core.Device , as: CoreDevice
2222 alias Astarte.Core.InterfaceDescriptor
2323 alias Astarte.Core.Mapping
24+ alias Astarte.DataAccess.DateTime , as: MsDateTime
2425 alias Astarte.DataUpdaterPlant.Config
2526 alias Astarte.DataUpdaterPlant.DataUpdater.SimpleTrigger
2627 alias Astarte.DataUpdaterPlant.DataUpdater.DeletionInProgress
27- alias Astarte.DataUpdaterPlant.DataUpdater .Device
28- alias Astarte.DataUpdaterPlant.DataUpdater .Endpoint
29- alias Astarte.DataUpdaterPlant.DataUpdater .IndividualProperty
28+ alias Astarte.DataAccess.Devices .Device
29+ alias Astarte.DataAccess.Realms .Endpoint
30+ alias Astarte.DataAccess.Realms .IndividualProperty
3031 alias Astarte.DataUpdaterPlant.DataUpdater.KvStore
3132 alias Astarte.DataUpdaterPlant.DataUpdater.Realm
3233 alias Astarte.DataUpdaterPlant.Repo
@@ -131,8 +132,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
131132 % InterfaceDescriptor { interface_id: interface_id , storage: storage } = interface_descriptor
132133 % Mapping { endpoint_id: endpoint_id , value_type: value_type } = mapping
133134 keyspace_name = Realm . keyspace_name ( realm )
134- timestamp = div ( reception_timestamp , 10000 )
135- reception_timestamp_submillis = rem ( reception_timestamp , 10000 )
135+ { timestamp , reception_timestamp_submillis } = split_submillis ( reception_timestamp )
136136 column_name = CQLUtils . type_to_db_column_name ( value_type )
137137
138138 # TODO: :reception_timestamp_submillis is just a place holder right now
@@ -170,8 +170,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
170170 % InterfaceDescriptor { interface_id: interface_id , storage: storage } = interface_descriptor
171171 % Mapping { endpoint_id: endpoint_id , value_type: value_type } = mapping
172172 keyspace_name = Realm . keyspace_name ( realm )
173- timestamp = div ( reception_timestamp , 10000 )
174- reception_timestamp_submillis = rem ( reception_timestamp , 10000 )
173+ { timestamp , reception_timestamp_submillis } = split_submillis ( reception_timestamp )
174+ value_timestamp = Ecto.Type . cast! ( MsDateTime , value_timestamp )
175175 column_name = CQLUtils . type_to_db_column_name ( value_type )
176176
177177 # TODO: use received value_timestamp when needed
@@ -211,8 +211,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
211211 % InterfaceDescriptor { interface_id: interface_id , storage: storage } = interface_descriptor
212212
213213 keyspace_name = Realm . keyspace_name ( realm )
214- timestamp = div ( reception_timestamp , 10000 )
215- reception_timestamp_submillis = rem ( reception_timestamp , 10000 )
214+ { timestamp , reception_timestamp_submillis } = split_submillis ( reception_timestamp )
216215
217216 # TODO: we should cache endpoints by interface_id
218217 column_info =
@@ -252,6 +251,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
252251
253252 insert_value =
254253 if explicit_timestamp? do
254+ value_timestamp = Ecto.Type . cast! ( MsDateTime , value_timestamp )
255+
255256 Map . put ( insert_value , "value_timestamp" , value_timestamp )
256257 else
257258 insert_value
@@ -366,8 +367,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
366367 % InterfaceDescriptor { interface_id: interface_id } = interface_descriptor
367368 % Mapping { endpoint_id: endpoint_id } = mapping
368369 keyspace_name = Realm . keyspace_name ( realm )
369- timestamp = div ( reception_timestamp , 10000 ) |> DateTime . from_unix! ( :microsecond )
370- reception_timestamp_submillis = rem ( reception_timestamp , 10000 )
370+ { timestamp , reception_timestamp_submillis } = split_submillis ( reception_timestamp )
371+ value_timestamp = Ecto.Type . cast! ( MsDateTime , value_timestamp )
371372
372373 # TODO: :reception_timestamp_submillis is just a place holder right now
373374 entry = % {
@@ -377,7 +378,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
377378 path: path ,
378379 reception_timestamp: timestamp ,
379380 reception_timestamp_submillis: reception_timestamp_submillis ,
380- datetime_value: DateTime . from_unix! ( value_timestamp , :microsecond )
381+ datetime_value: value_timestamp
381382 }
382383
383384 opts =
@@ -457,7 +458,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
457458
458459 defp set_connection_info! ( realm , device_id , timestamp , ip_address ) do
459460 keyspace_name = Realm . keyspace_name ( realm )
460- timestamp = Ecto.Type . cast! ( :utc_datetime_usec , timestamp )
461+ timestamp = Ecto.Type . cast! ( MsDateTime , timestamp )
461462
462463 % Device { device_id: device_id }
463464 |> Ecto.Changeset . change (
@@ -516,7 +517,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
516517 interface_exchanged_bytes
517518 ) do
518519 keyspace_name = Realm . keyspace_name ( realm )
519- timestamp_ms = Ecto.Type . cast! ( :utc_datetime_usec , timestamp_ms )
520+ timestamp_ms = Ecto.Type . cast! ( MsDateTime , timestamp_ms )
520521
521522 % Device { device_id: device_id }
522523 |> Ecto.Changeset . change (
@@ -922,4 +923,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
922923 )
923924 |> Enum . to_list ( )
924925 end
926+
927+ defp split_submillis ( decimicro_ts ) do
928+ millisecond_ts = decimicro_ts |> div ( 10000 )
929+ submillis = decimicro_ts |> rem ( 10000 )
930+
931+ millisecond_datetime = Ecto.Type . cast! ( MsDateTime , millisecond_ts )
932+
933+ { millisecond_datetime , submillis }
934+ end
925935end
0 commit comments