|
| 1 | +defmodule HRW.Bounded do |
| 2 | + @moduledoc """ |
| 3 | + A bounded-load variant of HRW. Distributes a known set of keys across nodes |
| 4 | + such that no node receives more than `ceil(|keys| / |nodes| × (1 + epsilon))` |
| 5 | + keys. |
| 6 | +
|
| 7 | + Pure function of `(keys, nodes, opts)` — any two callers with the same |
| 8 | + inputs produce the same assignment, no coordination required. Use this |
| 9 | + when the full key set is known at compute time and you want bounded skew. |
| 10 | + """ |
| 11 | + |
| 12 | + @doc """ |
| 13 | + Returns a map of `key => node` covering every input key. Returns `%{}` |
| 14 | + when `keys` is empty. |
| 15 | +
|
| 16 | + Each key is assigned to its highest-scoring node, with overflow falling |
| 17 | + through to the next-best when a node hits the cap of |
| 18 | + `ceil(|keys| / |nodes| × (1 + epsilon))`. |
| 19 | +
|
| 20 | + ## Options |
| 21 | +
|
| 22 | + * `:epsilon` - load slack factor. Smaller values give tighter balance but |
| 23 | + more movement on node churn. Defaults to `0.25`. |
| 24 | + * `:hash_fn` - a function `term -> integer`. Defaults to `&:erlang.phash2/1`. |
| 25 | +
|
| 26 | + ## Examples |
| 27 | +
|
| 28 | + iex> HRW.Bounded.assignments(["a", "b", "c", "d"], ["x", "y"], epsilon: 0.0) |
| 29 | + %{"a" => "x", "b" => "x", "c" => "y", "d" => "y"} |
| 30 | + """ |
| 31 | + @spec assignments([term()], [term()], keyword()) :: %{term() => term()} |
| 32 | + def assignments(keys, nodes, opts \\ []) |
| 33 | + |
| 34 | + def assignments(_keys, [], _opts) do |
| 35 | + raise ArgumentError, "HRW.Bounded.assignments/3 requires a non-empty list of nodes" |
| 36 | + end |
| 37 | + |
| 38 | + def assignments(keys, nodes, opts) do |
| 39 | + hash_fn = Keyword.get(opts, :hash_fn, &:erlang.phash2/1) |
| 40 | + epsilon = Keyword.get(opts, :epsilon, 0.25) |
| 41 | + |
| 42 | + if epsilon < 0 do |
| 43 | + raise ArgumentError, |
| 44 | + "HRW.Bounded.assignments/3 requires :epsilon >= 0, got: #{inspect(epsilon)}" |
| 45 | + end |
| 46 | + |
| 47 | + keys = |
| 48 | + keys |
| 49 | + |> Enum.uniq() |
| 50 | + |> Enum.sort() |
| 51 | + |
| 52 | + nodes = |
| 53 | + nodes |
| 54 | + |> Enum.uniq() |
| 55 | + |> Enum.sort() |
| 56 | + |
| 57 | + cap = ceil(length(keys) / length(nodes) * (1 + epsilon)) |
| 58 | + |
| 59 | + {results, _} = |
| 60 | + Enum.reduce(keys, {%{}, %{}}, fn k, {out, load} -> |
| 61 | + node = |
| 62 | + nodes |
| 63 | + |> Enum.filter(fn n -> Map.get(load, n, 0) < cap end) |
| 64 | + |> Enum.max_by(fn n -> hash_fn.({k, n}) end) |
| 65 | + |
| 66 | + {Map.put(out, k, node), Map.update(load, node, 1, &(&1 + 1))} |
| 67 | + end) |
| 68 | + |
| 69 | + results |
| 70 | + end |
| 71 | +end |
0 commit comments