Skip to content

Commit 7950d56

Browse files
authored
Merge pull request #63 from ash-project/add-upsert-action-support
Add support for upsert actions.
2 parents 1cf1dd3 + 92a6375 commit 7950d56

File tree

8 files changed

+347
-18
lines changed

8 files changed

+347
-18
lines changed

lib/event_log/replay.ex

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,16 @@ defmodule AshEvents.EventLog.Actions.Replay do
55
require Ash.Query
66
require Logger
77

8-
defp handle_action(%{action_type: :create} = event, resource, action, opts) do
8+
# Helper function to safely check if a record exists
9+
defp get_record_if_exists(resource, record_id, opts) do
10+
case Ash.get(resource, record_id, opts) do
11+
{:ok, record} -> {:ok, record}
12+
{:error, _} -> {:error, :not_found}
13+
end
14+
end
15+
16+
# Extract current create event replay logic into helper function
17+
defp replay_as_create(event, resource, action, opts) do
918
create_timestamp = AshEvents.Events.Info.events_create_timestamp!(event.resource)
1019
update_timestamp = AshEvents.Events.Info.events_update_timestamp!(event.resource)
1120

@@ -32,6 +41,53 @@ defmodule AshEvents.EventLog.Actions.Replay do
3241
:ok
3342
end
3443

44+
# Helper function to replay upsert events as updates when record already exists
45+
defp replay_upsert_as_update(event, resource, existing_record, opts) do
46+
# For upsert actions, we MUST have the auto-generated replay update action
47+
replay_action_name = :"ash_events_replay_#{event.action}_update"
48+
actions = Ash.Resource.Info.actions(resource)
49+
50+
update_action =
51+
case Enum.find(actions, &(&1.name == replay_action_name and &1.type == :update)) do
52+
nil ->
53+
raise "Expected auto-generated replay update action #{replay_action_name} for upsert action #{event.action} on #{resource}, but it was not found. This indicates a bug in the AshEvents transformer."
54+
55+
action ->
56+
action
57+
end
58+
59+
update_timestamp = AshEvents.Events.Info.events_update_timestamp!(event.resource)
60+
61+
input =
62+
if update_timestamp do
63+
Map.put(event.data, update_timestamp, event.occurred_at)
64+
else
65+
event.data
66+
end
67+
68+
existing_record
69+
|> Ash.Changeset.for_update(update_action.name, input, opts)
70+
|> Ash.update!()
71+
72+
:ok
73+
end
74+
75+
defp handle_action(%{action_type: :create} = event, resource, action, opts) do
76+
action_struct = Ash.Resource.Info.action(resource, action)
77+
78+
if action_struct.upsert? do
79+
case get_record_if_exists(resource, event.record_id, opts) do
80+
{:ok, existing_record} ->
81+
replay_upsert_as_update(event, resource, existing_record, opts)
82+
83+
{:error, :not_found} ->
84+
replay_as_create(event, resource, action, opts)
85+
end
86+
else
87+
replay_as_create(event, resource, action_struct, opts)
88+
end
89+
end
90+
3591
defp handle_action(%{action_type: :update} = event, resource, action, opts) do
3692
case Ash.get(resource, event.record_id, opts) do
3793
{:ok, record} ->

lib/events/create_action_wrapper.ex

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule AshEvents.CreateActionWrapper do
44
"""
55
use Ash.Resource.ManualCreate
66

7-
def create(changeset, module_opts, ctx) do
7+
def create(changeset, module_opts, %{upsert?: upsert?, upsert_keys: upsert_keys} = ctx) do
88
merged_ctx = Map.get(ctx, :source_context) |> Map.merge(ctx)
99

1010
if Map.get(merged_ctx, :ash_events_replay?) do
@@ -16,16 +16,45 @@ defmodule AshEvents.CreateActionWrapper do
1616
|> Ash.Context.to_opts()
1717
|> Keyword.put(:return_notifications?, ctx.return_notifications? || false)
1818

19-
AshEvents.Events.ActionWrapperHelpers.create_event!(
20-
changeset,
21-
merged_ctx.original_params,
22-
module_opts,
23-
opts
24-
)
25-
2619
data_layer = Ash.Resource.Info.data_layer(changeset.resource)
2720

28-
data_layer.create(changeset.resource, changeset)
21+
result =
22+
if upsert? do
23+
upsert_identity =
24+
if changeset.action.upsert_identity do
25+
Ash.Resource.Info.identity(changeset.resource, changeset.action.upsert_identity)
26+
else
27+
nil
28+
end
29+
30+
data_layer.upsert(changeset.resource, changeset, upsert_keys, upsert_identity)
31+
else
32+
data_layer.create(changeset.resource, changeset)
33+
end
34+
35+
case result do
36+
{:ok, record} ->
37+
# Create event with the actual record ID from the result
38+
[primary_key] = Ash.Resource.Info.primary_key(changeset.resource)
39+
actual_id = Map.get(record, primary_key)
40+
41+
event_changeset = %{
42+
changeset
43+
| attributes: Map.put(changeset.attributes, primary_key, actual_id)
44+
}
45+
46+
AshEvents.Events.ActionWrapperHelpers.create_event!(
47+
event_changeset,
48+
merged_ctx.original_params,
49+
module_opts,
50+
opts
51+
)
52+
53+
result
54+
55+
error ->
56+
error
57+
end
2958
end
3059
end
3160
end

lib/events/transformers/add_actions.ex

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,50 @@ defmodule AshEvents.Events.Transformers.AddActions do
134134
}
135135
|> then(fn action ->
136136
case action.type do
137+
:create -> Map.put(action, :upsert?, action.upsert?)
137138
:update -> Map.put(action, :require_atomic?, false)
138139
:destroy -> Map.merge(action, %{require_atomic?: false, return_destroyed?: true})
139140
_ -> action
140141
end
141142
end)
142143

143-
{:ok,
144-
Spark.Dsl.Transformer.replace_entity(
145-
dsl,
146-
[:actions],
147-
manual_action,
148-
&(&1.name == action.name)
149-
)}
144+
{:ok, dsl_with_main_action} =
145+
{:ok,
146+
Spark.Dsl.Transformer.replace_entity(
147+
dsl,
148+
[:actions],
149+
manual_action,
150+
&(&1.name == action.name)
151+
)}
152+
153+
if action.type == :create and action.upsert? do
154+
replay_update_action_name = :"ash_events_replay_#{action.name}_update"
155+
156+
replay_update_action = %Ash.Resource.Actions.Update{
157+
name: replay_update_action_name,
158+
type: :update,
159+
accept: action.accept,
160+
arguments: action.arguments,
161+
primary?: false,
162+
description: "Auto-generated update action for replaying #{action.name} upsert events",
163+
require_atomic?: false,
164+
manual: nil,
165+
changes: [],
166+
touches_resources: [],
167+
transaction?: nil,
168+
metadata: [],
169+
delay_global_validations?: false
170+
}
171+
172+
{:ok,
173+
Spark.Dsl.Transformer.add_entity(
174+
dsl_with_main_action,
175+
[:actions],
176+
replay_update_action
177+
)}
178+
else
179+
{:ok, dsl_with_main_action}
180+
end
150181
end)
151182
end
152183
end
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
{
2+
"attributes": [
3+
{
4+
"allow_nil?": false,
5+
"default": "fragment(\"gen_random_uuid()\")",
6+
"generated?": false,
7+
"precision": null,
8+
"primary_key?": true,
9+
"references": null,
10+
"scale": null,
11+
"size": null,
12+
"source": "id",
13+
"type": "uuid"
14+
},
15+
{
16+
"allow_nil?": false,
17+
"default": "fragment(\"(now() AT TIME ZONE 'utc')\")",
18+
"generated?": false,
19+
"precision": null,
20+
"primary_key?": false,
21+
"references": null,
22+
"scale": null,
23+
"size": null,
24+
"source": "created_at",
25+
"type": "utc_datetime_usec"
26+
},
27+
{
28+
"allow_nil?": false,
29+
"default": "fragment(\"(now() AT TIME ZONE 'utc')\")",
30+
"generated?": false,
31+
"precision": null,
32+
"primary_key?": false,
33+
"references": null,
34+
"scale": null,
35+
"size": null,
36+
"source": "updated_at",
37+
"type": "utc_datetime_usec"
38+
},
39+
{
40+
"allow_nil?": false,
41+
"default": "nil",
42+
"generated?": false,
43+
"precision": null,
44+
"primary_key?": false,
45+
"references": null,
46+
"scale": null,
47+
"size": null,
48+
"source": "email",
49+
"type": "text"
50+
},
51+
{
52+
"allow_nil?": false,
53+
"default": "nil",
54+
"generated?": false,
55+
"precision": null,
56+
"primary_key?": false,
57+
"references": null,
58+
"scale": null,
59+
"size": null,
60+
"source": "given_name",
61+
"type": "text"
62+
},
63+
{
64+
"allow_nil?": false,
65+
"default": "nil",
66+
"generated?": false,
67+
"precision": null,
68+
"primary_key?": false,
69+
"references": null,
70+
"scale": null,
71+
"size": null,
72+
"source": "family_name",
73+
"type": "text"
74+
}
75+
],
76+
"base_filter": null,
77+
"check_constraints": [],
78+
"custom_indexes": [],
79+
"custom_statements": [],
80+
"has_create_action": true,
81+
"hash": "06D901E44EB8357C2BC9A70B6399393FBC6DFA13B847562B7C2063834FE18FBA",
82+
"identities": [
83+
{
84+
"all_tenants?": false,
85+
"base_filter": null,
86+
"index_name": "users_unique_email_index",
87+
"keys": [
88+
{
89+
"type": "atom",
90+
"value": "email"
91+
}
92+
],
93+
"name": "unique_email",
94+
"nils_distinct?": true,
95+
"where": null
96+
}
97+
],
98+
"multitenancy": {
99+
"attribute": null,
100+
"global": null,
101+
"strategy": null
102+
},
103+
"repo": "Elixir.AshEvents.TestRepo",
104+
"schema": null,
105+
"table": "users"
106+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule AshEvents.TestRepo.Migrations.AddEmailIdentity do
2+
@moduledoc """
3+
Updates resources based on their most recent snapshots.
4+
5+
This file was autogenerated with `mix ash_postgres.generate_migrations`
6+
"""
7+
8+
use Ecto.Migration
9+
10+
def up do
11+
create unique_index(:users, [:email], name: "users_unique_email_index")
12+
end
13+
14+
def down do
15+
drop_if_exists(unique_index(:users, [:email], name: "users_unique_email_index"))
16+
end
17+
end

0 commit comments

Comments
 (0)