-
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathstate.ex
More file actions
94 lines (81 loc) · 2.54 KB
/
state.ex
File metadata and controls
94 lines (81 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
defmodule Reactor.Executor.State do
@moduledoc """
Contains the reactor execution state.
This is run-time only information.
"""
@defaults %{
async?: true,
halt_timeout: 5000,
max_iterations: :infinity,
timeout: :infinity,
fully_reversible?: false
}
defstruct async?: @defaults.async?,
concurrency_key: nil,
current_tasks: %{},
errors: [],
halt_timeout: @defaults.halt_timeout,
max_concurrency: nil,
max_iterations: @defaults.max_iterations,
pool_owner: false,
retries: %{},
run_id: nil,
skipped: MapSet.new(),
started_at: nil,
timeout: @defaults.timeout,
fully_reversible?: @defaults.fully_reversible?
alias Reactor.{Executor.ConcurrencyTracker, Step}
@type t :: %__MODULE__{
async?: boolean,
concurrency_key: ConcurrencyTracker.pool_key(),
current_tasks: %{Task.t() => Step.t()},
errors: [any],
halt_timeout: pos_integer() | :infinity,
max_concurrency: pos_integer(),
max_iterations: pos_integer() | :infinity,
pool_owner: boolean,
retries: %{reference() => pos_integer()},
run_id: any,
skipped: MapSet.t(),
started_at: DateTime.t(),
timeout: pos_integer() | :infinity
}
@doc false
@spec init(map) :: t
def init(attrs \\ %{}) do
@defaults
|> Map.merge(attrs)
|> do_init()
end
defp do_init(attrs) do
attrs
|> maybe_set_max_concurrency()
|> maybe_allocate_concurrency_pool()
|> maybe_set_run_id()
|> Map.put(:started_at, DateTime.utc_now())
|> then(&struct!(__MODULE__, &1))
end
defp maybe_set_max_concurrency(attrs)
when is_integer(attrs.max_concurrency) and attrs.max_concurrency > 0,
do: attrs
defp maybe_set_max_concurrency(attrs) when attrs.async? == false,
do: Map.put(attrs, :max_concurrency, 0)
defp maybe_set_max_concurrency(attrs),
do: Map.put(attrs, :max_concurrency, System.schedulers_online())
defp maybe_allocate_concurrency_pool(attrs) when is_reference(attrs.concurrency_key) do
attrs
|> Map.put(:pool_owner, false)
end
defp maybe_allocate_concurrency_pool(attrs) do
attrs
|> Map.put(:concurrency_key, ConcurrencyTracker.allocate_pool(attrs.max_concurrency))
|> Map.put(:pool_owner, true)
end
defp maybe_set_run_id(attrs) do
attrs
|> Map.update(:run_id, make_ref(), fn
nil -> make_ref()
ref -> ref
end)
end
end