Skip to content

Commit 910215c

Browse files
committed
Address follow-up Copilot review comments
1 parent 1a4b86e commit 910215c

4 files changed

Lines changed: 107 additions & 53 deletions

File tree

lib/reencodarr/media/video_queries.ex

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Reencodarr.Media.VideoQueries do
88

99
import Ecto.Query
1010
alias Reencodarr.{DbWriter, Media.Video, Media.Vmaf, Repo}
11+
require Logger
1112

1213
@doc """
1314
Gets videos ready for CRF search (state: analyzed).
@@ -103,34 +104,41 @@ defmodule Reencodarr.Media.VideoQueries do
103104
"""
104105
@spec claim_videos_for_analysis(integer(), keyword()) :: [Video.t()]
105106
def claim_videos_for_analysis(limit, opts \\ []) do
106-
DbWriter.transaction(fn ->
107-
candidates =
108-
from(v in Video,
109-
where: v.state == :needs_analysis,
110-
order_by: [desc: v.priority, desc: v.size, desc: v.inserted_at],
111-
limit: ^limit,
112-
select: v.id
113-
)
114-
|> Repo.all(opts)
115-
116-
case candidates do
117-
[] ->
118-
[]
119-
120-
ids ->
121-
{_count, claimed} =
122-
from(v in Video,
123-
where: v.id in ^ids and v.state == :needs_analysis,
124-
select: v
125-
)
126-
|> Repo.update_all([set: [state: :analyzing, updated_at: DateTime.utc_now()]], opts)
127-
128-
claimed
129-
end
130-
end)
107+
DbWriter.transaction(
108+
fn ->
109+
candidates =
110+
from(v in Video,
111+
where: v.state == :needs_analysis,
112+
order_by: [desc: v.priority, desc: v.size, desc: v.inserted_at],
113+
limit: ^limit,
114+
select: v.id
115+
)
116+
|> Repo.all(opts)
117+
118+
case candidates do
119+
[] ->
120+
[]
121+
122+
ids ->
123+
{_count, claimed} =
124+
from(v in Video,
125+
where: v.id in ^ids and v.state == :needs_analysis,
126+
select: v
127+
)
128+
|> Repo.update_all([set: [state: :analyzing, updated_at: DateTime.utc_now()]], opts)
129+
130+
claimed
131+
end
132+
end,
133+
label: :video_queries_claim_videos_for_analysis
134+
)
131135
|> case do
132-
{:ok, claimed} -> claimed
133-
{:error, _reason} -> []
136+
{:ok, claimed} ->
137+
claimed
138+
139+
{:error, reason} ->
140+
Logger.warning("Failed to claim videos for analysis: #{inspect(reason)}")
141+
[]
134142
end
135143
end
136144

lib/reencodarr/media/video_upsert.ex

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,22 @@ defmodule Reencodarr.Media.VideoUpsert do
2121
"""
2222
@spec upsert(attrs()) :: upsert_result()
2323
def upsert(attrs) do
24-
DbWriter.run(fn -> do_upsert(attrs) end, label: "perform video upsert")
24+
DbWriter.transaction(
25+
fn ->
26+
case do_upsert(attrs) do
27+
{:ok, video} ->
28+
video
29+
30+
{:error, reason} ->
31+
Repo.rollback(reason)
32+
end
33+
end,
34+
label: "perform video upsert"
35+
)
36+
|> case do
37+
{:ok, video} -> {:ok, video}
38+
{:error, reason} -> {:error, reason}
39+
end
2540
end
2641

2742
@doc """
@@ -30,12 +45,21 @@ defmodule Reencodarr.Media.VideoUpsert do
3045
"""
3146
@spec batch_upsert([attrs()]) :: [upsert_result()]
3247
def batch_upsert(video_attrs_list) when is_list(video_attrs_list) do
48+
batch_upsert(video_attrs_list, [])
49+
end
50+
51+
@spec batch_upsert([attrs()], keyword()) :: [upsert_result()]
52+
def batch_upsert(video_attrs_list, opts) when is_list(video_attrs_list) and is_list(opts) do
53+
writer_opts =
54+
[label: :video_batch_upsert, max_attempts: 1]
55+
|> Keyword.merge(opts)
56+
3357
DbWriter.run(
3458
fn ->
3559
Logger.debug("VideoUpsert batch processing #{length(video_attrs_list)} videos")
3660
Enum.map(video_attrs_list, &process_single_video_in_batch/1)
3761
end,
38-
label: :video_batch_upsert
62+
writer_opts
3963
)
4064
end
4165

lib/reencodarr/sync.ex

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Reencodarr.Sync do
66
alias Reencodarr.Analyzer.Broadway, as: AnalyzerBroadway
77
alias Reencodarr.Core.Parsers
88
alias Reencodarr.Dashboard.Events
9-
alias Reencodarr.{Media, Repo, Services}
9+
alias Reencodarr.{DbWriter, Media, Repo, Services}
1010

1111
alias Reencodarr.Media.{MediaInfoExtractor, VideoFileInfo, VideoUpsert}
1212
alias Reencodarr.Media.Video.MediaInfoConverter
@@ -335,8 +335,13 @@ defmodule Reencodarr.Sync do
335335
# Check if video exists and file size hasn't changed
336336
existing_video = Media.get_video_by_path(info.path)
337337

338-
# VideoUpsert will automatically set state to needs_analysis for zero bitrate
339-
handle_video_upsert(existing_video, info)
338+
DbWriter.transaction(
339+
fn ->
340+
# VideoUpsert will automatically set state to needs_analysis for zero bitrate
341+
handle_video_upsert(existing_video, info)
342+
end,
343+
label: :sync_upsert_video_from_file
344+
)
340345
end
341346

342347
defp handle_video_upsert({:ok, video}, info) do
@@ -419,17 +424,22 @@ defmodule Reencodarr.Sync do
419424
# Convert directly to MediaInfo format
420425
mediainfo = MediaInfoConverter.from_service_file(file, service_type)
421426

422-
# Store in database
423-
# VideoUpsert will automatically set state to needs_analysis for missing bitrate
424-
VideoUpsert.upsert(%{
425-
"path" => file["path"],
426-
"size" => file["size"],
427-
"service_id" => to_string(file["id"]),
428-
"service_type" => to_string(service_type),
429-
"mediainfo" => mediainfo,
430-
"bitrate" => file["overallBitrate"] || 0,
431-
"dateAdded" => file["dateAdded"]
432-
})
427+
DbWriter.transaction(
428+
fn ->
429+
# Store in database
430+
# VideoUpsert will automatically set state to needs_analysis for missing bitrate
431+
VideoUpsert.upsert(%{
432+
"path" => file["path"],
433+
"size" => file["size"],
434+
"service_id" => to_string(file["id"]),
435+
"service_type" => to_string(service_type),
436+
"mediainfo" => mediainfo,
437+
"bitrate" => file["overallBitrate"] || 0,
438+
"dateAdded" => file["dateAdded"]
439+
})
440+
end,
441+
label: :sync_upsert_video_from_service_file
442+
)
433443
end
434444

435445
defp build_video_file_info(file, _media_info, service_type) do

test/reencodarr/db_writer_test.exs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,36 @@ defmodule Reencodarr.DbWriterTest do
5757
previous_env = Application.get_env(:reencodarr, :env)
5858
Application.put_env(:reencodarr, :env, :dev)
5959
on_exit(fn -> Application.put_env(:reencodarr, :env, previous_env) end)
60+
test_pid = self()
6061

6162
task =
6263
Task.async(fn ->
6364
DbWriter.run(
6465
fn ->
65-
Process.sleep(150)
66-
:occupied
66+
send(test_pid, :writer_entered)
67+
68+
receive do
69+
:release_writer -> :occupied
70+
end
6771
end,
6872
writer_timeout: 1_000
6973
)
7074
end)
7175

72-
Process.sleep(25)
76+
assert_receive :writer_entered
77+
78+
available_task =
79+
Task.async(fn ->
80+
DbWriter.run(
81+
fn -> :available end,
82+
timeout: 1,
83+
writer_timeout: 1_000
84+
)
85+
end)
7386

74-
assert :available =
75-
DbWriter.run(
76-
fn -> :available end,
77-
timeout: 1,
78-
writer_timeout: 1_000
79-
)
87+
send(Process.whereis(DbWriter), :release_writer)
8088

89+
assert Task.await(available_task) == :available
8190
assert Task.await(task) == :occupied
8291
end
8392

@@ -96,7 +105,10 @@ defmodule Reencodarr.DbWriterTest do
96105
label: :failing_job
97106
)
98107

99-
Process.sleep(50)
108+
assert :barrier =
109+
DbWriter.run(fn ->
110+
:barrier
111+
end)
100112
end)
101113

102114
assert log =~ "DbWriter async task failed for :failing_job"

0 commit comments

Comments
 (0)