-
-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathaction_wrapper_helpers.ex
More file actions
119 lines (100 loc) · 3.54 KB
/
action_wrapper_helpers.ex
File metadata and controls
119 lines (100 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
defmodule AshEvents.Events.ActionWrapperHelpers do
@moduledoc """
Helper functions used by the action wrappers.
"""
def dump_value(nil, _attribute), do: nil
def dump_value(values, %{type: {:array, attr_type}} = attribute) do
item_constraints = attribute.constraints[:items]
# This is a work around for a bug in Ash.Type.dump_to_embedded/3
Enum.map(values, fn value ->
{:ok, dumped_value} = Ash.Type.dump_to_embedded(attr_type, value, item_constraints)
dumped_value
end)
end
def dump_value(value, attribute) do
{:ok, dumped_value} = Ash.Type.dump_to_embedded(attribute.type, value, attribute.constraints)
dumped_value
end
defp cast_and_dump_value(value, attr_or_arg) do
case Ash.Type.cast_input(attr_or_arg.type, value, attr_or_arg.constraints) do
{:ok, cast_value} ->
dump_value(cast_value, attr_or_arg)
{:error, _} ->
dump_value(value, attr_or_arg)
end
end
def create_event!(changeset, original_params, module_opts, opts) do
pg_repo = AshPostgres.DataLayer.Info.repo(changeset.resource)
if pg_repo do
lock_key =
module_opts[:advisory_lock_key_generator].generate_key!(
changeset,
module_opts[:advisory_lock_key_default]
)
if is_list(lock_key) do
[key1, key2] = lock_key
Ecto.Adapters.SQL.query(pg_repo, "SELECT pg_advisory_xact_lock($1, $2)", [key1, key2])
else
Ecto.Adapters.SQL.query(pg_repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
end
end
params =
original_params
|> Enum.reduce(%{}, fn {key, value}, acc ->
cond do
attr = Ash.Resource.Info.attribute(changeset.resource, key) ->
Map.put(acc, key, cast_and_dump_value(value, attr))
arg = Enum.find(changeset.action.arguments, &(&1.name == key)) ->
Map.put(acc, key, cast_and_dump_value(value, arg))
true ->
acc
end
end)
event_log_resource = module_opts[:event_log]
[primary_key] = Ash.Resource.Info.primary_key(changeset.resource)
persist_actor_primary_keys = AshEvents.EventLog.Info.event_log(event_log_resource)
actor = opts[:actor]
record_id =
if changeset.action_type == :create do
Map.get(changeset.attributes, primary_key)
else
Map.get(changeset.data, primary_key)
end
metadata = Map.get(changeset.context, :ash_events_metadata, %{})
event_params =
%{
data: params,
record_id: record_id,
resource: changeset.resource,
action: module_opts[:action],
action_type: changeset.action_type,
metadata: metadata,
version: module_opts[:version]
}
event_params =
Enum.reduce(persist_actor_primary_keys, event_params, fn persist_actor_primary_key, input ->
if is_struct(actor) and actor.__struct__ == persist_actor_primary_key.destination do
primary_key = Map.get(actor, hd(Ash.Resource.Info.primary_key(actor.__struct__)))
Map.put(input, persist_actor_primary_key.name, primary_key)
else
input
end
end)
has_atomics? = not Enum.empty?(changeset.atomics)
event_log_resource
|> Ash.Changeset.for_create(:create, event_params, opts)
|> then(fn cs ->
if has_atomics? do
Ash.Changeset.add_error(
cs,
Ash.Error.Changes.InvalidChanges.exception(
message: "atomic changes are not compatible with ash_events"
)
)
else
cs
end
end)
|> Ash.create!()
end
end