Skip to content
Merged
6 changes: 2 additions & 4 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ config :reencodarr, Reencodarr.Repo,
temp_store: "MEMORY",
# Enable full mutex mode for better concurrency
locking_mode: "NORMAL",
# Allow reads during page writes
read_uncommitted: true,
# Increase busy timeout for concurrent operations (2 minutes)
busy_timeout: 120_000,
# Keep busy timeout modest now that writes are serialized in-process
busy_timeout: 5_000,
# Large cache size (256MB) for better performance
cache_size: -256_000,
# Large memory mapping (512MB) for better I/O performance
Expand Down
4 changes: 2 additions & 2 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ config :reencodarr, Reencodarr.Repo,
database: "priv/reencodarr_dev.db",
stacktrace: true,
show_sensitive_data_on_connection_error: true,
# Increase pool size for better concurrency with Broadway pipelines
pool_size: 20,
# Keep the read pool modest; writes are serialized through DbWriter
pool_size: 5,
# Increase checkout timeout for slow queries (especially JSON fragment queries in SQLite)
timeout: 30_000,
# DBConnection queue configuration for better handling under load
Expand Down
4 changes: 2 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ if config_env() == :prod do

config :reencodarr, Reencodarr.Repo,
database: database_path,
# Production pool size - can be overridden by DATABASE_POOL_SIZE env var
pool_size: String.to_integer(System.get_env("DATABASE_POOL_SIZE") || "20")
# Keep the read pool modest; writes are serialized through DbWriter
pool_size: String.to_integer(System.get_env("DATABASE_POOL_SIZE") || "5")

# The secret key base is used to sign/encrypt cookies and other secrets.
# A default value is used in config/dev.exs and config/test.exs but you
Expand Down
73 changes: 60 additions & 13 deletions lib/reencodarr/ab_av1/crf_searcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
require Logger

@max_buffered_lines 1024
@completed_without_subscriber_ttl_ms 100

# ---------------------------------------------------------------------------
# Public API
Expand Down Expand Up @@ -129,7 +130,8 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
os_pid: os_pid,
metadata: metadata,
output_lines: [],
subscriber: nil
subscriber: nil,
pending_exit_status: nil
}}

{:error, reason} ->
Expand All @@ -144,6 +146,11 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
count = length(buffered)
Enum.each(buffered, fn msg -> send(pid, msg) end)
Logger.debug("CrfSearcher: subscribed #{inspect(pid)}, replayed #{count} lines")

if state.pending_exit_status do
Process.send_after(self(), :deliver_pending_exit_status, 0)
end

{:reply, {:ok, count}, %{state | subscriber: pid}}
end

Expand All @@ -158,6 +165,10 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
end

@impl true
def handle_call(:kill, _from, %{port: :none} = state) do
{:stop, :normal, :ok, state}
end

def handle_call(:kill, _from, state) do
Logger.info("CrfSearcher: kill() called, killing OS process #{state.os_pid}")
Helper.kill_os_process(state.os_pid)
Expand All @@ -170,15 +181,8 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
def handle_info({port, {:data, {:eol, line}}}, %{port: port} = state) do
msg = {__MODULE__, {:line, line}}

new_lines =
if length(state.output_lines) < @max_buffered_lines do
[msg | state.output_lines]
else
[msg | Enum.take(state.output_lines, @max_buffered_lines - 1)]
end

if state.subscriber, do: send(state.subscriber, msg)
{:noreply, %{state | output_lines: new_lines}}
{:noreply, buffer_output(state, msg)}
end

# noeol partial chunk — forward but do not buffer
Expand All @@ -194,17 +198,15 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
def handle_info({port, {:exit_status, code}}, %{port: port} = state) do
Logger.info("CrfSearcher: port exited with status #{code}")
msg = {__MODULE__, {:exit_status, code}}
if state.subscriber, do: send(state.subscriber, msg)
{:stop, :normal, state}
handle_port_exit_message(state, msg)
end

# Port closed / died without exit_status
@impl true
def handle_info({:EXIT, port, reason}, %{port: port} = state) do
Logger.error("CrfSearcher: port died unexpectedly: #{inspect(reason)}")
msg = {__MODULE__, {:exit_status, {:port_died, reason}}}
if state.subscriber, do: send(state.subscriber, msg)
{:stop, :normal, state}
handle_port_exit_message(state, msg)
end

# Normal port EXIT after exit_status already handled — ignore
Expand All @@ -213,6 +215,25 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
{:noreply, state}
end

@impl true
def handle_info(:stop_completed_without_subscriber, %{subscriber: nil} = state) do
{:stop, :normal, state}
end

def handle_info(:stop_completed_without_subscriber, state) do
{:noreply, state}
end

@impl true
def handle_info(:deliver_pending_exit_status, %{pending_exit_status: nil} = state) do
{:noreply, state}
end

def handle_info(:deliver_pending_exit_status, state) do
if state.subscriber, do: send(state.subscriber, state.pending_exit_status)
{:stop, :normal, %{state | pending_exit_status: nil}}
end

@impl true
def handle_info(msg, state) do
Logger.warning("CrfSearcher: unexpected message: #{inspect(msg)}")
Expand All @@ -234,4 +255,30 @@ defmodule Reencodarr.AbAv1.CrfSearcher do
Helper.close_port(state.port)
:ok
end

defp handle_port_exit_message(state, msg) do
if state.subscriber do
send(state.subscriber, msg)
{:stop, :normal, state}
else
Process.send_after(
self(),
:stop_completed_without_subscriber,
@completed_without_subscriber_ttl_ms
)

{:noreply, %{state | port: :none, os_pid: nil, pending_exit_status: msg}}
end
end

defp buffer_output(state, msg) do
new_lines =
if length(state.output_lines) < @max_buffered_lines do
[msg | state.output_lines]
else
[msg | Enum.take(state.output_lines, @max_buffered_lines - 1)]
end

%{state | output_lines: new_lines}
end
end
3 changes: 1 addition & 2 deletions lib/reencodarr/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ defmodule Reencodarr.Application do
base_children = [
ReencodarrWeb.Telemetry,
Reencodarr.Repo,
Reencodarr.DbWriter,
{DNSCluster, query: Application.get_env(:reencodarr, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: Reencodarr.PubSub},
# Start the Finch HTTP client for sending emails
{Finch, name: Reencodarr.Finch},
# Webhook processor GenServer - queues webhook tasks to prevent SQLite lock contention
ReencodarrWeb.WebhookProcessor,
# Start to serve requests, typically the last entry
ReencodarrWeb.Endpoint,
# DynamicSupervisor for port-holder processes (AbAv1.Encoder, AbAv1.CrfSearcher).
Expand Down
2 changes: 1 addition & 1 deletion lib/reencodarr/dashboard/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Reencodarr.Dashboard.State do

@queue_refresh_interval 5_000
@chart_refresh_interval 300_000
@default_queue_query_timeout 5_000
@default_queue_query_timeout 1_000
@progress_debounce_ms 500
@tracked_video_states [
:needs_analysis,
Expand Down
137 changes: 137 additions & 0 deletions lib/reencodarr/db_writer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
defmodule Reencodarr.DbWriter do
@moduledoc """
Serializes runtime SQLite mutations through a single in-process writer.

Reads stay concurrent on the normal repo pool. Runtime writes should route
through this module so only one process is mutating SQLite at a time.
"""

use GenServer
require Logger

alias Reencodarr.Core.Retry
alias Reencodarr.Repo

@inline_envs [:test]
@default_call_timeout 60_000
@default_max_attempts 3
@default_backoff_ms 25

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@spec run((-> result), keyword()) :: result when result: var
def run(fun, opts \\ []) when is_function(fun, 0) do
if inline_mode?(opts) or in_writer?() do
execute_inline(fun, opts)
else
__MODULE__
|> GenServer.call(
{:run, fun, opts},
Keyword.get(opts, :writer_timeout, @default_call_timeout)
)
|> unwrap_result()
end
end

@spec transaction((-> result), keyword()) :: {:ok, result} | {:error, term()} when result: var
def transaction(fun, opts \\ []) when is_function(fun, 0) do
run(fn -> Repo.transaction(fun) end, opts)
end

@spec enqueue((-> any()), keyword()) :: :ok
def enqueue(fun, opts \\ []) when is_function(fun, 0) do
if inline_mode?(opts) or in_writer?() do
execute_enqueue(fun, opts)
:ok
else
GenServer.cast(__MODULE__, {:enqueue, fun, opts})
end
Comment thread
mjc marked this conversation as resolved.
end

@spec in_writer?() :: boolean()
def in_writer?, do: Process.get(:db_writer_active, false) == true

@impl true
def init(_opts) do
{:ok, :ok}
end

@impl true
def handle_call({:run, fun, opts}, _from, state) do
{:reply, execute(fun, opts), state}
end

@impl true
def handle_cast({:enqueue, fun, opts}, state) do
execute_enqueue(fun, opts)

{:noreply, state}
end
Comment thread
mjc marked this conversation as resolved.

defp inline_mode?(opts) do
Keyword.get(opts, :inline?, default_inline_mode?())
end

defp default_inline_mode? do
Application.get_env(:reencodarr, :env) in @inline_envs
end

defp execute_inline(fun, opts) do
execute(fun, opts)
|> unwrap_result()
end

defp execute_enqueue(fun, opts) do
case execute(fun, opts) do
{:ok, _result} ->
:ok

{:raised, kind, reason, stacktrace} ->
log_async_failure(kind, reason, stacktrace, opts)
end
end

defp execute(fun, opts) do
retry_opts = [
max_attempts: Keyword.get(opts, :max_attempts, @default_max_attempts),
base_backoff_ms: Keyword.get(opts, :base_backoff_ms, @default_backoff_ms),
label: Keyword.get(opts, :label)
]

previous = Process.get(:db_writer_active)
Process.put(:db_writer_active, true)

try do
{:ok, Retry.retry_on_db_busy(fun, retry_opts)}
rescue
error ->
{:raised, :error, error, __STACKTRACE__}
catch
kind, reason ->
{:raised, kind, reason, __STACKTRACE__}
after
if previous == nil do
Process.delete(:db_writer_active)
else
Process.put(:db_writer_active, previous)
end
end
end

defp unwrap_result({:ok, value}), do: value

defp unwrap_result({:raised, kind, reason, stacktrace}) do
:erlang.raise(kind, reason, stacktrace)
end

defp log_async_failure(kind, reason, stacktrace, opts) do
label = Keyword.get(opts, :label, :db_writer)

Logger.error("""
DbWriter async task failed for #{inspect(label)}
#{Exception.format(kind, reason, stacktrace)}
""")
end
end
Loading
Loading