Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion lib/event_log/replay.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ defmodule AshEvents.EventLog.Actions.Replay do
require Ash.Query
require Logger

defp handle_action(%{action_type: :create} = event, resource, action, opts) do
# Helper function to safely check if a record exists
# Looks up `record_id` on `resource` and normalizes the result.
#
# Returns `{:ok, record}` when the record exists. Every failure from
# `Ash.get/3` is collapsed into `{:error, :not_found}` — NOTE(review):
# this also masks non-lookup failures (authorization, data-layer errors);
# confirm that treating those as "missing" is acceptable during replay.
defp get_record_if_exists(resource, record_id, opts) do
  resource
  |> Ash.get(record_id, opts)
  |> case do
    {:ok, _record} = found -> found
    {:error, _reason} -> {:error, :not_found}
  end
end

# Extract current create event replay logic into helper function
defp replay_as_create(event, resource, action, opts) do
create_timestamp = AshEvents.Events.Info.events_create_timestamp!(event.resource)
update_timestamp = AshEvents.Events.Info.events_update_timestamp!(event.resource)

Expand All @@ -32,6 +41,53 @@ defmodule AshEvents.EventLog.Actions.Replay do
:ok
end

# Helper function to replay upsert events as updates when record already exists
defp replay_upsert_as_update(event, resource, existing_record, opts) do
# For upsert actions, we MUST have the auto-generated replay update action
replay_action_name = :"ash_events_replay_#{event.action}_update"
actions = Ash.Resource.Info.actions(resource)

update_action =
case Enum.find(actions, &(&1.name == replay_action_name and &1.type == :update)) do
nil ->
raise "Expected auto-generated replay update action #{replay_action_name} for upsert action #{event.action} on #{resource}, but it was not found. This indicates a bug in the AshEvents transformer."

action ->
action
end

update_timestamp = AshEvents.Events.Info.events_update_timestamp!(event.resource)

input =
if update_timestamp do
Map.put(event.data, update_timestamp, event.occurred_at)
else
event.data
end

existing_record
|> Ash.Changeset.for_update(update_action.name, input, opts)
|> Ash.update!()

:ok
end

# Replays a `:create` event.
#
# Plain create actions are replayed as creates. Upsert actions must mirror
# what the original upsert did: when the record already exists the event is
# replayed via the auto-generated replay update action; when it does not,
# it is replayed as a create.
#
# Fix: both `replay_as_create/4` call sites now pass the original `action`
# argument. The previous code passed the resolved `action_struct` in the
# non-upsert branch but the bare `action` in the upsert/not-found branch —
# since `replay_as_create/4` is the extracted pre-refactor body (which
# received the bare `action` parameter), passing `action` everywhere
# preserves the original behavior and removes the inconsistency.
defp handle_action(%{action_type: :create} = event, resource, action, opts) do
  action_struct = Ash.Resource.Info.action(resource, action)

  if action_struct.upsert? do
    case get_record_if_exists(resource, event.record_id, opts) do
      {:ok, existing_record} ->
        replay_upsert_as_update(event, resource, existing_record, opts)

      {:error, :not_found} ->
        replay_as_create(event, resource, action, opts)
    end
  else
    replay_as_create(event, resource, action, opts)
  end
end

defp handle_action(%{action_type: :update} = event, resource, action, opts) do
case Ash.get(resource, event.record_id, opts) do
{:ok, record} ->
Expand Down
47 changes: 38 additions & 9 deletions lib/events/create_action_wrapper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule AshEvents.CreateActionWrapper do
"""
use Ash.Resource.ManualCreate

def create(changeset, module_opts, ctx) do
def create(changeset, module_opts, %{upsert?: upsert?, upsert_keys: upsert_keys} = ctx) do
merged_ctx = Map.get(ctx, :source_context) |> Map.merge(ctx)

if Map.get(merged_ctx, :ash_events_replay?) do
Expand All @@ -16,16 +16,45 @@ defmodule AshEvents.CreateActionWrapper do
|> Ash.Context.to_opts()
|> Keyword.put(:return_notifications?, ctx.return_notifications? || false)

AshEvents.Events.ActionWrapperHelpers.create_event!(
changeset,
merged_ctx.original_params,
module_opts,
opts
)

data_layer = Ash.Resource.Info.data_layer(changeset.resource)

data_layer.create(changeset.resource, changeset)
result =
if upsert? do
upsert_identity =
if changeset.action.upsert_identity do
Ash.Resource.Info.identity(changeset.resource, changeset.action.upsert_identity)
else
nil
end

data_layer.upsert(changeset.resource, changeset, upsert_keys, upsert_identity)
else
data_layer.create(changeset.resource, changeset)
end

case result do
{:ok, record} ->
# Create event with the actual record ID from the result
[primary_key] = Ash.Resource.Info.primary_key(changeset.resource)
actual_id = Map.get(record, primary_key)

event_changeset = %{
changeset
| attributes: Map.put(changeset.attributes, primary_key, actual_id)
}

AshEvents.Events.ActionWrapperHelpers.create_event!(
event_changeset,
merged_ctx.original_params,
module_opts,
opts
)

result

error ->
error
end
end
end
end
45 changes: 38 additions & 7 deletions lib/events/transformers/add_actions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,50 @@ defmodule AshEvents.Events.Transformers.AddActions do
}
|> then(fn action ->
case action.type do
:create -> Map.put(action, :upsert?, action.upsert?)
:update -> Map.put(action, :require_atomic?, false)
:destroy -> Map.merge(action, %{require_atomic?: false, return_destroyed?: true})
_ -> action
end
end)

{:ok,
Spark.Dsl.Transformer.replace_entity(
dsl,
[:actions],
manual_action,
&(&1.name == action.name)
)}
{:ok, dsl_with_main_action} =
{:ok,
Spark.Dsl.Transformer.replace_entity(
dsl,
[:actions],
manual_action,
&(&1.name == action.name)
)}

if action.type == :create and action.upsert? do
replay_update_action_name = :"ash_events_replay_#{action.name}_update"

replay_update_action = %Ash.Resource.Actions.Update{
name: replay_update_action_name,
type: :update,
accept: action.accept,
arguments: action.arguments,
primary?: false,
description: "Auto-generated update action for replaying #{action.name} upsert events",
require_atomic?: false,
manual: nil,
changes: [],
touches_resources: [],
transaction?: nil,
metadata: [],
delay_global_validations?: false
}

{:ok,
Spark.Dsl.Transformer.add_entity(
dsl_with_main_action,
[:actions],
replay_update_action
)}
else
{:ok, dsl_with_main_action}
end
end)
end
end
106 changes: 106 additions & 0 deletions priv/resource_snapshots/test_repo/users/20250828221535.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"attributes": [
{
"allow_nil?": false,
"default": "fragment(\"gen_random_uuid()\")",
"generated?": false,
"precision": null,
"primary_key?": true,
"references": null,
"scale": null,
"size": null,
"source": "id",
"type": "uuid"
},
{
"allow_nil?": false,
"default": "fragment(\"(now() AT TIME ZONE 'utc')\")",
"generated?": false,
"precision": null,
"primary_key?": false,
"references": null,
"scale": null,
"size": null,
"source": "created_at",
"type": "utc_datetime_usec"
},
{
"allow_nil?": false,
"default": "fragment(\"(now() AT TIME ZONE 'utc')\")",
"generated?": false,
"precision": null,
"primary_key?": false,
"references": null,
"scale": null,
"size": null,
"source": "updated_at",
"type": "utc_datetime_usec"
},
{
"allow_nil?": false,
"default": "nil",
"generated?": false,
"precision": null,
"primary_key?": false,
"references": null,
"scale": null,
"size": null,
"source": "email",
"type": "text"
},
{
"allow_nil?": false,
"default": "nil",
"generated?": false,
"precision": null,
"primary_key?": false,
"references": null,
"scale": null,
"size": null,
"source": "given_name",
"type": "text"
},
{
"allow_nil?": false,
"default": "nil",
"generated?": false,
"precision": null,
"primary_key?": false,
"references": null,
"scale": null,
"size": null,
"source": "family_name",
"type": "text"
}
],
"base_filter": null,
"check_constraints": [],
"custom_indexes": [],
"custom_statements": [],
"has_create_action": true,
"hash": "06D901E44EB8357C2BC9A70B6399393FBC6DFA13B847562B7C2063834FE18FBA",
"identities": [
{
"all_tenants?": false,
"base_filter": null,
"index_name": "users_unique_email_index",
"keys": [
{
"type": "atom",
"value": "email"
}
],
"name": "unique_email",
"nils_distinct?": true,
"where": null
}
],
"multitenancy": {
"attribute": null,
"global": null,
"strategy": null
},
"repo": "Elixir.AshEvents.TestRepo",
"schema": null,
"table": "users"
}
17 changes: 17 additions & 0 deletions priv/test_repo/migrations/20250828221535_add_email_identity.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule AshEvents.TestRepo.Migrations.AddEmailIdentity do
  @moduledoc """
  Updates resources based on their most recent snapshots.

  This file was autogenerated with `mix ash_postgres.generate_migrations`
  """

  use Ecto.Migration

  # Single source of truth for the index name shared by up/0 and down/0.
  @index_name "users_unique_email_index"

  def up do
    create(unique_index(:users, [:email], name: @index_name))
  end

  def down do
    drop_if_exists(unique_index(:users, [:email], name: @index_name))
  end
end
Loading
Loading