Skip to content

Commit 63ceaba

Browse files
authored
Implements HRW.Weighted (#2)
Using https://www.ietf.org/archive/id/draft-ietf-bess-weighted-hrw-00.html to implement weighted HRW. In practice this means you can now assign a weight to your nodes.
1 parent af15412 commit 63ceaba

9 files changed

Lines changed: 231 additions & 20 deletions

File tree

README.md

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ The most common library in the Elixir community to use to solve that problem is
1111

1212
For larger node sets, build a skeleton with `HRW.build` and pass it to `HRW.owner` to get O(log n) lookups. The skeleton is plain data — build it once, reuse it across calls.
1313

14-
Additionally, there's `HRW.Bounded` for when you want to control the distribution of keys across nodes to limit skew. Consistent hashing and rendezvous hashing algorithms can easily result in uneven distribution for smaller node counts, and `HRW.Bounded` lets you control that, assuming that you have the whole key set up front.
14+
`HRW.owner`, `HRW.owners`, and `HRW.build` accept a `:scorer` option that selects the scoring strategy. The available scorers are `%HRW{}` (the default algorithm) and `%HRW.Weighted{}`, which takes nodes as `{node, weight}` tuples so that nodes with a higher weight receive a larger share of keys.
15+
16+
For additional strategies, there's `HRW.Bounded` for when you want to control the distribution of keys across nodes to limit skew. Consistent hashing and rendezvous hashing algorithms can easily result in uneven distribution for smaller node counts, and `HRW.Bounded` lets you control that, assuming that you have the whole key set up front.
1517

1618
```elixir
1719
# HRW
@@ -28,6 +30,10 @@ skeleton = HRW.build(["server1", "server2", "server3"])
2830
HRW.owner("192.168.0.2", skeleton)
2931
#=> "server3"
3032

33+
# HRW.Weighted
34+
HRW.owner("192.168.0.1", [{"server1", 1}, {"server2", 1}, {"server3", 10}], scorer: %HRW.Weighted{})
35+
#=> "server3"
36+
3137
# HRW.Bounded
3238
HRW.Bounded.assignments(["a", "b", "c", "d"], ["x", "y"], epsilon: 0.0)
3339
#=> %{"a" => "x", "b" => "x", "c" => "y", "d" => "y"}
@@ -39,11 +45,11 @@ tl;dr HRW performs similarly to ExHashRing on smaller node lists, but falls behi
3945

4046
Lookup latency on Apple M4 Pro / Elixir 1.19.5 / OTP 28.5, median per call:
4147

42-
| nodes | HRW.owner | HRW.owner (skeleton) | ExHashRing.find_node |
43-
|-------:|------------:|-------------------:|---------------------:|
44-
| 10 | 292 ns | 292 ns | 333 ns |
45-
| 100 | 2.67 µs | 875 ns | 375 ns |
46-
| 1,000 | 25.54 µs | 1.08 µs | 380 ns |
47-
| 10,000 | 253.58 µs | 1.38 µs | 420 ns |
48+
| nodes | HRW.owner | HRW.owner (weighted) | HRW.owner (skeleton) | HRW.owner (skeleton weighted) | ExHashRing.find_node |
49+
|-------:|----------:|---------------------:|---------------------:|------------------------------:|---------------------:|
50+
| 10 | 542 ns | 1.00 µs | 292 ns | 667 ns | 333 ns |
51+
| 100 | 6.33 µs | 11.00 µs | 920 ns | 1.71 µs | 380 ns |
52+
| 1,000 | 71.79 µs | 121 µs | 1.25 µs | 2.25 µs | 380 ns |
53+
| 10,000 | 771 µs | 1.25 ms | 1.50 µs | 2.92 µs | 420 ns |
4854

4955
Reproduce with `elixir benches/bench.exs`.

benches/bench.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ defmodule Bench do
1212
nodes = Enum.map(1..n, &"node-#{&1}")
1313
{:ok, ring} = Ring.start_link()
1414
Ring.set_nodes(ring, nodes, :infinity)
15-
%{nodes: nodes, skeleton: HRW.build(nodes), ring: ring}
15+
%{nodes: nodes, skeleton: HRW.build(nodes), ring: ring, weighted_nodes: Enum.map(1..n, &{"node-#{&1}", &1}), weighted_skeleton: HRW.build(Enum.map(1..n, &{"node-#{&1}", &1}), scorer: %HRW.Weighted{})}
1616
end
1717

1818
Benchee.run(%{
1919
"HRW.owner" => fn %{nodes: nodes} -> HRW.owner("test", nodes) end,
20+
"HRW.owner (weighted)" => fn %{weighted_nodes: weighted_nodes} -> HRW.owner("test", weighted_nodes, scorer: %HRW.Weighted{}) end,
2021
"HRW.owner (skeleton)" => fn %{skeleton: skeleton} -> HRW.owner("test", skeleton) end,
22+
"HRW.owner (skeleton weighted)" => fn %{weighted_skeleton: weighted_skeleton} -> HRW.owner("test", weighted_skeleton) end,
2123
"ExHashRing.Ring.find_node" => fn %{ring: ring} -> Ring.find_node(ring, "test") end
2224
}, inputs: %{
2325
"A: 10" => setup.(10),

lib/hrw.ex

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,26 @@ defmodule HRW do
4848
def owner(key, nodes_or_skeleton, opts \\ [])
4949

5050
def owner(key, %HRW.Skeleton{} = skeleton, _opts) do
51-
HRW.Skeleton.owner(key, skeleton)
51+
result = HRW.Skeleton.owner(key, skeleton)
52+
unwrap_node(result, skeleton.scorer)
5253
end
5354

5455
def owner(key, nodes, opts) do
56+
validate_nodes(nodes, Keyword.get(opts, :scorer, %HRW{}))
57+
5558
nodes =
5659
nodes
5760
|> Enum.sort()
58-
|> Enum.uniq()
61+
|> Enum.dedup()
5962

6063
if scorer = Keyword.get(opts, :scorer) do
6164
%mod{} = scorer
6265

63-
Enum.max_by(nodes, fn node ->
66+
nodes
67+
|> Enum.max_by(fn node ->
6468
mod.score(scorer, key, node)
6569
end)
70+
|> unwrap_node(scorer)
6671
else
6772
Enum.max_by(nodes, fn node ->
6873
:erlang.phash2({key, node})
@@ -85,17 +90,20 @@ defmodule HRW do
8590
"""
8691
@spec owners(term(), [term()], non_neg_integer(), keyword()) :: [term()]
8792
def owners(key, nodes, count, opts \\ []) do
93+
validate_nodes(nodes, Keyword.get(opts, :scorer, %HRW{}))
94+
8895
nodes =
8996
nodes
9097
|> Enum.sort()
91-
|> Enum.uniq()
98+
|> Enum.dedup()
9299

93100
if scorer = Keyword.get(opts, :scorer) do
94101
%mod{} = scorer
95102

96103
nodes
97104
|> Enum.sort_by(fn node -> mod.score(scorer, key, node) end, :desc)
98105
|> Enum.take(count)
106+
|> Enum.map(&unwrap_node(&1, scorer))
99107
else
100108
nodes
101109
|> Enum.sort_by(fn node -> :erlang.phash2({key, node}) end, :desc)
@@ -121,6 +129,26 @@ defmodule HRW do
121129
"""
122130
@spec build([term()], keyword()) :: HRW.Skeleton.t()
123131
def build(nodes, opts \\ []) do
132+
validate_nodes(nodes, Keyword.get(opts, :scorer, %HRW{}))
124133
HRW.Skeleton.build(nodes, opts)
125134
end
135+
136+
defp unwrap_node({node, _weight}, %HRW.Weighted{}), do: node
137+
defp unwrap_node(node, _scorer), do: node
138+
139+
defp validate_nodes(nodes, %HRW{}) when is_list(nodes), do: nil
140+
defp validate_nodes(_nodes, %HRW{}), do: raise(ArgumentError, "nodes must be a list")
141+
142+
defp validate_nodes(nodes, %HRW.Weighted{}) when is_list(nodes) do
143+
if not Enum.all?(nodes, fn
144+
{_node, weight} when is_number(weight) and weight > 0 -> true
145+
_ -> false
146+
end) do
147+
raise ArgumentError,
148+
"HRW.Weighted requires a list of {node, weight} tuples with positive numeric weights"
149+
end
150+
end
151+
152+
defp validate_nodes(_nodes, %HRW.Weighted{}),
153+
do: raise(ArgumentError, "nodes must be a list of tuples")
126154
end

lib/hrw/bounded.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ defmodule HRW.Bounded do
4646

4747
keys =
4848
keys
49-
|> Enum.uniq()
5049
|> Enum.sort()
50+
|> Enum.dedup()
5151

5252
nodes =
5353
nodes
54-
|> Enum.uniq()
5554
|> Enum.sort()
55+
|> Enum.dedup()
5656

5757
cap = ceil(length(keys) / length(nodes) * (1 + epsilon))
5858

lib/hrw/scorer.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ defmodule HRW.Scorer do
22
@moduledoc """
33
Behaviour for HRW scoring strategies. Each variant module (`HRW`,
44
`HRW.Weighted`, etc.) defines a struct holding its configuration and
5-
implements `score/3` returning an integer score for a `(key, node)` pair.
5+
implements `score/3` returning a score for a `(key, node)` pair.
66
77
Pass an instance via the `:scorer` option to `HRW.owner/3`, `HRW.owners/4`,
88
or `HRW.build/2`. The highest-scoring node wins.
99
"""
1010

11-
@callback score(scorer :: struct(), key :: term(), node :: term()) :: integer()
11+
@callback score(scorer :: struct(), key :: term(), node :: term()) :: number()
1212
end

lib/hrw/skeleton.ex

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,19 @@ defmodule HRW.Skeleton do
3535
fanout = Keyword.get_lazy(opts, :fanout, fn -> optimal_fanout(count) end)
3636
levels = if count > 1, do: ceil(:math.log(count) / :math.log(fanout)), else: 0
3737

38+
scorer =
39+
case scorer do
40+
%HRW.Weighted{} = weighted ->
41+
%{
42+
weighted
43+
| branch_weights: compute_branch_weights(cluster_list, fanout, levels),
44+
fanout: fanout
45+
}
46+
47+
other ->
48+
other
49+
end
50+
3851
%__MODULE__{
3952
clusters: clusters,
4053
fanout: fanout,
@@ -56,7 +69,9 @@ defmodule HRW.Skeleton do
5669
defp do_owner(key, %__MODULE__{scorer: %HRW{hash_fn: nil}} = skeleton, salt) do
5770
index =
5871
Enum.reduce(0..(skeleton.levels - 1), 0, fn level, acc ->
59-
digit = Enum.max_by(0..(skeleton.fanout - 1), &:erlang.phash2({{key, salt, level}, &1}))
72+
digit =
73+
Enum.max_by(0..(skeleton.fanout - 1), &:erlang.phash2({{key, salt, level, acc}, &1}))
74+
6075
acc * skeleton.fanout + digit
6176
end)
6277

@@ -75,7 +90,9 @@ defmodule HRW.Skeleton do
7590
defp do_owner(key, %__MODULE__{scorer: %mod{} = scorer} = skeleton, salt) do
7691
index =
7792
Enum.reduce(0..(skeleton.levels - 1), 0, fn level, acc ->
78-
digit = Enum.max_by(0..(skeleton.fanout - 1), &mod.score(scorer, {key, salt, level}, &1))
93+
digit =
94+
Enum.max_by(0..(skeleton.fanout - 1), &mod.score(scorer, {key, salt, level, acc}, &1))
95+
7996
acc * skeleton.fanout + digit
8097
end)
8198

@@ -91,8 +108,8 @@ defmodule HRW.Skeleton do
91108
# Deterministic ordering so the same node set always produces the same clusters.
92109
chunks =
93110
nodes
94-
|> Enum.uniq()
95111
|> Enum.sort()
112+
|> Enum.dedup()
96113
|> Enum.chunk_every(size)
97114

98115
# An undersized last chunk would get the same routing probability as full chunks,
@@ -121,6 +138,36 @@ defmodule HRW.Skeleton do
121138
fanout * levels / (1 - overflow_prob)
122139
end)
123140
end
141+
142+
defp compute_branch_weights(_clusters, _fanout, 0), do: {}
143+
144+
defp compute_branch_weights(clusters, fanout, levels) do
145+
cluster_weights =
146+
clusters
147+
|> Enum.with_index()
148+
|> Enum.map(fn {nodes, index} ->
149+
total =
150+
Enum.sum(
151+
Enum.map(nodes, fn
152+
{_node, weight} -> weight
153+
# plain nodes get implicit weight 1
154+
_ -> 1
155+
end)
156+
)
157+
158+
{index, total}
159+
end)
160+
161+
0..(levels - 1)
162+
|> Enum.map(fn level ->
163+
divisor = Integer.pow(fanout, levels - level - 1)
164+
165+
Enum.reduce(cluster_weights, %{}, fn {index, weight}, acc ->
166+
Map.update(acc, div(index, divisor), weight, &(&1 + weight))
167+
end)
168+
end)
169+
|> List.to_tuple()
170+
end
124171
end
125172

126173
defimpl Inspect, for: HRW.Skeleton do

lib/hrw/weighted.ex

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
defmodule HRW.Weighted do
2+
@moduledoc """
3+
Weighted HRW implementation for ensuring certain nodes get a greater share of keys.
4+
5+
Inspired by https://www.ietf.org/archive/id/draft-ietf-bess-weighted-hrw-00.html
6+
7+
Works with both flat lists (`HRW.owner(key, [{"a", 1}, {"b", 4}], scorer: %HRW.Weighted{})`)
8+
and skeletons (`HRW.build([...], scorer: %HRW.Weighted{})`).
9+
"""
10+
11+
@behaviour HRW.Scorer
12+
13+
import Bitwise
14+
15+
@phash2_max 0x7FFFFFF
16+
17+
defstruct [:hash_fn, :branch_weights, :fanout]
18+
19+
@type t :: %__MODULE__{
20+
hash_fn: (term() -> integer()) | nil,
21+
branch_weights: tuple() | nil,
22+
fanout: pos_integer() | nil
23+
}
24+
25+
@impl HRW.Scorer
26+
def score(%__MODULE__{hash_fn: nil}, key, {node, weight}) do
27+
hash = :erlang.phash2({key, node})
28+
leaf_score(hash, weight)
29+
end
30+
31+
def score(%__MODULE__{hash_fn: hash_fn}, key, {node, weight}) do
32+
hash = hash_fn.({key, node})
33+
leaf_score(hash, weight)
34+
end
35+
36+
# Skeleton tree walk: key is {real_key, salt, level, prefix},
37+
# entry is the candidate digit. Looks up branch weight from precomputed tree.
38+
def score(
39+
%__MODULE__{branch_weights: branch_weights, fanout: fanout},
40+
{key, salt, level, prefix},
41+
digit
42+
) do
43+
candidate = prefix * fanout + digit
44+
45+
branch_weight =
46+
branch_weights
47+
|> elem(level)
48+
|> Map.get(candidate, 0)
49+
50+
if branch_weight == 0 do
51+
# Empty branch (no nodes under this prefix). Returning 0.0 ensures it never wins.
52+
0.0
53+
else
54+
normalized_hash =
55+
(:erlang.phash2({key, salt, level, prefix, digit}) &&& @phash2_max) / @phash2_max
56+
57+
:math.pow(normalized_hash, 1 / branch_weight)
58+
end
59+
end
60+
61+
defp leaf_score(hash, weight) do
62+
normalized = (hash &&& @phash2_max) / @phash2_max
63+
:math.pow(normalized, 1 / weight)
64+
end
65+
end

test/hrw/skeleton_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ defmodule HRW.SkeletonTest do
55
nodes = Enum.map(1..12, &"server#{&1}")
66
skeleton = HRW.build(nodes, cluster_size: 4)
77

8-
assert HRW.owner("192.168.0.1", skeleton) == "server12"
8+
assert HRW.owner("192.168.0.1", skeleton) == "server9"
99
end
1010
end

test/hrw/weighted_test.exs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
defmodule HRWWeightedTest do
2+
use ExUnit.Case, async: true
3+
alias HRW.Weighted
4+
5+
test "higher weight nodes win more often in flat lists" do
6+
# With weights 4:1, "heavy" should win ~80% of keys; we only assert that it wins more often than "light"
7+
weights = [{"heavy", 4}, {"light", 1}]
8+
keys = Enum.map(1..1_000, &"key-#{&1}")
9+
10+
winners =
11+
keys
12+
|> Enum.map(&HRW.owner(&1, weights, scorer: %Weighted{}))
13+
|> Enum.frequencies()
14+
15+
assert winners["heavy"] > winners["light"]
16+
end
17+
18+
test "weight ratio approximates win ratio at scale" do
19+
keys = Enum.map(1..100, &"key-#{&1}")
20+
weights = [{"a", 3}, {"b", 1}]
21+
22+
results =
23+
keys
24+
|> Enum.map(&HRW.owner(&1, weights, scorer: %Weighted{}))
25+
|> Enum.frequencies()
26+
27+
# b wins ~1/4 of the time, a wins ~3/4 (roughly proportional to weight ratio 3:1)
28+
total = results["a"] + results["b"]
29+
ratio = results["a"] / total
30+
31+
# Expect a to win roughly 75% of the time; allow ±15% noise
32+
assert ratio >= 0.60 and ratio <= 0.90
33+
end
34+
35+
test "weighted skeleton routing matches weight ratios" do
36+
nodes = [{"a", 3}, {"b", 2}, {"c", 1}]
37+
skeleton = HRW.build(nodes, scorer: %Weighted{})
38+
39+
keys = Enum.map(1..500, &"key-#{&1}")
40+
41+
results =
42+
keys
43+
|> Enum.map(&HRW.owner(&1, skeleton))
44+
|> Enum.frequencies()
45+
46+
total = results["a"] + results["b"] + results["c"]
47+
a_ratio = results["a"] / total
48+
49+
# "a" has weight 3 out of a total weight of 6 → ~50% of keys; allow ±10% noise
50+
assert a_ratio >= 0.40 and a_ratio <= 0.60
51+
end
52+
53+
test "weighted skeleton handles configurations with empty branches" do
54+
nodes = Enum.map(1..100, fn i -> {"node#{i}", 1} end)
55+
sk = HRW.build(nodes, scorer: %HRW.Weighted{}, fanout: 2, cluster_size: 15)
56+
# capacity 8, 6 clusters, 2 empty slots
57+
58+
keys = Enum.map(1..10_000, fn i -> "key-#{i}" end)
59+
results = Enum.map(keys, &HRW.owner(&1, sk))
60+
assert Enum.all?(results, &is_binary/1)
61+
assert length(Enum.uniq(results)) > 90
62+
end
63+
end

0 commit comments

Comments
 (0)