Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ spark_locals_without_parens = [
step: 2,
step: 3,
strict_ordering?: 1,
support_undo?: 1,
switch: 1,
switch: 2,
template: 1,
Expand Down
1 change: 1 addition & 0 deletions documentation/dsls/DSL-Reactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ end
| [`description`](#reactor-compose-description){: #reactor-compose-description } | `String.t` | | An optional description for the step. |
| [`allow_async?`](#reactor-compose-allow_async?){: #reactor-compose-allow_async? } | `boolean` | `true` | Whether the composed reactor is allowed to run its steps asynchronously. |
| [`async?`](#reactor-compose-async?){: #reactor-compose-async? } | `boolean` | `true` | Whether the composed steps should be run asynchronously. |
| [`support_undo?`](#reactor-compose-support_undo?){: #reactor-compose-support_undo? } | `boolean` | `true` | Whether the composed reactor should also be undone on failure. |


### reactor.compose.argument
Expand Down
2 changes: 1 addition & 1 deletion lib/reactor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ defmodule Reactor do
#{Spark.Options.docs(@run_schema)}
"""
@spec run(t | module, inputs, context_arg, run_options) ::
{:ok, any} | {:error, any} | {:halted, t}
{:ok, any} | {:ok, any, t} | {:error, any} | {:halted, t}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

def run(reactor, inputs, context, options) when is_atom(reactor) do
Expand Down
13 changes: 12 additions & 1 deletion lib/reactor/builder/compose.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ defmodule Reactor.Builder.Compose do
required: false,
default: true,
doc: "Whether the nested Reactor is allowed to run async or not"
],
support_undo?: [
type: :boolean,
required: false,
default: true,
doc: """
Whether the composed reactor should also be undone on failure.
"""
]
)

Expand All @@ -46,7 +54,10 @@ defmodule Reactor.Builder.Compose do
Builder.add_step(
reactor,
name,
{Reactor.Step.Compose, reactor: inner_reactor, allow_async?: options[:allow_async?]},
{Reactor.Step.Compose,
reactor: inner_reactor,
allow_async?: options[:allow_async?],
support_undo?: options[:support_undo?]},
arguments,
async?: options[:async?],
guards: options[:guards] || [],
Expand Down
17 changes: 14 additions & 3 deletions lib/reactor/dsl/compose.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule Reactor.Dsl.Compose do
description: nil,
guards: [],
name: nil,
reactor: nil
reactor: nil,
support_undo?: true

alias Reactor.{Builder, Dsl}

Expand All @@ -23,7 +24,8 @@ defmodule Reactor.Dsl.Compose do
description: nil | String.t(),
guards: [Dsl.Where.t() | Dsl.Guard.t()],
name: any,
reactor: module | Reactor.t()
reactor: module | Reactor.t(),
support_undo?: boolean
}

@doc false
Expand Down Expand Up @@ -90,6 +92,14 @@ defmodule Reactor.Dsl.Compose do
doc: """
Whether the composed steps should be run asynchronously.
"""
],
support_undo?: [
type: :boolean,
required: false,
default: true,
doc: """
Whether the composed reactor should also be undone on failure.
"""
]
]
}
Expand All @@ -102,7 +112,8 @@ defmodule Reactor.Dsl.Compose do
allow_async?: step.allow_async?,
async?: step.async?,
description: step.description,
guards: step.guards
guards: step.guards,
support_undo?: step.support_undo?
)
end

Expand Down
65 changes: 64 additions & 1 deletion lib/reactor/step/compose.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,57 @@ defmodule Reactor.Step.Compose do
"""

use Reactor.Step
alias Reactor.{Builder, Step}
@behaviour Reactor.Mermaid

@doc false
@impl true
def run(arguments, %{current_step: %{name: {:compose, _inner}}} = context, options) do
perform_reactor_run(arguments, context, options)
end

def run(arguments, context, options) do
if options[:support_undo?] do
schedule_undoable_reactor_run(context)
else
perform_reactor_run(arguments, context, options)
end
end

@doc false
@impl true
def can?(%{name: {:compose, _}, impl: {__MODULE__, opts}}, :undo),
do: opts[:support_undo?] || false

def can?(_, _), do: false

@doc false
@impl true
def undo(%{reactor: reactor}, _args, context, options) do
Reactor.undo(reactor, context,
concurrency_key: context.concurrency_key,
async?: options[:allow_async?]
)
end

def schedule_undoable_reactor_run(context) do
{:ok, :pending_result,
[
%{context.current_step | name: {:compose, context.current_step.name}, ref: make_ref()},
%{
Builder.new_step!(
context.current_step.name,
{Step.AnonFn, run: {__MODULE__, :extract_result, []}},
[composed: {:result, {:compose, context.current_step.name}}],
async?: context[:async?] || true,
max_retries: 0
)
| ref: context.current_step.ref
}
]}
end

def perform_reactor_run(arguments, context, options) do
reactor = Keyword.fetch!(options, :reactor)
allow_async? = Keyword.get(options, :allow_async?, true)

Expand All @@ -23,12 +69,29 @@ defmodule Reactor.Step.Compose do

Reactor.run(reactor, arguments, context,
concurrency_key: context.concurrency_key,
async?: child_async?
async?: child_async?,
fully_reversible?: options[:support_undo?]
)
|> case do
{:ok, result} ->
{:ok, result}

{:ok, result, reactor} ->
{:ok, %{result: result, reactor: reactor}}

{:error, reason} ->
{:error, reason}

{:halted, reactor} ->
{:halt, reactor}
end
end

@doc false
@impl true
def to_mermaid(step, options),
do: __MODULE__.Mermaid.to_mermaid(step, options)

def extract_result(%{composed: %{result: result}}, _), do: {:ok, result}
def extract_result(_, _), do: {:ok, nil}
end
49 changes: 49 additions & 0 deletions test/reactor/dsl/compose_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,53 @@ defmodule Reactor.Dsl.ComposeTest do
"Child reactor should run synchronously when parent is sync, regardless of allow_async? setting"
end
end

describe "compose with undo" do
defmodule UndoInnerReactor do
@moduledoc false
use Reactor

input :agent

step :undoable do
argument :agent, input(:agent)
run &do_run/1
undo &do_undo/2
end

defp do_run(args) do
Agent.update(args.agent, fn list -> [:run | list] end)
{:ok, :run_result}
end

defp do_undo(:run_result, args) do
Agent.update(args.agent, fn list -> [:undo | list] end)
end
end

defmodule UndoOuterReactor do
@moduledoc false
use Reactor

input :agent

compose :inner, UndoInnerReactor do
argument :agent, input(:agent)
support_undo?(true)
end

flunk :oops, "Deliberate failure" do
wait_for :inner
end

return :inner
end

test "composed reactors can be undone on failure" do
{:ok, agent} = Agent.start_link(fn -> [] end)

assert {:error, _} = Reactor.run(UndoOuterReactor, %{agent: agent})
assert [:undo, :run] = Agent.get(agent, &Function.identity/1)
end
end
end