@@ -26,20 +26,49 @@ defmodule Astarte.AppEngine.API.Groups do
2626 alias Astarte.AppEngine.API.Groups.Queries
2727 alias Astarte.Core.Device
2828
29+ alias Astarte.AppEngine.API.Realm
30+ alias Astarte.AppEngine.API.Devices.Device , as: DatabaseDevice
31+ alias Astarte.AppEngine.API.Groups.GroupedDevice
32+ alias Astarte.AppEngine.API.Repo
33+ alias Ecto.Changeset
34+
35+ import Ecto.Query
36+
2937 @ default_list_limit 1000
3038
3139 def create_group ( realm_name , params ) do
32- changeset = Group . changeset ( % Group { } , params )
33-
34- Queries . create_group ( realm_name , changeset )
40+ keyspace = Realm . keyspace_name ( realm_name )
41+
42+ group_changeset =
43+ % Group { }
44+ |> Group . changeset ( params )
45+
46+ with { :ok , group } <- Changeset . apply_action ( group_changeset , :insert ) ,
47+ { :ok , decoded_device_ids } <- decode_device_ids ( group . devices ) ,
48+ :ok <- check_all_devices_exist ( keyspace , decoded_device_ids , group_changeset ) ,
49+ :ok <- check_group_does_not_exist ( keyspace , group . group_name ) ,
50+ :ok <- add_to_group ( keyspace , group . group_name , decoded_device_ids ) do
51+ { :ok , group }
52+ end
3553 end
3654
3755 def list_groups ( realm_name ) do
38- Queries . list_groups ( realm_name )
56+ keyspace = Realm . keyspace_name ( realm_name )
57+
58+ from ( g in GroupedDevice , prefix: ^ keyspace , select: g . group_name , distinct: true )
59+ |> Repo . all ( )
3960 end
4061
4162 def get_group ( realm_name , group_name ) do
42- Queries . get_group ( realm_name , group_name )
63+ keyspace = Realm . keyspace_name ( realm_name )
64+
65+ group_query = from g in GroupedDevice , select: g . group_name , limit: 1
66+ fetch_clause = [ group_name: group_name ]
67+ opts = [ prefix: keyspace , error: :group_not_found ]
68+
69+ with { :ok , group_name } <- Repo . fetch_by ( group_query , fetch_clause , opts ) do
70+ { :ok , % Group { group_name: group_name } }
71+ end
4372 end
4473
4574 def list_detailed_devices ( realm_name , group_name , params \\ % { } ) do
@@ -111,4 +140,92 @@ defmodule Astarte.AppEngine.API.Groups do
111140 def check_device_in_group ( realm_name , group_name , device_id ) do
112141 Queries . check_device_in_group ( realm_name , group_name , device_id )
113142 end
143+
144+ defp check_group_exists ( keyspace , group_name ) do
145+ from ( GroupedDevice , select: [ :group_name ] , limit: 1 )
146+ |> Repo . fetch_by ( [ group_name: group_name ] , prefix: keyspace )
147+ end
148+
149+ defp check_group_does_not_exist ( keyspace , group_name ) do
150+ check_group_exists ( keyspace , group_name )
151+ |> case do
152+ { :error , _ } ->
153+ :ok
154+
155+ { :ok , _ } ->
156+ { :error , :group_already_exists }
157+ end
158+ end
159+
160+ defp check_all_devices_exist ( keyspace , device_ids , group_changeset ) do
161+ device_ids
162+ |> Enum . chunk_every ( 100 )
163+ |> Enum . reduce_while ( :ok , fn id_chunk , :ok ->
164+ existing_ids =
165+ from ( d in DatabaseDevice ,
166+ prefix: ^ keyspace ,
167+ where: d . device_id in ^ id_chunk ,
168+ select: d . device_id
169+ )
170+ |> Repo . all ( )
171+
172+ if Enum . count ( existing_ids ) == Enum . count ( id_chunk ) do
173+ { :cont , :ok }
174+ else
175+ # Some device_id was not present in the database. Take the first.
176+ not_found =
177+ id_chunk
178+ |> Enum . find ( & ( & 1 not in existing_ids ) )
179+ |> Device . encode_device_id ( )
180+
181+ group_changeset =
182+ group_changeset |> Changeset . add_error ( :devices , "must exist (#{ not_found } not found)" )
183+
184+ { :halt , { :error , group_changeset } }
185+ end
186+ end )
187+ end
188+
189+ defp decode_device_ids ( encoded_device_ids ) do
190+ { decoded_ids , errors } =
191+ encoded_device_ids
192+ |> Enum . map ( & Device . decode_device_id / 1 )
193+ |> Enum . split_with ( fn { result , _ } -> result == :ok end )
194+
195+ case errors do
196+ [ ] -> { :ok , Enum . map ( decoded_ids , fn { :ok , id } -> id end ) }
197+ [ first_error | _ ] -> first_error
198+ end
199+ end
200+
201+ defp add_to_group ( keyspace , group_name , decoded_device_ids ) do
202+ grouped_device_table = GroupedDevice . __schema__ ( :source )
203+
204+ insert_grouped_device_sql = """
205+ INSERT INTO #{ keyspace } .#{ grouped_device_table } (group_name, insertion_uuid, device_id)
206+ values (?, ?, ?)
207+ """
208+
209+ queries =
210+ decoded_device_ids
211+ |> Enum . flat_map ( fn device_id ->
212+ insertion_uuid = UUID . uuid1 ( )
213+
214+ group = % { group_name => insertion_uuid }
215+
216+ query =
217+ from ( DatabaseDevice , prefix: ^ keyspace , where: [ device_id: ^ device_id ] )
218+ |> update ( [ d ] , set: [ groups: fragment ( "groups + ?" , ^ group ) ] )
219+
220+ update_device_groups =
221+ Repo . to_sql ( :update_all , query )
222+
223+ insert_grouped_device_params = [ group_name , insertion_uuid , device_id ]
224+ insert_grouped_device = { insert_grouped_device_sql , insert_grouped_device_params }
225+
226+ [ update_device_groups , insert_grouped_device ]
227+ end )
228+
229+ Exandra . execute_batch ( Repo , % Exandra.Batch { queries: queries } )
230+ end
114231end
0 commit comments