Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions backend/lib/edgehog/files/file/changes/handle_file_upload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,15 @@ defmodule Edgehog.Files.File.Changes.HandleFileUpload do
tasks = Enum.map(uploads, fn {_encoding, task} -> task end)
results = Task.yield_many(tasks, timeout)

{changeset, failed?} =
{changeset, failed?, errors} =
uploads
|> Enum.zip(results)
|> Enum.reduce({changeset, false}, &reduce_results/2)
|> Enum.reduce({changeset, false, []}, &reduce_results/2)

if failed? do
Ash.Changeset.add_error(changeset,
field: :file,
message: "One or more file uploads failed"
)
Enum.reduce(errors, changeset, fn error, acc ->
Ash.Changeset.add_error(acc, error)
end)
else
changeset
end
Expand Down Expand Up @@ -161,36 +160,64 @@ defmodule Edgehog.Files.File.Changes.HandleFileUpload do

defp ceil_div(dividend, divisor), do: div(dividend + divisor - 1, divisor)

defp reduce_results({{_encoding, _task}, {_task_pid, {:ok, result}}}, {acc_changeset, failed?}) do
defp reduce_results(
{{_encoding, _task}, {_pid, {:ok, result}}},
{acc, failed?, errors}
) do
case result do
{:ok, changeset} -> {merge_changesets(acc_changeset, changeset), failed?}
{:error, changeset} -> {merge_changesets(acc_changeset, changeset), true}
{:ok, changeset} ->
{merge_changesets(acc, changeset), failed?, errors}

{:error, changeset} ->
{merge_changesets(acc, changeset), true, acc_errors(errors, changeset)}
end
end

defp reduce_results({{encoding, task}, {_task_pid, nil}}, {acc_changeset, _failed?}) do
Logger.error("Upload task timed out for #{encoding || "base"} encoding")
defp reduce_results(
{{encoding, task}, {_pid, nil}},
{acc, _failed?, errors}
) do
_ = Task.shutdown(task, :brutal_kill)

{Ash.Changeset.add_error(acc_changeset,
field: :file,
message: "Upload process timed out"
), true}
changeset =
Ash.Changeset.add_error(acc,
field: :file,
message: "Upload process timed out (#{encoding || "base"})"
)

{changeset, true, acc_errors(errors, changeset)}
end

defp reduce_results(
{{encoding, _task}, {_task_pid, {:exit, reason}}},
{acc_changeset, _failed?}
{{encoding, _task}, {_pid, {:exit, reason}}},
{acc, _failed?, errors}
) do
Logger.error("Upload task crashed for #{encoding || "base"} encoding: #{inspect(reason)}")
Logger.error("Upload task crashed for #{encoding || "base"}: #{inspect(reason)}")

error = %{
encoding: encoding || :base,
type: :crash,
message: "Upload process crashed",
reason: reason
}

{Ash.Changeset.add_error(acc_changeset, field: :file, message: "Upload process failed"), true}
{acc, true, [error | errors]}
end

defp reduce_results({{encoding, _task}, {_task_result, reason}}, {acc_changeset, _failed?}) do
Logger.error("Upload task failed for #{encoding || "base"} encoding: #{inspect(reason)}")
defp reduce_results(
{{encoding, _task}, {_result, reason}},
{acc, _failed?, errors}
) do
Logger.error("Upload task failed for #{encoding || "base"}: #{inspect(reason)}")

error = %{
encoding: encoding || :base,
type: :error,
message: "Upload process failed",
reason: reason
}

{Ash.Changeset.add_error(acc_changeset, field: :file, message: "Upload process failed"), true}
{acc, true, [error | errors]}
end

defp merge_changesets(acc_changeset, new_changeset) do
Expand All @@ -199,6 +226,10 @@ defmodule Edgehog.Files.File.Changes.HandleFileUpload do
|> Map.put(:context, Map.merge(acc_changeset.context, new_changeset.context))
end

defp acc_errors(errors, changeset) do
changeset.errors ++ errors
end

defp maybe_delete_temporary_upload(%Plug.Upload{path: original_path}, %Plug.Upload{path: path})
when original_path != path do
_ = File.rm(path)
Expand All @@ -210,13 +241,9 @@ defmodule Edgehog.Files.File.Changes.HandleFileUpload do
# If we've uploaded the file and the transaction resulted in an error, we do our
# best to clean up
defp cleanup_on_error(changeset, {:error, _} = result) do
case Ash.Changeset.apply_attributes(changeset) do
{:ok, file_record} ->
do_cleanup(file_record, changeset.context)
file_record = Map.merge(changeset.data, changeset.attributes)

{:error, reason} ->
Logger.warning("Failed to apply attributes for cleanup: #{inspect(reason)}")
end
do_cleanup(file_record, changeset.context)

result
end
Expand Down
Loading
Loading