Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import Config
4 changes: 3 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ config :dataloader, Dataloader.TestRepo,
database: "dataloader_test",
pool: Ecto.Adapters.SQL.Sandbox

config :dataloader, ecto_repos: [Dataloader.TestRepo]
config :dataloader,
ecto_repos: [Dataloader.TestRepo],
source_mock: Dataloader.TestSource.MockSource

config :logger, level: :warning
60 changes: 48 additions & 12 deletions lib/dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ defmodule Dataloader do
options: [option]
}

@type option :: {:timeout, pos_integer} | {:get_policy, atom()}
@type option ::
{:timeout, pos_integer}
| {:get_policy, atom()}
| {:timeout_margin, non_neg_integer}
@type source_name :: any

@default_timeout_margin 1_000
@default_timeout 15_000
def default_timeout, do: @default_timeout

Expand All @@ -108,7 +112,7 @@ defmodule Dataloader do
@spec new([option]) :: t
def new(opts \\ []) do
opts =
[get_policy: @default_get_policy]
[get_policy: @default_get_policy, timeout_margin: @default_timeout_margin]
|> Keyword.merge(opts)

%__MODULE__{options: opts}
Expand All @@ -135,10 +139,25 @@ defmodule Dataloader do
load_many(loader, source_name, batch_key, [val])
end

defp do_load({:error, error}, _, _), do: {:error, error}

defp do_load(source, batch_key, vals) do
Enum.reduce(vals, source, &Source.load(&2, batch_key, &1))
end

@doc """
A note on error handling.
If a Source.run/1 call returns an error or times out, the source is considered failed and
irrecoverable. This is motivated by a two points:
* There is no way to clear the failing batches from a Source (currently), any future Source.run/1 will
retry the same batches with potential additional batches. If the issue causing a timeout is not
resolved between runs it will incur severe delays to full response.
* If future Source.run/1 on failed Sources would be allowed, it is not possible to distinguish
between failures of future batch/key and the previous Source error without keeping track of
which batch was part of which run. Not distinguishing between different errors could be
very detrimental during debugging.
The above points could be addressed by designing a different protocol.
"""
@spec run(t) :: t
def run(dataloader) do
if pending_batches?(dataloader) do
Expand All @@ -149,8 +168,12 @@ defmodule Dataloader do

emit_start_event(id, system_time, dataloader)

{async_sources, sync_sources} =
{error_sources, ok_sources} =
dataloader.sources
|> Enum.split_with(&match?({_name, {:error, _}}, &1))

{async_sources, sync_sources} =
ok_sources
|> Enum.split_with(fn {_name, source} -> Dataloader.Source.async?(source) end)

async_source_results =
Expand Down Expand Up @@ -187,8 +210,9 @@ defmodule Dataloader do
|> Stream.concat(sync_source_results)
|> Stream.map(fn
{_source, {:ok, {name, source}}} -> {name, source}
{_source, {:error, reason}} -> {:error, reason}
{{name, _}, {:error, reason}} -> {name, {:error, reason}}
end)
|> Stream.concat(error_sources)
|> Map.new()

updated_dataloader = %{dataloader | sources: sources}
Expand Down Expand Up @@ -220,18 +244,18 @@ defmodule Dataloader do
defp dataloader_timeout(dataloader) do
max_source_timeout =
dataloader.sources
|> Enum.map(fn {_, source} -> Source.timeout(source) end)
|> Enum.map(fn {_, source} -> do_timeout(source) end)
|> Enum.reject(&is_nil/1)
|> Enum.max(fn -> @default_timeout end)

max_source_timeout + :timer.seconds(1)
max_source_timeout + Keyword.get(dataloader.options, :timeout_margin, @default_timeout_margin)
end

@spec get(t, source_name, any, any) :: any
def get(loader = %Dataloader{options: options}, source, batch_key, item_key) do
loader
|> get_source(source)
|> Source.fetch(batch_key, item_key)
|> do_fetch(batch_key, item_key)
|> do_get(options[:get_policy])
end

Expand All @@ -242,11 +266,14 @@ defmodule Dataloader do

for key <- item_keys do
source
|> Source.fetch(batch_key, key)
|> do_fetch(batch_key, key)
|> do_get(options[:get_policy])
end
end

defp do_fetch({:error, reason}, _, _), do: {:error, reason}
defp do_fetch(source, batch_key, key), do: Source.fetch(source, batch_key, key)

defp do_get({:ok, val}, :raise_on_error), do: val
defp do_get({:ok, val}, :return_nil_on_error), do: val
defp do_get({:ok, val}, :tuples), do: {:ok, val}
Expand All @@ -255,18 +282,27 @@ defmodule Dataloader do
defp do_get({:error, _reason}, :return_nil_on_error), do: nil
defp do_get({:error, reason}, :tuples), do: {:error, reason}

defp do_timeout({:error, _reason}), do: nil
defp do_timeout(source), do: Source.timeout(source)

def put(loader, source_name, batch_key, item_key, result) do
source =
loader
|> get_source(source_name)
|> Source.put(batch_key, item_key, result)
|> do_put(batch_key, item_key, result)

put_in(loader.sources[source_name], source)
end

defp do_put({:error, reason}, _, _, _), do: {:error, reason}
defp do_put(source, batch_key, key, result), do: Source.put(source, batch_key, key, result)

@spec pending_batches?(t) :: boolean
def pending_batches?(loader) do
Enum.any?(loader.sources, fn {_name, source} -> Source.pending_batches?(source) end)
Enum.any?(loader.sources, fn
{_name, {:error, _}} -> false
{_name, source} -> Source.pending_batches?(source)
end)
end

defp get_source(loader, source_name) do
Expand Down Expand Up @@ -394,13 +430,13 @@ defmodule Dataloader do
# Optionally use `async/1` and `async_stream/3` functions from
# `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
@spec async((-> any)) :: Task.t()
defdelegate async(fun), to: OpentelemetryProcessPropagator.Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task
else
@spec async((() -> any)) :: Task.t()
@spec async((-> any)) :: Task.t()
defdelegate async(fun), to: Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ defmodule Dataloader.Mixfile do
{:opentelemetry_process_propagator, "~> 0.3 or ~> 0.2.1", optional: true},
{:ecto_sql, "~> 3.0", optional: true, only: :test},
{:postgrex, "~> 0.14", only: :test, runtime: false},
{:mox, "~> 1.0", only: :test},
{:dialyxir, "~> 1.3.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.24", only: :dev, runtime: false},
{:jason, "~> 1.0", only: :test}
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"},
"nimble_ownership": {:hex, :nimble_ownership, "1.0.2", "fa8a6f2d8c592ad4d79b2ca617473c6aefd5869abfa02563a77682038bf916cf", [:mix], [], "hexpm", "098af64e1f6f8609c6672127cfe9e9590a5d3fcdd82bc17a377b8692fd81a879"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"},
"opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.3.0", "ef5b2059403a1e2b2d2c65914e6962e56371570b8c3ab5323d7a8d3444fb7f84", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "7243cb6de1523c473cba5b1aefa3f85e1ff8cc75d08f367104c1e11919c8c029"},
Expand Down
120 changes: 120 additions & 0 deletions test/dataloader_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule DataloaderTest do
use ExUnit.Case, async: false
import ExUnit.CaptureLog
import Mox

doctest Dataloader

Expand Down Expand Up @@ -37,6 +38,8 @@ defmodule DataloaderTest do
{key, item}
end

setup :verify_on_exit!

setup do
loader =
Dataloader.new()
Expand Down Expand Up @@ -104,6 +107,69 @@ defmodule DataloaderTest do
end
end

describe "run/1" do
test "returns error" do
Dataloader.TestSource.MockSource
# lowest possible timeout
|> stub(:timeout, fn _ -> 1 end)
# false would skip invoking Source.run/1
|> stub(:pending_batches?, fn _ -> true end)
|> stub(:async?, fn _ -> true end)
# Dataloader adds one second to every timeout, to trigger timeout we
# need to hold longer than <timeout> + 1s
|> expect(:run, 1, fn _ -> {:error, :test_error} end)

loader =
Dataloader.new(get_policy: :tuples)
|> Dataloader.add_source(:test, %Dataloader.TestSource.SourceImpl{})
|> Dataloader.run()

# Dataloader replaces the source struct with error tuple. There is
# no reasonable recovery from Source.run/1 errors.
assert %{sources: %{test: {:error, :test_error}}} = loader
end

test "exceeds timeout" do
Dataloader.TestSource.MockSource
# lowest possible timeout
|> stub(:timeout, fn _ -> 1 end)
# false would skip invoking Source.run/1
|> stub(:pending_batches?, fn _ -> true end)
|> stub(:async?, fn _ -> true end)
|> expect(:run, fn %{name: :test} -> Process.sleep(2) end)

loader =
Dataloader.new(get_policy: :tuples, timeout_margin: 0)
|> Dataloader.add_source(:test, %Dataloader.TestSource.SourceImpl{name: :test})
|> Dataloader.run()

# Dataloader replaces the source struct with error tuple. There is
# no reasonable recovery from timeout.
assert %{sources: %{test: {:error, :timeout}}} = loader
end

test "use highest timeout plus margin as timeout for all tasks" do
Dataloader.TestSource.MockSource
|> expect(:timeout, 4, fn %{timeout: t} -> t end)
# pending_batches? is only checked for any?
|> stub(:pending_batches?, fn _ -> true end)
|> stub(:async?, fn _ -> true end)
# Sleep for 2ms (not triggering timeout) or 11ms (triggering timeout)
|> expect(:run, 2, fn s ->
Process.sleep(s.timeout + 2)
s
end)

loader =
Dataloader.new(get_policy: :tuples, timeout_margin: 1)
|> Dataloader.add_source(:test_1, %Dataloader.TestSource.SourceImpl{timeout: 1})
|> Dataloader.add_source(:test_2, %Dataloader.TestSource.SourceImpl{timeout: 5})
|> Dataloader.run()

assert %{sources: %{test_1: %{}, test_2: {:error, :timeout}}} = loader
end
end

describe "get methods when configured to raise an error" do
test "get/4 returns a value when successful", %{loader: loader} do
result =
Expand Down Expand Up @@ -187,6 +253,28 @@ defmodule DataloaderTest do

assert log =~ "hell"
end

test "get/4 raises an exception when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :raise_on_error)
|> Dataloader.add_source(:test, {:error, :test_error})

assert_raise Dataloader.GetError, ":test_error", fn ->
loader
|> Dataloader.get(:test, :foo, "foo")
end
end

test "get_many/4 raises an exception when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :raise_on_error)
|> Dataloader.add_source(:test, {:error, :test_error})

assert_raise Dataloader.GetError, ":test_error", fn ->
loader
|> Dataloader.get_many(:test, :foo, ["foo"])
end
end
end

describe "get methods when configured to return `nil` on error" do
Expand Down Expand Up @@ -249,6 +337,22 @@ defmodule DataloaderTest do

assert log =~ "hell"
end

test "get/4 return `nil` when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :tuples)
|> Dataloader.add_source(:test, {:error, :test_error})

assert {:error, :test_error} == loader |> Dataloader.get(:test, :foo, "foo")
end

test "get_many/4 return `nil` when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :return_nil_on_error)
|> Dataloader.add_source(:test, {:error, :test_error})

assert [nil] == loader |> Dataloader.get_many(:test, :foo, ["foo"])
end
end

describe "get methods when configured to return ok/error tuples" do
Expand Down Expand Up @@ -286,6 +390,14 @@ defmodule DataloaderTest do
assert result == {:error, :value}
end

test "get/4 returns an {:error, reason} tuple when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :tuples)
|> Dataloader.add_source(:test, {:error, :test_error})

assert {:error, :test_error} == loader |> Dataloader.get(:test, :foo, "foo")
end

test "get_many/4 returns a list of {:ok, value} tuples when successful", %{loader: loader} do
result =
loader
Expand Down Expand Up @@ -334,5 +446,13 @@ defmodule DataloaderTest do

assert log =~ "hell"
end

test "get_many/4 returns a list of {:error, reason} tuples when there was an error running the source batches" do
loader =
Dataloader.new(get_policy: :tuples)
|> Dataloader.add_source(:test, {:error, :test_error})

assert [{:error, :test_error}] == loader |> Dataloader.get_many(:test, :foo, ["foo"])
end
end
end
Loading