Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

defmodule Astarte.AppEngine.API.Auth do
alias Astarte.AppEngine.API.Queries
alias Astarte.AppEngine.API.Config
alias Astarte.Core.CQLUtils

require Logger

def fetch_public_key(realm) do
keyspace = CQLUtils.realm_name_to_keyspace_name(realm, Config.astarte_instance_id!())

Queries.fetch_public_key(keyspace)
Queries.fetch_public_key(realm)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Astarte.AppEngine.API.Device do
alias Astarte.DataAccess.Device, as: DeviceQueries
alias Astarte.DataAccess.Interface, as: InterfaceQueries
alias Ecto.Changeset
alias Astarte.Core.CQLUtils

require Logger

def list_devices!(realm_name, params) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ defmodule Astarte.AppEngine.API.Device.Queries do
alias Astarte.Core.CQLUtils
alias Astarte.Core.Device
alias Astarte.Core.InterfaceDescriptor
alias Astarte.AppEngine.API.Realm
alias Astarte.AppEngine.API.Realm, as: DataAccessRealm
alias Astarte.AppEngine.API.Devices.Device, as: DatabaseDevice
alias Astarte.AppEngine.API.Endpoint, as: DatabaseEndpoint

require CQEx
require Logger

def retrieve_interfaces_list(realm_name, device_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from d in DatabaseDevice,
Expand All @@ -53,7 +53,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_all_endpoint_ids_for_interface!(realm_name, interface_id, opts \\ []) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from DatabaseEndpoint,
Expand All @@ -71,7 +71,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_all_endpoints_for_interface!(realm_name, interface_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from DatabaseEndpoint,
Expand All @@ -83,7 +83,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_mapping(realm_name, interface_id, endpoint_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from DatabaseEndpoint,
Expand All @@ -105,7 +105,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def interface_has_explicit_timestamp?(realm_name, interface_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
do_interface_has_explicit_timestamp?(keyspace, interface_id)
end

Expand All @@ -125,7 +125,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def fetch_datastream_maximum_storage_retention(realm_name) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
group = "realm_config"
key = "datastream_maximum_storage_retention"

Expand All @@ -145,16 +145,19 @@ defmodule Astarte.AppEngine.API.Device.Queries do
opts
) do
columns = default_endpoint_column_selection(endpoint_row)
keyspace = DataAccessRealm.keyspace_name(realm_name)

opts = %{opts | limit: 1}

do_get_datastream_values(realm_name, device_id, interface_row, endpoint_id, path, opts)
do_get_datastream_values(keyspace, device_id, interface_row, endpoint_id, path, opts)
|> select(^columns)
|> Repo.fetch_one()
end

def retrieve_all_endpoint_paths!(realm_name, device_id, interface_id, endpoint_id) do
find_endpoints(realm_name, "individual_properties", device_id, interface_id, endpoint_id)
keyspace = DataAccessRealm.keyspace_name(realm_name)

find_endpoints(keyspace, "individual_properties", device_id, interface_id, endpoint_id)
|> select([:path])
|> Repo.all()
end
Expand All @@ -176,7 +179,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
# TODO: use received value_timestamp when needed
# TODO: :reception_timestamp_submillis is just a place holder right now

keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

# Ecto expects microsecond precision
{reception, reception_submillis} = split_datetime_to_ms_and_submillis(reception_timestamp)
Expand Down Expand Up @@ -223,7 +226,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do

# TODO: :reception_timestamp_submillis is just a place holder right now
%InterfaceDescriptor{interface_id: interface_id, storage: storage} = interface_descriptor
keyspace_name = Realm.keyspace_name(realm_name)
keyspace_name = DataAccessRealm.keyspace_name(realm_name)

q =
from v in storage,
Expand Down Expand Up @@ -257,7 +260,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
opts
) do
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)

Expand Down Expand Up @@ -295,7 +298,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
opts
) do
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)

attributes = %{
Expand Down Expand Up @@ -325,7 +328,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
timestamp,
opts
) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
interface_id = interface_descriptor.interface_id

endpoint_rows =
Expand Down Expand Up @@ -452,12 +455,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_device_for_status(realm_name, device_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
do_retrieve_device_for_status(keyspace, device_id)
end

def retrieve_device_status(realm_name, device_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

with {:ok, device} <- do_retrieve_device_for_status(keyspace, device_id) do
{:ok, build_device_status(keyspace, device)}
Expand All @@ -479,7 +482,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_devices_list(realm_name, limit, retrieve_details?, previous_token) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

field_selection =
if retrieve_details? do
Expand Down Expand Up @@ -526,7 +529,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def device_alias_to_device_id(realm_name, device_alias) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
do_device_alias_to_device_id(keyspace, device_alias)
end

Expand All @@ -541,7 +544,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def insert_attribute(realm_name, device_id, attribute_key, attribute_value) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
new_attribute = %{attribute_key => attribute_value}

query =
Expand All @@ -556,7 +559,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def delete_attribute(realm_name, device_id, attribute_key) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
query = from(d in DatabaseDevice, select: d.attributes)
opts = [prefix: keyspace, consistency: :quorum]

Expand All @@ -583,7 +586,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def insert_alias(realm_name, device_id, alias_tag, alias_value) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

name = %Name{
object_name: alias_value,
Expand Down Expand Up @@ -624,7 +627,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def delete_alias(realm_name, device_id, alias_tag) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from d in DatabaseDevice,
Expand Down Expand Up @@ -674,7 +677,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def set_inhibit_credentials_request(realm_name, device_id, inhibit_credentials_request) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
from DatabaseDevice,
Expand All @@ -688,7 +691,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def retrieve_object_datastream_values(realm_name, device_id, interface_row, path, columns, opts) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query_limit = query_limit(opts)
timestamp_column = timestamp_column(opts.explicit_timestamp)
Expand Down Expand Up @@ -719,9 +722,10 @@ defmodule Astarte.AppEngine.API.Device.Queries do
) do
value_column = CQLUtils.type_to_db_column_name(endpoint_row.value_type) |> String.to_atom()
columns = [:path, value_column]
keyspace = DataAccessRealm.keyspace_name(realm_name)

find_endpoints(
realm_name,
keyspace,
interface_row.storage,
device_id,
interface_row.interface_id,
Expand All @@ -741,9 +745,10 @@ defmodule Astarte.AppEngine.API.Device.Queries do
opts
) do
columns = default_endpoint_column_selection(endpoint_row)
keyspace = DataAccessRealm.keyspace_name(realm_name)

query =
do_get_datastream_values(realm_name, device_id, interface_row, endpoint_id, path, opts)
do_get_datastream_values(keyspace, device_id, interface_row, endpoint_id, path, opts)

values = query |> select(^columns) |> Repo.all()
count = query |> select([d], count(d.value_timestamp)) |> Repo.one!()
Expand All @@ -752,22 +757,20 @@ defmodule Astarte.AppEngine.API.Device.Queries do
end

def value_type_query(realm_name, interface_id, endpoint_id) do
keyspace = keyspace_name(realm_name)
keyspace = DataAccessRealm.keyspace_name(realm_name)
query = from DatabaseEndpoint, select: [:value_type]

Repo.get_by!(query, [interface_id: interface_id, endpoint_id: endpoint_id], prefix: keyspace)
end

defp do_get_datastream_values(
realm_name,
keyspace,
device_id,
interface_row,
endpoint_id,
path,
opts
) do
keyspace = keyspace_name(realm_name)

query_limit = query_limit(opts)

# Check the explicit user defined limit to know if we have to reorder data
Expand Down Expand Up @@ -822,9 +825,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
|> where(^filter_to)
end

defp find_endpoints(realm_name, table_name, device_id, interface_id, endpoint_id) do
keyspace = keyspace_name(realm_name)

defp find_endpoints(keyspace, table_name, device_id, interface_id, endpoint_id) do
from(table_name, prefix: ^keyspace)
|> where(device_id: ^device_id, interface_id: ^interface_id, endpoint_id: ^endpoint_id)
end
Expand All @@ -842,13 +843,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
[value_column | default_endpoint_column_selection()]
end

defp keyspace_name(realm_name) do
Astarte.Core.CQLUtils.realm_name_to_keyspace_name(
realm_name,
Astarte.DataAccess.Config.astarte_instance_id!()
)
end

defp timestamp_column(explicit_timestamp?) do
case explicit_timestamp? do
nil -> :reception_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ defmodule Astarte.AppEngine.API.Groups.Queries do
with {:ok, %{device_id: device_id}} <-
Ecto.Changeset.apply_action(device_changeset, :insert),
{:group_exists?, true} <-
{:group_exists?, group_exists?(realm_name, group_name)},
{:group_exists?, group_exists?(keyspace, group_name)},
:ok <- check_valid_device_for_group(keyspace, group_name, device_id),
:ok <- add_to_group(keyspace, group_name, [device_id]) do
:ok
Expand All @@ -94,9 +94,10 @@ defmodule Astarte.AppEngine.API.Groups.Queries do
end

def remove_device(realm_name, group_name, device_id) do
keyspace = DataAccessRealm.keyspace_name(realm_name)

with {:group_exists?, true} <-
{:group_exists?, group_exists?(realm_name, group_name)},
keyspace = DataAccessRealm.keyspace_name(realm_name),
{:group_exists?, group_exists?(keyspace, group_name)},
:ok <- remove_from_group(keyspace, group_name, device_id) do
:ok
else
Expand Down Expand Up @@ -241,19 +242,17 @@ defmodule Astarte.AppEngine.API.Groups.Queries do
end
end

defp compute_device_status(realm_name, device_row) do
defp compute_device_status(keyspace, device_row) do
%{
device_id: device_id
} = device_row

device_status = DeviceStatus.from_db_row(device_row)
deletion_in_progress? = deletion_in_progress?(realm_name, device_id)
deletion_in_progress? = deletion_in_progress?(keyspace, device_id)
%{device_status | deletion_in_progress: deletion_in_progress?}
end

defp deletion_in_progress?(realm_name, device_id) do
keyspace = DataAccessRealm.keyspace_name(realm_name)

defp deletion_in_progress?(keyspace, device_id) do
case Repo.fetch(DeletionInProgress, device_id, prefix: keyspace) do
{:ok, _} ->
true
Expand All @@ -278,9 +277,7 @@ defmodule Astarte.AppEngine.API.Groups.Queries do
end
end

defp group_exists?(realm_name, group_name) do
keyspace = DataAccessRealm.keyspace_name(realm_name)

defp group_exists?(keyspace, group_name) do
query =
from d in GroupedDevice,
prefix: ^keyspace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@
#

defmodule Astarte.AppEngine.API.Health do
alias Astarte.AppEngine.API.Realm
alias Astarte.AppEngine.API.Queries

require Logger

def get_health do
astarte_keyspace = Realm.keyspace_name("astarte")

case Queries.check_astarte_health(astarte_keyspace, :quorum) do
case Queries.check_astarte_health(:quorum) do
:ok ->
:ok

{:error, :database_connection_error} ->
{:error, :bad_health}

{:error, :health_check_bad} ->
case Queries.check_astarte_health(astarte_keyspace, :one) do
case Queries.check_astarte_health(:one) do
:ok -> {:error, :degraded_health}
_error -> {:error, :bad_health}
end
Expand Down
Loading
Loading