Skip to content

Commit 89d5987

Browse files
cigraingerclaude
andcommitted
feat: distributed correctness — pipeline splitter, aggregate fixes
Major distributed correctness improvements: PipelineSplitter: classifies ops into worker-safe vs coordinator-only. - filter, mutate, select, discard, rename, drop_nil → push to workers - sort_by, head, distinct → push to workers AND re-apply on coordinator - slice, pivot_wider, pivot_longer → coordinator-only (positional/schema) - summarise with AVG → rewritten to SUM+COUNT on workers, divided on coordinator Merger aggregate fixes: - MIN() re-aggregated with MIN() (was SUM — WRONG) - MAX() re-aggregated with MAX() (was SUM — WRONG) - SUM/COUNT still use SUM (correct) - AVG handled via pipeline rewrite (SUM/COUNT split) Coordinator now: - Splits pipeline via PipelineSplitter before fan-out - Applies AVG rewrites after merge (SUM/COUNT → division) - Applies coordinator-only ops (slice, pivot) post-merge 14 correctness tests: MIN/MAX re-aggregation, grouped MIN/MAX, AVG rewrite (global + grouped), mixed aggregates (SUM+COUNT+AVG+MIN+MAX), slice on coordinator, pivot on coordinator, pipeline splitter unit tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7b3de2b commit 89d5987

4 files changed

Lines changed: 534 additions & 8 deletions

File tree

lib/dux/remote/coordinator.ex

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ defmodule Dux.Remote.Coordinator do
1919
The result is a `%Dux{}` struct with the merged data.
2020
"""
2121

22-
alias Dux.Remote.{Merger, Partitioner, Worker}
22+
alias Dux.Remote.{Merger, Partitioner, PipelineSplitter, Worker}
2323

2424
@doc """
2525
Execute a `%Dux{}` pipeline across distributed workers.
@@ -44,8 +44,14 @@ defmodule Dux.Remote.Coordinator do
4444
raise ArgumentError, "no workers available for distributed execution"
4545
end
4646

47-
# Partition the pipeline across workers
48-
assignments = Partitioner.assign(pipeline, workers, strategy: strategy)
47+
# Split pipeline: worker ops push down, coordinator ops apply post-merge
48+
%{worker_ops: worker_ops, coordinator_ops: coord_ops, agg_rewrites: rewrites} =
49+
PipelineSplitter.split(pipeline.ops)
50+
51+
worker_pipeline = %{pipeline | ops: worker_ops}
52+
53+
# Partition the worker pipeline across workers
54+
assignments = Partitioner.assign(worker_pipeline, workers, strategy: strategy)
4955

5056
# Fan out: each worker executes its partition
5157
results = fan_out(assignments, timeout)
@@ -59,7 +65,13 @@ defmodule Dux.Remote.Coordinator do
5965
end
6066

6167
# Merge partial results on coordinator
62-
Merger.merge_to_dux(successes, pipeline)
68+
merged = Merger.merge_to_dux(successes, worker_pipeline)
69+
70+
# Apply AVG rewrites if any
71+
merged = apply_avg_rewrites(merged, rewrites)
72+
73+
# Apply coordinator-only ops (slice, pivot, etc.)
74+
apply_coordinator_ops(merged, coord_ops)
6375
end
6476

6577
@doc """
@@ -106,4 +118,53 @@ defmodule Dux.Remote.Coordinator do
106118
{Enum.map(ok, fn {:ok, ipc} -> ipc end), err}
107119
end)
108120
end
121+
122+
# Finish the AVG rewrite on the merged result.
#
# `rewrites` maps each output column name to `{:avg, sum_col, count_col, inner}`.
# For every entry a column `name` is computed as `"sum_col" / "count_col"`
# (identifiers escaped via `esc/1`), after which the helper sum/count columns
# are discarded. With no rewrites the input is returned untouched.
defp apply_avg_rewrites(dux, rewrites) when map_size(rewrites) == 0, do: dux

defp apply_avg_rewrites(dux, rewrites) do
  # One pass over the rewrites yields both the division expression for the
  # final column and the helper columns that must be dropped afterwards.
  {division_exprs, helper_cols} =
    rewrites
    |> Enum.map(fn {name, {:avg, sum_col, count_col, _inner}} ->
      # NOTE(review): atoms are created from generated column names here;
      # confirm whether Dux.discard/2 also accepts strings before changing it.
      {{name, "\"#{esc(sum_col)}\" / \"#{esc(count_col)}\""},
       [String.to_atom(sum_col), String.to_atom(count_col)]}
    end)
    |> Enum.unzip()

  with_averages = Dux.mutate_with(dux, division_exprs)
  Dux.discard(with_averages, List.flatten(helper_cols))
end
142+
143+
# Replay the coordinator-side ops on the merged result, in list order.
# Each op is handed to `apply_single_op/2`; an empty list returns `dux` as is.
defp apply_coordinator_ops(dux, ops) when is_list(ops) do
  Enum.reduce(ops, dux, fn op, acc -> apply_single_op(acc, op) end)
end
150+
151+
# Apply one coordinator-side op to `dux`.
#
# Slice, head and the two pivots go through the corresponding `Dux` function.
# Every other op (including `:sort_by` and `:distinct`) is appended verbatim
# to the end of `dux.ops`.
defp apply_single_op(dux, op) do
  case op do
    {:slice, offset, length} ->
      Dux.slice(dux, offset, length)

    {:head, n} ->
      Dux.head(dux, n)

    {:pivot_wider, names_col, values_col, agg} ->
      Dux.pivot_wider(dux, names_col, values_col, agg: agg)

    {:pivot_longer, cols, names_to, values_to} ->
      Dux.pivot_longer(dux, cols, names_to: names_to, values_to: values_to)

    other ->
      # No dedicated handling: append the op to the pipeline unchanged.
      %{dux | ops: dux.ops ++ [other]}
  end
end
168+
169+
# Double every `"` in `name` so it can sit inside a double-quoted SQL identifier.
defp esc(name) do
  String.replace(name, "\"", "\"\"")
end
109170
end

lib/dux/remote/merger.ex

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,12 @@ defmodule Dux.Remote.Merger do
121121
"SELECT DISTINCT * FROM (#{union_sql}) __merged"
122122

123123
{:re_aggregate, groups, aggs} ->
124-
# Re-aggregate partial results
124+
# Re-aggregate partial results with correct functions
125125
group_cols = Enum.map_join(groups, ", ", &qi/1)
126126

127127
agg_cols =
128-
Enum.map_join(aggs, ", ", fn {name, _expr} ->
129-
quoted = qi(name)
130-
"SUM(#{quoted}) AS #{quoted}"
128+
Enum.map_join(aggs, ", ", fn {name, expr} ->
129+
re_aggregate_expr(name, expr)
131130
end)
132131

133132
select = if groups == [], do: agg_cols, else: "#{group_cols}, #{agg_cols}"
@@ -179,6 +178,28 @@ defmodule Dux.Remote.Merger do
179178
end)
180179
end
181180

181+
# Build the `FUNC("name") AS "name"` fragment used to re-aggregate one partial
# aggregate column, choosing FUNC from the original aggregate expression:
#
#   MIN → MIN, MAX → MAX, everything else (SUM, COUNT, unknown) → SUM
#
# AVG columns should have been rewritten by PipelineSplitter before reaching here.
#
# The expression is inspected by *function-call name*, not by substring, so a
# scalar helper nested inside an additive aggregate (e.g. `SUM(list_min(xs))`
# or `arg_min(...)`) is not mistaken for MIN/MAX, and `MIN (x)` written with
# whitespace before the parenthesis is still recognised. MIN takes precedence
# over MAX when both appear.
#
# Non-binary expressions fall back to SUM.
defp re_aggregate_expr(name, expr) when is_binary(expr) do
  quoted = qi(name)

  # Upper-cased names of every `identifier(` call in the expression.
  called =
    ~r/\b([A-Za-z_][A-Za-z0-9_]*)\s*\(/
    |> Regex.scan(expr, capture: :all_but_first)
    |> Enum.map(fn [fun_name] -> String.upcase(fun_name) end)

  func =
    cond do
      "MIN" in called -> "MIN"
      "MAX" in called -> "MAX"
      # SUM and COUNT partials are additive; SUM is also the default.
      true -> "SUM"
    end

  "#{func}(#{quoted}) AS #{quoted}"
end

defp re_aggregate_expr(name, _expr) do
  quoted = qi(name)
  "SUM(#{quoted}) AS #{quoted}"
end
202+
182203
# Quote identifier — escape double quotes to prevent SQL injection
183204
defp qi(name) do
184205
escaped = String.replace(name, ~s("), ~s(""))
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
defmodule Dux.Remote.PipelineSplitter do
  @moduledoc false

  # Splits a pipeline's ops into worker ops (pushed down to the workers) and
  # coordinator ops (applied after the partial results are merged).
  #
  # Ops are walked left to right:
  #
  #   1. Push-down phase. Row-wise ops (filter, mutate, select, discard,
  #      rename, drop_nil, group_by, ungroup, join, concat_rows) are pushed to
  #      the workers.
  #
  #   2. Barrier. The first op whose result depends on the whole data set ends
  #      the push-down phase:
  #
  #        * summarise               -> workers only, with AVG rewritten to a
  #                                     SUM + COUNT pair
  #        * sort_by, head, distinct -> workers AND coordinator
  #        * anything else (slice, pivot_wider, pivot_longer, unknown ops)
  #                                  -> coordinator only
  #
  #   3. Every op after the barrier goes to the coordinator only, in its
  #      original order. Pushing such ops to workers would reorder the
  #      pipeline: `slice -> filter` would run as `filter -> slice`, a filter
  #      after `head` would filter per-partition heads, and ops after
  #      `summarise` would see partial aggregates (and none of the AVG output
  #      columns, which only exist after the coordinator divides).
  #
  # The one exception is `sort_by` immediately followed by `head`: both are
  # pushed to the workers and re-applied on the coordinator, because the global
  # top N is always contained in the union of the per-partition top N.

  # Ops that are pushed to workers without ending the push-down phase.
  @row_wise_ops [:filter, :mutate, :select, :discard, :rename, :drop_nil, :group_by, :concat_rows]

  # Ops that are pushed to workers and re-applied on the coordinator.
  @reapplied_ops [:sort_by, :head, :distinct]

  @doc """
  Split pipeline ops into worker and coordinator portions.

  Returns `%{worker_ops: [...], coordinator_ops: [...], agg_rewrites: %{}}`.

  `agg_rewrites` maps output column names to their rewrite info
  `{:avg, sum_column, count_column, inner_expr}` (AVG rewritten to a
  SUM/COUNT pair). Only the first `summarise` is rewritten; any later one is
  a coordinator op and is left unchanged.
  """
  def split(ops) when is_list(ops) do
    {worker_ops, coordinator_ops, rewrites} = do_split(ops, [])

    %{worker_ops: worker_ops, coordinator_ops: coordinator_ops, agg_rewrites: rewrites}
  end

  # `worker` accumulates pushed-down ops in reverse order. Each terminating
  # clause returns {worker_ops, coordinator_ops, agg_rewrites} in pipeline order.

  defp do_split([], worker), do: {Enum.reverse(worker), [], %{}}

  # Row-wise ops: push down and keep going.
  defp do_split([{kind, _} = op | rest], worker) when kind in @row_wise_ops do
    do_split(rest, [op | worker])
  end

  defp do_split([{:ungroup} = op | rest], worker), do: do_split(rest, [op | worker])

  # Join is pushed to workers, as in the original classification.
  # NOTE(review): presumably the partitioner broadcasts the right side - confirm.
  defp do_split([{:join, _, _, _, _} = op | rest], worker), do: do_split(rest, [op | worker])

  # Summarise: workers compute partial aggregates; everything after it must
  # see the final aggregates, so it is deferred to the coordinator.
  defp do_split([{:summarise, aggs} | rest], worker) do
    {worker_aggs, rewrites} = rewrite_aggregates(aggs)
    {Enum.reverse([{:summarise, worker_aggs} | worker]), rest, rewrites}
  end

  # Top-N: sort_by directly followed by head is safe to run on both sides.
  defp do_split([{:sort_by, _} = sort, {:head, _} = head | rest], worker) do
    {Enum.reverse([head, sort | worker]), [sort, head | rest], %{}}
  end

  # sort_by / head / distinct: run on both sides, defer the remainder.
  defp do_split([{kind, _} = op | rest], worker) when kind in @reapplied_ops do
    {Enum.reverse([op | worker]), [op | rest], %{}}
  end

  # Slice, pivots and unknown ops: coordinator only, together with the remainder.
  defp do_split(remaining, worker), do: {Enum.reverse(worker), remaining, %{}}

  # ---------------------------------------------------------------------------
  # Aggregate rewrites
  # ---------------------------------------------------------------------------

  # Rewrite aggregates that can't be naively re-aggregated with SUM.
  # AVG(x) -> workers compute __avg_sum_<name> = SUM(x) and
  #           __avg_count_<name> = COUNT(x);
  #           the coordinator computes <name> = sum / count.
  #
  # Returns {worker_aggs, rewrites}. Non-AVG aggregates pass through unchanged
  # and keep their relative position; each AVG is replaced in place by its
  # COUNT entry followed by its SUM entry.
  #
  # NOTE(review): every `AVG(` call in the expression is swapped, so a compound
  # expression such as "AVG(x) * 2" is not decomposed correctly - confirm
  # whether such expressions can reach this point.
  defp rewrite_aggregates(aggs) do
    Enum.flat_map_reduce(aggs, %{}, fn {name, expr} = agg, rewrites ->
      if avg_expr?(expr) do
        sum_name = "__avg_sum_#{name}"
        count_name = "__avg_count_#{name}"
        info = {:avg, sum_name, count_name, extract_avg_inner(expr)}

        {[{count_name, swap_avg(expr, "COUNT")}, {sum_name, swap_avg(expr, "SUM")}],
         Map.put(rewrites, name, info)}
      else
        {[agg], rewrites}
      end
    end)
  end

  # True when the expression calls a function named exactly AVG (any case,
  # optional whitespace before the parenthesis). The word boundary keeps
  # longer function names that merely end in "avg" from matching.
  defp avg_expr?(expr) when is_binary(expr), do: Regex.match?(~r/\bAVG\s*\(/i, expr)
  defp avg_expr?(_), do: false

  # Extract `inner` from `AVG(inner)`; falls back to the whole expression.
  defp extract_avg_inner(expr) do
    case Regex.run(~r/\bAVG\s*\((.+)\)/is, expr) do
      [_, inner] -> inner
      _ -> expr
    end
  end

  # Replace the AVG function name with `replacement`, keeping the original
  # spacing and opening parenthesis.
  defp swap_avg(expr, replacement) do
    Regex.replace(~r/\bAVG(\s*\()/i, expr, "#{replacement}\\1")
  end
end

0 commit comments

Comments
 (0)