Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 88 additions & 6 deletions lib/reactor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,24 @@ defmodule Reactor do
"""
@type concurrency_key_option :: {:concurrency_key, reference()}

@type options ::
@typedoc """
When this option is set the Reactor will return a copy of the completed Reactor
struct for potential future undo.
"""
@type fully_reversible_option :: {:fully_reversible?, boolean}

@type run_options ::
Enumerable.t(
max_concurrency_option
| timeout_option
| max_iterations_option
| halt_timeout_option
| async_option
| concurrency_key_option
| fully_reversible_option
)

@type undo_options ::
Enumerable.t(
max_concurrency_option
| timeout_option
Expand Down Expand Up @@ -135,7 +152,7 @@ defmodule Reactor do
@spec is_reactor(any) :: Macro.t()
defguard is_reactor(reactor) when is_struct(reactor, __MODULE__)

@option_schema [
@run_schema [
max_concurrency: [
type: :pos_integer,
required: false,
Expand Down Expand Up @@ -168,6 +185,12 @@ defmodule Reactor do
type: :any,
required: false,
doc: "A unique identifier for the Reactor run"
],
fully_reversible?: [
type: :boolean,
required: false,
default: false,
doc: "Return the completed reactor as well as the result for possible later reversal"
]
]

Expand All @@ -185,10 +208,10 @@ defmodule Reactor do

## Options

#{Spark.Options.docs(@option_schema)}
#{Spark.Options.docs(@run_schema)}
"""
@doc spark_opts: [{4, @option_schema}]
@spec run(t | module, inputs, context_arg, options) :: {:ok, any} | {:error, any} | {:halted, t}
@spec run(t | module, inputs, context_arg, run_options) ::
{:ok, any} | {:error, any} | {:halted, t}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

def run(reactor, inputs, context, options) when is_atom(reactor) do
Expand All @@ -215,7 +238,7 @@ defmodule Reactor do
end

@doc "Raising version of `run/4`."
@spec run!(t | module, inputs, context_arg, options) :: any | no_return
@spec run!(t | module, inputs, context_arg, run_options) :: any | no_return
def run!(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

def run!(reactor, inputs, context, options) do
Expand All @@ -224,4 +247,63 @@ defmodule Reactor do
{:error, reason} -> raise reason
end
end

@undo_options Keyword.drop(@run_schema, [:fully_reversible?])

@doc """
Attempt to undo a previously successful Reactor.

## Arguments

* `reactor` - The previously successful Reactor struct.
* `context` - An arbitrary map that will be merged into the Reactor context and passed into each undo.

## Options

#{Spark.Options.docs(@undo_options)}
"""
@spec undo(t, context_arg, undo_options) :: :ok | {:error, any}
def undo(reactor, context, options \\ [])

def undo(reactor, _context, _options) when not is_struct(reactor, __MODULE__) do
{:error,
ArgumentError.exception(
message: "`reactor` value `#{inspect(reactor)}` is not a Reactor struct"
)}
end

def undo(reactor, _context, _options) when reactor.state != :successful do
{:error,
StateError.exception(
reactor: reactor,
state: reactor.state,
expected: ~w[successful]a
)}
end

def undo(_reactor, context, _options) when not is_map(context) do
{:error,
ArgumentError.exception(
message: "`context` value `#{inspect(context)}` is not valid context - must be a map"
)}
end

def undo(_reactor, _context, options) when not is_list(options) do
{:error,
ArgumentError.exception(
message: "`options` value `#{inspect(options)}` is not a keyword list"
)}
end

def undo(reactor, context, options) do
Reactor.Executor.undo(reactor, context, options)
end

@doc "A raising version of `undo/2`"
@spec undo!(t, context_arg, undo_options) :: :ok | no_return
def undo!(reactor, context, options) do
with {:error, reason} <- undo(reactor, context, options) do
raise reason
end
end
end
33 changes: 28 additions & 5 deletions lib/reactor/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ defmodule Reactor.Executor do

You probably shouldn't call this directly, but use `Reactor.run/4` instead.
"""
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
{:ok, any} | {:halted, Reactor.t()} | {:error, any}
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.run_options()) ::
{:ok, any} | {:ok, any, Reactor.t()} | {:halted, Reactor.t()} | {:error, any}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
Expand All @@ -76,6 +76,21 @@ defmodule Reactor.Executor do
expected: ~w[pending halted]a
)}

@doc """
Undo a previously successful Reactor.
"""
@spec undo(Reactor.t(), Reactor.context(), Reactor.undo_options()) :: :ok | {:error, any}
def undo(reactor, context, options) do
inputs =
reactor.context
|> Map.get(:private, %{})
|> Map.get(:inputs, %{})

with {:ok, reactor, state} <- Executor.Init.init(reactor, inputs, context, options) do
handle_undo(reactor, state)
end
end

defp execute(reactor, state) when state.max_iterations == 0 do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
maybe_release_pool(state)
Expand Down Expand Up @@ -107,10 +122,16 @@ defmodule Reactor.Executor do
{:error, reason} -> {:error, reason}
end

{:ok, result} ->
{:ok, result, reactor} ->
maybe_release_pool(state)

Executor.Hooks.complete(reactor, result, reactor.context)
with {:ok, result} <- Executor.Hooks.complete(reactor, result, reactor.context) do
if state.fully_reversible? do
{:ok, result, reactor}
else
{:ok, result}
end
end

{:error, reason} ->
maybe_release_pool(state)
Expand Down Expand Up @@ -231,6 +252,8 @@ defmodule Reactor.Executor do
handle_undo(%{reactor | state: :failed, undo: []}, state, Enum.reverse(reactor.undo))
end

defp handle_undo(_reactor, state, []) when state.errors == [], do: :ok

defp handle_undo(reactor, state, []) do
error = Reactor.Error.to_class(state.errors)
Executor.Hooks.error(reactor, error, reactor.context)
Expand All @@ -246,7 +269,7 @@ defmodule Reactor.Executor do
defp all_done(reactor) do
with 0 <- Graph.num_vertices(reactor.plan),
{:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do
{:ok, value}
{:ok, value, %{reactor | state: :successful}}
else
:error ->
{:error, MissingReturnResultError.exception(reactor: reactor)}
Expand Down
9 changes: 7 additions & 2 deletions lib/reactor/executor/init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ defmodule Reactor.Executor.Init do
import Reactor.Utils

@doc false
@spec init(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
@spec init(
Reactor.t(),
Reactor.inputs(),
Reactor.context(),
Reactor.run_options() | Reactor.undo_options()
) ::
{:ok, Reactor.t(), state :: map} | {:error, any}
def init(reactor, _inputs, _context, _options) when not is_reactor(reactor),
do: {:error, ArgumentError.exception(message: "`reactor` is not a Reactor.")}
Expand All @@ -24,7 +29,7 @@ defmodule Reactor.Executor.Init do
reactor.context
|> deep_merge(context)
|> deep_merge(%{private: %{inputs: inputs}})
|> Map.put(:run_id, state.run_id)
|> Map.put_new(:run_id, state.run_id)

{:ok, %{reactor | context: context}, state}
end
Expand Down
6 changes: 4 additions & 2 deletions lib/reactor/executor/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Reactor.Executor.State do
async?: true,
halt_timeout: 5000,
max_iterations: :infinity,
timeout: :infinity
timeout: :infinity,
fully_reversible?: false
}

defstruct async?: @defaults.async?,
Expand All @@ -24,7 +25,8 @@ defmodule Reactor.Executor.State do
run_id: nil,
skipped: MapSet.new(),
started_at: nil,
timeout: @defaults.timeout
timeout: @defaults.timeout,
fully_reversible?: @defaults.fully_reversible?

alias Reactor.{Executor.ConcurrencyTracker, Step}

Expand Down
54 changes: 54 additions & 0 deletions test/reactor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,59 @@ defmodule ReactorTest do

assert {:ok, "McFly Marty"} = Reactor.run(reactor, name: "Marty McFly")
end

test "it can return successful reactors" do
assert {:ok, "Marty McFly", reactor} =
Reactor.run(BasicReactor, %{name: "McFly Marty"}, %{}, fully_reversible?: true)

assert reactor.state == :successful
end
end

describe "undo/2" do
defmodule UndoableReactor do
use Reactor

input :agent

step :push_a do
argument :agent, input(:agent)
run &push(&1, :a)
undo &undo/2
end

step :push_b do
wait_for :push_a
argument :agent, input(:agent)
run &push(&1, :b)
undo &undo/2
end

return :push_b

def push(args, value) do
Agent.update(args.agent, fn list -> [value | list] end)
{:ok, value}
end

def undo(value, args) do
Agent.update(args.agent, fn list -> List.delete(list, value) end)

:ok
end
end

test "previously successful reactors can be undone" do
{:ok, pid} = Agent.start_link(fn -> [:z] end)

assert {:ok, :b, reactor} =
Reactor.run(UndoableReactor, %{agent: pid}, %{}, fully_reversible?: true)

assert [:b, :a, :z] = Agent.get(pid, &Function.identity/1)

assert :ok = Reactor.undo(reactor, %{})

assert [:z] = Agent.get(pid, &Function.identity/1)
end
end
end