Skip to content

Commit 1842596

Browse files
cigraingerclaude
andauthored
Distributed perf hardening: 3-12x faster fan-out and merge (#33)
* perf: distributed hardening — ordered:false, adaptive broadcast, pg polling, regex agg Performance: - fan_out uses ordered: false (streaming path already did) — 3.6x faster median for distributed filter+agg (99ms → 27ms) - Broadcast threshold scales with worker count (total network stays constant) Correctness: - Merger uses word-boundary regex for aggregate detection instead of String.contains? — prevents COUNT_DISTINCT matching COUNT branch - FLAME spin_up replaces Process.sleep(100) with :pg.get_members polling loop (10ms interval, 5s timeout) Benchmark results (2 local workers, 100K rows): distributed filter+agg: 99ms → 27ms (3.6x) streaming SUM+COUNT: 491ms → 39ms (12.5x) streaming MIN+MAX: 64ms → 16ms (3.9x) broadcast join: 112ms → 32ms (3.5x) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: use inline regex instead of module attribute for agg detection Regex structs contain references that can't be injected into module attributes on all OTP/Elixir versions. Use inline cond with ~r// sigils instead — they're still compiled at compile time. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b9e6f42 commit 1842596

5 files changed

Lines changed: 288 additions & 13 deletions

File tree

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
Setting up benchmark data...
2+
Benchmark data ready.
3+
4+
Benchmarking chained from computed (100K): filter → head ...
5+
Benchmarking filter (100K → ~50K) ...
6+
Benchmarking from_list (100 rows) ...
7+
Benchmarking from_parquet (100K rows) ...
8+
Benchmarking from_query (100K rows) ...
9+
Benchmarking from_query (1M rows) ...
10+
Benchmarking full pipeline (100K): filter → mutate → group → summarise → sort ...
11+
Benchmarking group_by + summarise (100K → 100 groups) ...
12+
Benchmarking mutate add column (100K) ...
13+
Benchmarking sort_by (100K rows) ...
14+
Calculating statistics...
15+
Formatting results...
16+
17+
Name ips average deviation median 99th %
18+
chained from computed (100K): filter → head 76.10 13.14 ms ±261.84% 3.46 ms 189.88 ms
19+
full pipeline (100K): filter → mutate → group → summarise â 20.29 49.28 ms ±113.35% 27.75 ms 262.17 ms
20+
filter (100K → ~50K) 14.25 70.16 ms ±152.31% 18.82 ms 521.90 ms
21+
group_by + summarise (100K → 100 groups) 11.98 83.49 ms ±127.61% 34.84 ms 435.70 ms
22+
from_query (100K rows) 11.20 89.25 ms ±142.47% 28.83 ms 594.26 ms
23+
from_list (100 rows) 10.58 94.56 ms ±157.81% 25.42 ms 602.75 ms
24+
mutate add column (100K) 8.02 124.69 ms ±133.44% 59.35 ms 766.17 ms
25+
from_parquet (100K rows) 5.14 194.69 ms ±86.43% 100.68 ms 691.56 ms
26+
sort_by (100K rows) 4.77 209.68 ms ±78.12% 181.60 ms 713.11 ms
27+
from_query (1M rows) 3.97 252.08 ms ±102.32% 148.38 ms 1144.60 ms
28+
29+
Comparison:
30+
chained from computed (100K): filter → head 76.10
31+
full pipeline (100K): filter → mutate → group → summarise â 20.29 - 3.75x slower +36.14 ms
32+
filter (100K → ~50K) 14.25 - 5.34x slower +57.02 ms
33+
group_by + summarise (100K → 100 groups) 11.98 - 6.35x slower +70.35 ms
34+
from_query (100K rows) 11.20 - 6.79x slower +76.11 ms
35+
from_list (100 rows) 10.58 - 7.20x slower +81.41 ms
36+
mutate add column (100K) 8.02 - 9.49x slower +111.55 ms
37+
from_parquet (100K rows) 5.14 - 14.82x slower +181.55 ms
38+
sort_by (100K rows) 4.77 - 15.96x slower +196.54 ms
39+
from_query (1M rows) 3.97 - 19.18x slower +238.94 ms
40+
41+
Memory usage statistics:
42+
43+
Name Memory usage
44+
chained from computed (100K): filter → head 23.25 KB
45+
full pipeline (100K): filter → mutate → group → summarise â 30.11 KB - 1.30x memory usage +6.86 KB
46+
filter (100K → ~50K) 93.99 KB - 4.04x memory usage +70.74 KB
47+
group_by + summarise (100K → 100 groups) 65.86 KB - 2.83x memory usage +42.61 KB
48+
from_query (100K rows) 163.67 KB - 7.04x memory usage +140.42 KB
49+
from_list (100 rows) 454.92 KB - 19.57x memory usage +431.67 KB
50+
mutate add column (100K) 211.96 KB - 9.12x memory usage +188.71 KB
51+
from_parquet (100K rows) 163.49 KB - 7.03x memory usage +140.24 KB
52+
sort_by (100K rows) 162.97 KB - 7.01x memory usage +139.72 KB
53+
from_query (1M rows) 1480.41 KB - 63.67x memory usage +1457.16 KB
54+
55+
**All measurements for memory usage were the same**
56+
57+
--- Distributed benchmark ---
58+
59+
Benchmarking distributed (2 workers): 100K filter + aggregate ...
60+
Benchmarking single-node baseline: 100K filter + aggregate ...
61+
Calculating statistics...
62+
Formatting results...
63+
64+
Name ips average deviation median 99th %
65+
single-node baseline: 100K filter + aggregate 10.42 96.01 ms ±75.19% 70.34 ms 290.98 ms
66+
distributed (2 workers): 100K filter + aggregate 7.58 132.01 ms ±76.60% 99.39 ms 454.95 ms
67+
68+
Comparison:
69+
single-node baseline: 100K filter + aggregate 10.42
70+
distributed (2 workers): 100K filter + aggregate 7.58 - 1.37x slower +36.00 ms
71+
72+
--- Streaming vs batch merge ---
73+
74+
Benchmarking streaming merge (MIN + MAX, 2 workers) ...
75+
Benchmarking streaming merge (SUM + COUNT, 2 workers) ...
76+
Calculating statistics...
77+
Formatting results...
78+
79+
Name ips average deviation median 99th %
80+
streaming merge (MIN + MAX, 2 workers) 9.35 106.98 ms ±102.07% 63.59 ms 430.57 ms
81+
streaming merge (SUM + COUNT, 2 workers) 2.03 492.15 ms ±47.47% 491.47 ms 1038.29 ms
82+
83+
Comparison:
84+
streaming merge (MIN + MAX, 2 workers) 9.35
85+
streaming merge (SUM + COUNT, 2 workers) 2.03 - 4.60x slower +385.17 ms
86+
87+
--- Shuffle join benchmark ---
88+
89+
Benchmarking local join baseline (100K × 100K) ...
90+
Benchmarking shuffle join (2 workers, 100K × 100K) ...
91+
Calculating statistics...
92+
Formatting results...
93+
94+
Name ips average deviation median 99th %
95+
local join baseline (100K × 100K) 0.34 2.95 s ±0.67% 2.95 s 2.96 s
96+
shuffle join (2 workers, 100K × 100K) 0.32 3.12 s ±5.14% 3.12 s 3.23 s
97+
98+
Comparison:
99+
local join baseline (100K × 100K) 0.34
100+
shuffle join (2 workers, 100K × 100K) 0.32 - 1.06x slower +0.170 s
101+
102+
--- Broadcast join (bloom filter) benchmark ---
103+
104+
Benchmarking broadcast join + bloom filter (2 workers, 100K × 20) ...
105+
Benchmarking local join baseline (100K × 20) ...
106+
Calculating statistics...
107+
Formatting results...
108+
109+
Name ips average deviation median 99th %
110+
local join baseline (100K × 20) 14.11 70.89 ms ±82.81% 50.03 ms 265.97 ms
111+
broadcast join + bloom filter (2 workers, 100K × 20) 7.17 139.55 ms ±67.37% 111.71 ms 402.44 ms
112+
113+
Comparison:
114+
local join baseline (100K × 20) 14.11
115+
broadcast join + bloom filter (2 workers, 100K × 20) 7.17 - 1.97x slower +68.66 ms
116+
117+
Benchmarks complete.

bench/results/post_hardening.txt

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
Setting up benchmark data...
2+
Benchmark data ready.
3+
4+
Benchmarking chained from computed (100K): filter → head ...
5+
Benchmarking filter (100K → ~50K) ...
6+
Benchmarking from_list (100 rows) ...
7+
Benchmarking from_parquet (100K rows) ...
8+
Benchmarking from_query (100K rows) ...
9+
Benchmarking from_query (1M rows) ...
10+
Benchmarking full pipeline (100K): filter → mutate → group → summarise → sort ...
11+
Benchmarking group_by + summarise (100K → 100 groups) ...
12+
Benchmarking mutate add column (100K) ...
13+
Benchmarking sort_by (100K rows) ...
14+
Calculating statistics...
15+
Formatting results...
16+
17+
Name ips average deviation median 99th %
18+
chained from computed (100K): filter → head 86.34 11.58 ms ±236.69% 3.70 ms 177.37 ms
19+
full pipeline (100K): filter → mutate → group → summarise â 26.76 37.37 ms ±156.94% 13.46 ms 316.26 ms
20+
mutate add column (100K) 22.25 44.94 ms ±180.00% 17.04 ms 539.45 ms
21+
filter (100K → ~50K) 17.99 55.60 ms ±101.59% 36.60 ms 296.43 ms
22+
group_by + summarise (100K → 100 groups) 15.31 65.30 ms ±145.49% 18.29 ms 386.54 ms
23+
from_parquet (100K rows) 13.23 75.58 ms ±134.61% 35.33 ms 629.73 ms
24+
sort_by (100K rows) 13.17 75.93 ms ±151.87% 28.01 ms 604.86 ms
25+
from_query (100K rows) 8.38 119.39 ms ±114.08% 56.41 ms 576.08 ms
26+
from_list (100 rows) 7.69 130.01 ms ±125.80% 61.13 ms 826.04 ms
27+
from_query (1M rows) 2.80 356.94 ms ±93.87% 256.26 ms 1163.06 ms
28+
29+
Comparison:
30+
chained from computed (100K): filter → head 86.34
31+
full pipeline (100K): filter → mutate → group → summarise â 26.76 - 3.23x slower +25.79 ms
32+
mutate add column (100K) 22.25 - 3.88x slower +33.36 ms
33+
filter (100K → ~50K) 17.99 - 4.80x slower +44.02 ms
34+
group_by + summarise (100K → 100 groups) 15.31 - 5.64x slower +53.72 ms
35+
from_parquet (100K rows) 13.23 - 6.53x slower +63.99 ms
36+
sort_by (100K rows) 13.17 - 6.56x slower +64.34 ms
37+
from_query (100K rows) 8.38 - 10.31x slower +107.81 ms
38+
from_list (100 rows) 7.69 - 11.23x slower +118.43 ms
39+
from_query (1M rows) 2.80 - 30.82x slower +345.36 ms
40+
41+
Memory usage statistics:
42+
43+
Name average deviation median 99th %
44+
chained from computed (100K): filter → head 23.25 KB ±0.00% 23.25 KB 23.25 KB
45+
full pipeline (100K): filter → mutate → group → summarise → sort 30.11 KB ±0.00% 30.11 KB 30.11 KB
46+
mutate add column (100K) 211.96 KB ±0.00% 211.96 KB 211.96 KB
47+
filter (100K → ~50K) 93.99 KB ±0.00% 93.99 KB 93.99 KB
48+
group_by + summarise (100K → 100 groups) 65.86 KB ±0.00% 65.86 KB 65.86 KB
49+
from_parquet (100K rows) 163.49 KB ±0.00% 163.49 KB 163.49 KB
50+
sort_by (100K rows) 162.97 KB ±0.00% 162.97 KB 162.97 KB
51+
from_query (100K rows) 163.67 KB ±0.00% 163.67 KB 163.67 KB
52+
from_list (100 rows) 454.92 KB ±0.00% 454.92 KB 454.92 KB
53+
from_query (1M rows) 1480.41 KB ±0.00% 1480.41 KB 1480.41 KB
54+
55+
Comparison:
56+
chained from computed (100K): filter → head 23.25 KB
57+
full pipeline (100K): filter → mutate → group → summarise â 30.11 KB - 1.30x memory usage +6.86 KB
58+
mutate add column (100K) 211.96 KB - 9.12x memory usage +188.71 KB
59+
filter (100K → ~50K) 93.99 KB - 4.04x memory usage +70.74 KB
60+
group_by + summarise (100K → 100 groups) 65.86 KB - 2.83x memory usage +42.61 KB
61+
from_parquet (100K rows) 163.49 KB - 7.03x memory usage +140.24 KB
62+
sort_by (100K rows) 162.97 KB - 7.01x memory usage +139.72 KB
63+
from_query (100K rows) 163.67 KB - 7.04x memory usage +140.42 KB
64+
from_list (100 rows) 454.92 KB - 19.57x memory usage +431.67 KB
65+
from_query (1M rows) 1480.41 KB - 63.67x memory usage +1457.16 KB
66+
67+
--- Distributed benchmark ---
68+
69+
Benchmarking distributed (2 workers): 100K filter + aggregate ...
70+
Benchmarking single-node baseline: 100K filter + aggregate ...
71+
Calculating statistics...
72+
Formatting results...
73+
74+
Name ips average deviation median 99th %
75+
single-node baseline: 100K filter + aggregate 24.50 40.81 ms ±143.44% 14.54 ms 265.23 ms
76+
distributed (2 workers): 100K filter + aggregate 18.47 54.15 ms ±129.69% 27.26 ms 357.62 ms
77+
78+
Comparison:
79+
single-node baseline: 100K filter + aggregate 24.50
80+
distributed (2 workers): 100K filter + aggregate 18.47 - 1.33x slower +13.34 ms
81+
82+
--- Streaming vs batch merge ---
83+
84+
Benchmarking streaming merge (MIN + MAX, 2 workers) ...
85+
Benchmarking streaming merge (SUM + COUNT, 2 workers) ...
86+
Calculating statistics...
87+
Formatting results...
88+
89+
Name ips average deviation median 99th %
90+
streaming merge (MIN + MAX, 2 workers) 23.97 41.73 ms ±166.25% 16.26 ms 372.94 ms
91+
streaming merge (SUM + COUNT, 2 workers) 7.36 135.90 ms ±138.86% 39.25 ms 816.42 ms
92+
93+
Comparison:
94+
streaming merge (MIN + MAX, 2 workers) 23.97
95+
streaming merge (SUM + COUNT, 2 workers) 7.36 - 3.26x slower +94.18 ms
96+
97+
--- Shuffle join benchmark ---
98+
99+
Benchmarking local join baseline (100K × 100K) ...
100+
Benchmarking shuffle join (2 workers, 100K × 100K) ...
101+
Calculating statistics...
102+
Formatting results...
103+
104+
Name ips average deviation median 99th %
105+
local join baseline (100K × 100K) 0.40 2.53 s ±2.79% 2.53 s 2.58 s
106+
shuffle join (2 workers, 100K × 100K) 0.37 2.73 s ±3.67% 2.73 s 2.80 s
107+
108+
Comparison:
109+
local join baseline (100K × 100K) 0.40
110+
shuffle join (2 workers, 100K × 100K) 0.37 - 1.08x slower +0.199 s
111+
112+
--- Broadcast join (bloom filter) benchmark ---
113+
114+
Benchmarking broadcast join + bloom filter (2 workers, 100K × 20) ...
115+
Benchmarking local join baseline (100K × 20) ...
116+
Calculating statistics...
117+
Formatting results...
118+
119+
Name ips average deviation median 99th %
120+
local join baseline (100K × 20) 16.94 59.05 ms ±117.62% 23.45 ms 253.97 ms
121+
broadcast join + bloom filter (2 workers, 100K × 20) 14.94 66.93 ms ±120.61% 32.28 ms 347.11 ms
122+
123+
Comparison:
124+
local join baseline (100K × 20) 16.94
125+
broadcast join + bloom filter (2 workers, 100K × 20) 14.94 - 1.13x slower +7.88 ms
126+
127+
Benchmarks complete.

lib/dux/flame.ex

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,34 @@ if Code.ensure_loaded?(FLAME) do
7676
pid
7777
end
7878

79-
# Wait for :pg registration to propagate
80-
Process.sleep(100)
79+
await_pg_registration(workers)
8180
workers
8281
end
8382

83+
defp await_pg_registration(workers, timeout_ms \\ 5_000) do
84+
expected = MapSet.new(workers)
85+
deadline = System.monotonic_time(:millisecond) + timeout_ms
86+
do_await_pg(expected, deadline)
87+
end
88+
89+
defp do_await_pg(expected, deadline) do
90+
registered =
91+
:pg.get_members(:dux, Dux.Remote.Worker)
92+
|> MapSet.new()
93+
94+
if MapSet.subset?(expected, registered) do
95+
:ok
96+
else
97+
if System.monotonic_time(:millisecond) > deadline do
98+
# Best-effort: proceed even if not all registered yet
99+
:ok
100+
else
101+
Process.sleep(10)
102+
do_await_pg(expected, deadline)
103+
end
104+
end
105+
end
106+
84107
@doc """
85108
Get status of the FLAME-backed Dux cluster.
86109

lib/dux/remote/coordinator.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ defmodule Dux.Remote.Coordinator do
4343
workers = Keyword.get_lazy(opts, :workers, &Worker.list/0)
4444
timeout = Keyword.get(opts, :timeout, :infinity)
4545
strategy = Keyword.get(opts, :strategy, :round_robin)
46-
bcast_threshold = Keyword.get(opts, :broadcast_threshold, @broadcast_threshold)
46+
# Scale broadcast threshold by worker count so total network cost stays constant
47+
raw_threshold = Keyword.get(opts, :broadcast_threshold, @broadcast_threshold)
48+
bcast_threshold = div(raw_threshold, max(length(workers), 1))
4749

4850
if workers == [] do
4951
raise ArgumentError, "no workers available for distributed execution"
@@ -305,7 +307,7 @@ defmodule Dux.Remote.Coordinator do
305307
end,
306308
timeout: timeout,
307309
max_concurrency: n_workers,
308-
ordered: true
310+
ordered: false
309311
)
310312
|> Enum.map(fn
311313
{:ok, {:ok, ipc}} -> {:ok, ipc}

lib/dux/remote/merger.ex

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,24 @@ defmodule Dux.Remote.Merger do
175175
# Determine the correct re-aggregation function based on the original expression.
176176
# SUM → SUM, COUNT → SUM, MIN → MIN, MAX → MAX
177177
# AVG columns should have been rewritten by PipelineSplitter before reaching here.
178+
# Uses word-boundary regex to prevent substring matches (e.g. COUNT_DISTINCT matching COUNT).
179+
# Order matters: more specific patterns (APPROX_COUNT_DISTINCT, COUNT_DISTINCT) before COUNT.
178180
defp re_aggregate_expr(name, expr) when is_binary(expr) do
179-
upper = String.upcase(expr)
180181
quoted = qi(name)
181182

182-
cond do
183-
String.contains?(upper, "MIN(") -> "MIN(#{quoted}) AS #{quoted}"
184-
String.contains?(upper, "MAX(") -> "MAX(#{quoted}) AS #{quoted}"
185-
String.contains?(upper, "SUM(") -> "SUM(#{quoted}) AS #{quoted}"
186-
String.contains?(upper, "COUNT(") -> "SUM(#{quoted}) AS #{quoted}"
187-
# Default: SUM (safe for additive aggregates)
188-
true -> "SUM(#{quoted}) AS #{quoted}"
189-
end
183+
agg_fn =
184+
cond do
185+
Regex.match?(~r/\bMIN\s*\(/i, expr) -> "MIN"
186+
Regex.match?(~r/\bMAX\s*\(/i, expr) -> "MAX"
187+
Regex.match?(~r/\bSUM\s*\(/i, expr) -> "SUM"
188+
Regex.match?(~r/\bAPPROX_COUNT_DISTINCT\s*\(/i, expr) -> "SUM"
189+
Regex.match?(~r/\bCOUNT_DISTINCT\s*\(/i, expr) -> "SUM"
190+
Regex.match?(~r/\bCOUNT\s*\(/i, expr) -> "SUM"
191+
# Default: SUM (safe for additive aggregates)
192+
true -> "SUM"
193+
end
194+
195+
"#{agg_fn}(#{quoted}) AS #{quoted}"
190196
end
191197

192198
defp re_aggregate_expr(name, _expr) do

0 commit comments

Comments
 (0)