-
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathreactor.ex
More file actions
309 lines (253 loc) · 8.65 KB
/
reactor.ex
File metadata and controls
309 lines (253 loc) · 8.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
defmodule Reactor do
alias Reactor.{Dsl, Error.Validation.StateError, Executor, Step}
@moduledoc """
Reactor is a dynamic, concurrent, dependency resolving saga orchestrator.

## Usage

You can construct a reactor using the `Reactor` Spark DSL:

```elixir
defmodule HelloWorldReactor do
  @moduledoc false
  use Reactor

  input :whom

  step :greet, Greeter do
    argument :whom, input(:whom)
  end

  return :greet
end
```

    iex> Reactor.run(HelloWorldReactor, %{whom: "Dear Reader"})
    {:ok, "Hello, Dear Reader!"}

or you can build it programmatically:

    iex> reactor = Builder.new()
    ...> {:ok, reactor} = Builder.add_input(reactor, :whom)
    ...> {:ok, reactor} = Builder.add_step(reactor, :greet, Greeter, whom: {:input, :whom})
    ...> {:ok, reactor} = Builder.return(reactor, :greet)
    ...> Reactor.run(reactor, %{whom: nil})
    {:ok, "Hello, World!"}
"""
# Fields of the Reactor struct together with their defaults. A freshly built
# Reactor starts in the `:pending` state with no inputs, steps or results.
# Field order is kept as-is because it is part of the struct definition.
defstruct [
  # maps default to empty, lists default to empty, scalars default to nil
  context: %{},
  id: nil,
  input_descriptions: %{},
  inputs: [],
  intermediate_results: %{},
  middleware: [],
  plan: nil,
  return: nil,
  state: :pending,
  steps: [],
  undo: []
]
use Spark.Dsl, default_extensions: [extensions: [Dsl]]
# The context map: atom keys mapping to arbitrary values.
@type context :: %{optional(atom) => any}

# Any enumerable of `{atom, value}` pairs is accepted where context is passed in.
@type context_arg :: Enumerable.t({atom, any})

@typedoc """
Specify the maximum number of asynchronous steps which can be run in parallel.

Defaults to the result of `System.schedulers_online/0`. Only used if
`async?` is set to `true`.
"""
@type max_concurrency_option :: {:max_concurrency, pos_integer()}

@typedoc """
Specify the amount of execution time after which to halt processing.

Note that this is not a hard limit. The Reactor will stop when the first step
completes _after_ the timeout has expired.

Defaults to `:infinity`.
"""
@type timeout_option :: {:timeout, pos_integer() | :infinity}

@typedoc """
The maximum number of iterations after which the Reactor will halt.

Defaults to `:infinity`.
"""
@type max_iterations_option :: {:max_iterations, pos_integer() | :infinity}

@typedoc """
How long to wait for asynchronous steps to complete when halting.

Defaults to 5000ms.
"""
@type halt_timeout_option :: {:halt_timeout, pos_integer() | :infinity}

@typedoc """
When set to `false` forces the Reactor to run every step synchronously,
regardless of the step configuration.

Defaults to `true`.
"""
@type async_option :: {:async?, boolean}

@typedoc """
Use a `Reactor.Executor.ConcurrencyTracker.pool_key` to allow this Reactor to
share its concurrency pool with other Reactor instances.

If you do not specify one then the Reactor will initialise a new pool and
place it in its context for any child Reactors to re-use.

Only used if `async?` is set to `true`.
"""
@type concurrency_key_option :: {:concurrency_key, reference()}

@typedoc """
When this option is set the Reactor will return a copy of the completed Reactor
struct for potential future undo.
"""
@type fully_reversible_option :: {:fully_reversible?, boolean}

# Options accepted by `run/4`.
@type run_options ::
        Enumerable.t(
          max_concurrency_option
          | timeout_option
          | max_iterations_option
          | halt_timeout_option
          | async_option
          | concurrency_key_option
          | fully_reversible_option
        )

# Options accepted by `undo/3`: the same as `run_options` without
# `fully_reversible_option`.
@type undo_options ::
        Enumerable.t(
          max_concurrency_option
          | timeout_option
          | max_iterations_option
          | halt_timeout_option
          | async_option
          | concurrency_key_option
        )

# The values the `state` field of a Reactor struct may take.
@type state :: :pending | :executing | :halted | :failed | :successful

# Input values keyed by atom.
@type inputs :: %{optional(atom) => any}

# The Reactor struct itself; field names mirror the `defstruct` definition.
@type t :: %Reactor{
        context: context,
        id: any,
        input_descriptions: %{atom => String.t()},
        inputs: [atom],
        intermediate_results: %{any => any},
        middleware: [Reactor.Middleware.t()],
        plan: nil | Graph.t(),
        undo: [{Step.t(), any}],
        return: any,
        state: state,
        steps: [Step.t()]
      }
@doc """
A guard which returns true if the value is a Reactor struct
"""
@spec is_reactor(any()) :: Macro.t()
defguard is_reactor(value) when is_struct(value, __MODULE__)
# Option schema used to render the "Options" section of the `run/4` docs and,
# minus `fully_reversible?`, the `undo/3` docs.
#
# `halt_timeout` is listed here because it is a documented member of both
# `run_options` and `undo_options` (see `t:halt_timeout_option/0`) and was
# previously missing from the generated option docs.
@run_schema [
  max_concurrency: [
    type: :pos_integer,
    required: false,
    doc: "The maximum number of processes to use to run the Reactor"
  ],
  timeout: [
    type: {:or, [:pos_integer, {:literal, :infinity}]},
    required: false,
    default: :infinity,
    doc: "How long to allow the Reactor to run for"
  ],
  max_iterations: [
    type: {:or, [:pos_integer, {:literal, :infinity}]},
    required: false,
    default: :infinity,
    doc: "The maximum number of times to allow the Reactor to loop"
  ],
  halt_timeout: [
    type: {:or, [:pos_integer, {:literal, :infinity}]},
    required: false,
    default: 5000,
    doc: "How long to wait for asynchronous steps to complete when halting"
  ],
  async?: [
    type: :boolean,
    required: false,
    default: true,
    doc: "Whether to allow the Reactor to start processes"
  ],
  # Hidden from the rendered docs via `hide: true`.
  concurrency_key: [
    type: :reference,
    required: false,
    hide: true
  ],
  run_id: [
    type: :any,
    required: false,
    doc: "A unique identifier for the Reactor run"
  ],
  fully_reversible?: [
    type: :boolean,
    required: false,
    default: false,
    doc: "Return the completed reactor as well as the result for possible later reversal"
  ]
]
@doc """
Attempt to run a Reactor.

## Arguments

* `reactor` - The Reactor to run, either a Reactor DSL module, or a Reactor
  struct.
* `inputs` - A map of values passed in to satisfy the Reactor's expected
  inputs.
* `context` - An arbitrary map that will be merged into the Reactor context
  and passed into each step.

## Options

#{Spark.Options.docs(@run_schema)}

## Errors

Returns `{:error, reason}` rather than raising when `reactor` is a module
which is not a Reactor, is a Reactor struct which is not in the `:pending` or
`:halted` state, or is neither a module nor a Reactor struct.
"""
@spec run(t | module, inputs, context_arg, run_options) ::
        {:ok, any} | {:ok, any, t} | {:error, any} | {:halted, t}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

# A module was given: it must be a Reactor DSL module, in which case its
# generated `reactor/0` struct is run.
def run(reactor, inputs, context, options) when is_atom(reactor) do
  if Spark.Dsl.is?(reactor, Reactor) do
    run(reactor.reactor(), inputs, context, options)
  else
    {:error,
     ArgumentError.exception(message: "Module `#{inspect(reactor)}` is not a Reactor module")}
  end
end

# Only pending or halted Reactors are handed to the executor.
def run(reactor, inputs, context, options)
    when is_reactor(reactor) and reactor.state in ~w[pending halted]a do
  Executor.run(reactor, inputs, context, options)
end

# A Reactor struct in any other state cannot be run.
def run(reactor, _inputs, _context, _options) when is_reactor(reactor) do
  {:error,
   StateError.exception(
     reactor: reactor,
     state: reactor.state,
     expected: ~w[pending halted]a
   )}
end

# Anything else is not runnable. Return an error tuple (as `undo/3` does for
# the same mistake) instead of raising while trying to read `reactor.state`.
def run(reactor, _inputs, _context, _options) do
  {:error,
   ArgumentError.exception(
     message: "`reactor` value `#{inspect(reactor)}` is not a Reactor module or struct"
   )}
end
@doc """
Raising version of `run/4`.

Returns the bare result when `run/4` returns `{:ok, value}`. When `run/4`
returns `{:ok, value, reactor}`, both are returned as `{value, reactor}`.

Raises the error reason when `run/4` returns `{:error, reason}`, and raises a
`RuntimeError` when it returns `{:halted, reactor}`, since a halted Reactor has
no result to return. Use `run/4` if you need to resume a halted Reactor.
"""
@spec run!(t | module, inputs, context_arg, run_options) :: any | no_return
def run!(reactor, inputs \\ %{}, context \\ %{}, options \\ [])

def run!(reactor, inputs, context, options) do
  # Every return shape in the `run/4` spec is handled here so that none of
  # them surfaces as an opaque `CaseClauseError`.
  case run(reactor, inputs, context, options) do
    {:ok, value} ->
      value

    {:ok, value, completed_reactor} ->
      {value, completed_reactor}

    {:error, reason} ->
      raise reason

    {:halted, _halted_reactor} ->
      raise RuntimeError,
            "Reactor halted before completing; use `Reactor.run/4` to obtain the halted Reactor"
  end
end
# `undo/3` accepts the same options as `run/4` except `fully_reversible?`.
@undo_options Keyword.drop(@run_schema, [:fully_reversible?])

@doc """
Attempt to undo a previously successful Reactor.

## Arguments

* `reactor` - The previously successful Reactor struct.
* `context` - An arbitrary map that will be merged into the Reactor context and passed into each undo.

## Options

#{Spark.Options.docs(@undo_options)}
"""
@spec undo(t, context_arg, undo_options) :: :ok | {:error, any}
def undo(reactor, context, options \\ [])

def undo(reactor, context, options) do
  # The checks run in a fixed order: reactor type, reactor state, context
  # shape, options shape. The first failing check decides the error returned.
  cond do
    not is_struct(reactor, __MODULE__) ->
      {:error,
       ArgumentError.exception(
         message: "`reactor` value `#{inspect(reactor)}` is not a Reactor struct"
       )}

    reactor.state != :successful ->
      {:error,
       StateError.exception(
         reactor: reactor,
         state: reactor.state,
         expected: [:successful]
       )}

    not is_map(context) ->
      {:error,
       ArgumentError.exception(
         message: "`context` value `#{inspect(context)}` is not valid context - must be a map"
       )}

    not is_list(options) ->
      {:error,
       ArgumentError.exception(
         message: "`options` value `#{inspect(options)}` is not a keyword list"
       )}

    true ->
      Reactor.Executor.undo(reactor, context, options)
  end
end
@doc """
A raising version of `undo/3`.

Takes the same arguments as `undo/3`, including the optional `options`
(defaulting to `[]`). Returns whatever `undo/3` returns on success and raises
the error reason when `undo/3` returns `{:error, reason}`.
"""
@spec undo!(t, context_arg, undo_options) :: :ok | no_return
def undo!(reactor, context, options \\ []) do
  # `with` passes any non-error result straight through; only the error tuple
  # is turned into a raise.
  with {:error, reason} <- undo(reactor, context, options) do
    raise reason
  end
end
end