Skip to content

Commit 5c8d12c

Browse files
committed
Auto-resume long-paused jobs overnight
1 parent f33af9c commit 5c8d12c

3 files changed

Lines changed: 154 additions & 8 deletions

File tree

config/config.exs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ config :reencodarr, Reencodarr.Repo,
4141

4242
config :reencodarr, :temp_dir, Path.join(System.tmp_dir!(), "ab-av1")
4343

44+
config :reencodarr, Reencodarr.AbAv1.ProcessControl,
45+
auto_resume_hour: 2,
46+
auto_resume_after_ms: 4 * 60 * 60 * 1000,
47+
auto_resume_timezone: "America/Denver"
48+
4449
# Configure file exclude patterns for video filtering
4550
config :reencodarr, :exclude_patterns, [
4651
# Sample patterns - can be configured per environment

lib/reencodarr/ab_av1/process_control.ex

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,15 @@ defmodule Reencodarr.AbAv1.ProcessControl do
99

1010
use GenServer
1111

12+
alias Reencodarr.AbAv1.{CrfSearch, Encode}
13+
14+
require Logger
15+
1216
@services [:crf_searcher, :encoder]
13-
@initial_state %{crf_searcher: false, encoder: false}
17+
@check_interval_ms :timer.minutes(1)
18+
@default_auto_resume_after_ms :timer.hours(4)
19+
@default_auto_resume_hour 2
20+
@default_auto_resume_timezone "America/Denver"
1421

1522
@spec start_link(keyword()) :: GenServer.on_start()
1623
def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@@ -27,25 +34,123 @@ defmodule Reencodarr.AbAv1.ProcessControl do
2734

2835
@spec suspend(atom()) :: :ok
2936
def suspend(service) when service in @services do
30-
cast_or_default({:set, service, true})
37+
cast_or_default({:suspend, service})
3138
end
3239

3340
@spec resume(atom()) :: :ok
3441
def resume(service) when service in @services do
35-
cast_or_default({:set, service, false})
42+
cast_or_default({:resume, service})
3643
end
3744

3845
@impl true
39-
def init(_opts), do: {:ok, @initial_state}
46+
def init(opts) do
47+
state = initial_state(opts)
48+
schedule_auto_resume_check(state.check_interval_ms)
49+
{:ok, state}
50+
end
4051

4152
@impl true
4253
def handle_call({:suspended?, service}, _from, state) do
43-
{:reply, Map.get(state, service, false), state}
54+
{:reply, get_in(state, [:services, service, :suspended?]) || false, state}
55+
end
56+
57+
@impl true
58+
def handle_cast({:suspend, service}, state) do
59+
{:noreply, put_service_state(state, service, true, DateTime.utc_now())}
60+
end
61+
62+
@impl true
63+
def handle_cast({:resume, service}, state) do
64+
{:noreply, put_service_state(state, service, false, nil)}
65+
end
66+
67+
if Mix.env() == :test do
68+
@impl true
69+
def handle_cast({:force_suspend_at, service, suspended_at}, state) do
70+
{:noreply, put_service_state(state, service, true, suspended_at)}
71+
end
4472
end
4573

4674
@impl true
47-
def handle_cast({:set, service, suspended?}, state) do
48-
{:noreply, Map.put(state, service, suspended?)}
75+
def handle_info(:auto_resume_check, state) do
76+
schedule_auto_resume_check(state.check_interval_ms)
77+
{:noreply, maybe_auto_resume(state)}
78+
end
79+
80+
defp initial_state(opts) do
81+
app_config = Application.get_env(:reencodarr, __MODULE__, [])
82+
config = Keyword.merge(app_config, opts)
83+
84+
%{
85+
services: %{
86+
crf_searcher: service_state(),
87+
encoder: service_state()
88+
},
89+
auto_resume_hour: Keyword.get(config, :auto_resume_hour, @default_auto_resume_hour),
90+
auto_resume_after_ms:
91+
Keyword.get(config, :auto_resume_after_ms, @default_auto_resume_after_ms),
92+
auto_resume_timezone:
93+
Keyword.get(config, :auto_resume_timezone, @default_auto_resume_timezone),
94+
check_interval_ms: Keyword.get(config, :check_interval_ms, @check_interval_ms)
95+
}
96+
end
97+
98+
defp service_state, do: %{suspended?: false, suspended_at: nil}
99+
100+
defp put_service_state(state, service, suspended?, suspended_at) do
101+
put_in(state, [:services, service], %{suspended?: suspended?, suspended_at: suspended_at})
102+
end
103+
104+
defp schedule_auto_resume_check(interval) do
105+
Process.send_after(self(), :auto_resume_check, interval)
106+
end
107+
108+
defp maybe_auto_resume(state) do
109+
if auto_resume_window?(state) do
110+
Enum.reduce(@services, state, &maybe_auto_resume_service/2)
111+
else
112+
state
113+
end
114+
end
115+
116+
defp auto_resume_window?(state) do
117+
case DateTime.now(state.auto_resume_timezone) do
118+
{:ok, now} -> now.hour == state.auto_resume_hour
119+
{:error, _} -> DateTime.utc_now().hour == state.auto_resume_hour
120+
end
121+
end
122+
123+
defp maybe_auto_resume_service(service, state) do
124+
service_state = get_in(state, [:services, service])
125+
126+
if old_enough_to_resume?(service_state, state.auto_resume_after_ms) do
127+
Logger.info("Auto-resuming #{service} after extended pause")
128+
resume_service(service)
129+
put_service_state(state, service, false, nil)
130+
else
131+
state
132+
end
133+
end
134+
135+
defp old_enough_to_resume?(%{suspended?: true, suspended_at: %DateTime{} = suspended_at}, ms) do
136+
DateTime.diff(DateTime.utc_now(), suspended_at, :millisecond) >= ms
137+
end
138+
139+
defp old_enough_to_resume?(_service_state, _ms), do: false
140+
141+
defp resume_service(:crf_searcher), do: CrfSearch.resume_current()
142+
defp resume_service(:encoder), do: Encode.resume_current()
143+
144+
if Mix.env() == :test do
145+
@doc false
146+
def force_suspend_at(service, suspended_at) when service in @services do
147+
GenServer.cast(__MODULE__, {:force_suspend_at, service, suspended_at})
148+
end
149+
150+
@doc false
151+
def auto_resume_check do
152+
send(__MODULE__, :auto_resume_check)
153+
end
49154
end
50155

51156
defp call_or_default(message, default) do

test/reencodarr/ab_av1/process_control_test.exs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@ defmodule Reencodarr.AbAv1.ProcessControlTest do
44
alias Reencodarr.AbAv1.ProcessControl
55

66
setup do
7-
start_supervised!(ProcessControl)
7+
start_supervised!(
8+
{ProcessControl,
9+
auto_resume_hour: DateTime.utc_now().hour,
10+
auto_resume_timezone: "Etc/UTC",
11+
auto_resume_after_ms: 4 * 60 * 60 * 1000,
12+
check_interval_ms: :timer.hours(1)}
13+
)
14+
815
:ok
916
end
1017

@@ -32,4 +39,33 @@ defmodule Reencodarr.AbAv1.ProcessControlTest do
3239
assert :ok = ProcessControl.resume(:encoder)
3340
refute ProcessControl.suspended?(:encoder)
3441
end
42+
43+
test "auto-resumes services paused for more than four hours during the configured hour" do
44+
five_hours_ago = DateTime.add(DateTime.utc_now(), -5 * 60 * 60, :second)
45+
46+
ProcessControl.force_suspend_at(:encoder, five_hours_ago)
47+
ProcessControl.force_suspend_at(:crf_searcher, five_hours_ago)
48+
Process.sleep(20)
49+
50+
assert ProcessControl.suspended?(:encoder)
51+
assert ProcessControl.suspended?(:crf_searcher)
52+
53+
ProcessControl.auto_resume_check()
54+
Process.sleep(20)
55+
56+
refute ProcessControl.suspended?(:encoder)
57+
refute ProcessControl.suspended?(:crf_searcher)
58+
end
59+
60+
test "auto-resume keeps recently paused services paused" do
61+
one_hour_ago = DateTime.add(DateTime.utc_now(), -60 * 60, :second)
62+
63+
ProcessControl.force_suspend_at(:encoder, one_hour_ago)
64+
Process.sleep(20)
65+
66+
ProcessControl.auto_resume_check()
67+
Process.sleep(20)
68+
69+
assert ProcessControl.suspended?(:encoder)
70+
end
3571
end

0 commit comments

Comments
 (0)